プロセス プール エグゼキューターと一緒に namedtuple を使用しようとしていますが、単体テストを作成するときに、以下のエラーが発生し続けます。コード内の dut は、2 つの関数が格納されている python ファイルを表し、単体テストから呼び出しています。
以下は私が持っているコードです
def cr8_dfs_frm_parq(file, DEBUG=False):
    """Query a single parquet file and build the two dataframes needed for
    Lean coverage analysis.

    1) Pivot-table dataframe with ECID as index, testname as columns and
       result as data.
    2) Dataframe with two columns: testname and test time.

    Both dataframes are consumed later by the parallel-execution /
    Jaccard-similarity flow.

    Args:
        file (string): Path where the current parquet file is stored.
        DEBUG (bool): When True, print shape information while processing.

    Returns:
        dfs: Named tuple ``dfs(pvt_df, ts_tt_df)`` holding the pivot table
             and the testname/test-time dataframe. On a malformed parquet
             both members are empty dataframes.
    """
    expected_cols = ['ecid', 'tname_tnum', 'result', 'test_time']
    data_df = Query().get_data(user='temp', collection_or_file=file, parallel=False, max_cpu=1)
    if DEBUG:
        print("Name of the parquet file queried was - {} and it's shape is {}".format(os.path.basename(file), data_df.shape))
    if set(data_df.columns.to_list()) != set(expected_cols):
        # Warn the user that the current file might be corrupt or not in the
        # format needed for this script; the caller proceeds to the next parquet.
        log.error('Parquet file {} does not contain 4 columns as expected, please check and provide the correct parquet file with below columns \n {} .'.format(os.path.basename(file), expected_cols))
        # BUG FIX: return the same named-tuple type as the success path so the
        # caller can always access .pvt_df / .ts_tt_df (previously a plain
        # 2-tuple was returned here, which breaks attribute access upstream).
        return dfs(pd.DataFrame(), pd.DataFrame())
    # Keep only the latest result per (testname, ECID) pair.
    data_df.drop_duplicates(subset=['tname_tnum', 'ecid'], keep='last', inplace=True)
    data_df.reset_index(drop=True, inplace=True)
    # Pivot table: ECID rows x testname columns -> result, for lean coverage analysis.
    pvt_df = data_df.pivot(index='ecid', columns='tname_tnum', values='result')
    # Keep only testname and test time for building a look-up dictionary later.
    # BUG FIX: chain drop_duplicates on the slice instead of mutating a view
    # in place, which triggers pandas' SettingWithCopyWarning and may silently
    # fail to deduplicate.
    data_df = data_df[['tname_tnum', 'test_time']].drop_duplicates(subset=['tname_tnum', 'test_time'], keep='last')
    if DEBUG:
        print("Pivot table shape after processing parquet {} is - {}".format(os.path.basename(file), pvt_df.shape))
    return dfs(pvt_df, data_df)
def cr8_dfs_parl(max_cpu, parqs_lst, DEBUG=False):
    """Query parquet files in parallel and merge their per-file dataframes
    for lean coverage analysis.

    1) Dataframe with ECID as index, testname as columns and results as data.
    2) Dataframe with test_time and testname columns, used to create a
       test-time look-up dictionary.

    NOTE(review): every object submitted to and returned from the worker
    processes must be picklable. A "cannot pickle '_io.TextIOWrapper'"
    error means the worker's return value (or a captured global such as an
    open log file handle) carries an open file object — confirm that
    cr8_dfs_frm_parq returns only plain dataframes.

    Args:
        max_cpu (int): Number of worker processes to use in parallel.
        parqs_lst (list): Parquet paths to merge for Jaccard similarity.
        DEBUG (bool): When True, print shape information while processing.

    Returns:
        dfs: Named tuple ``dfs(pvt_df, ts_tt_df)`` with the concatenated
             pivot table and the concatenated testname/test-time dataframe.

    Raises:
        RuntimeError: If the interpreter is older than Python 3.8.
    """
    pvt_df_cncat = pd.DataFrame()
    ts_tt_df_cncat = pd.DataFrame()
    # BUG FIX: tuple comparison instead of (major == 3 and minor >= 8), which
    # would wrongly reject any future major version; also fixes the "Pythonn"
    # typo in the raised error message.
    if sys.version_info < (3, 8):
        if DEBUG:
            print("Processing of parquets needs Python 3.8 or higher. Please upgrade your Python version from {}.{}.".format(sys.version_info.major, sys.version_info.minor))
            print("Exiting Script : will not reduce parquets until Python version has been upgraded to 3.8 or higher as indicated above")
        raise RuntimeError("Python version {}.{} is old, please upgrade to Python 3.8 or higher for the python script to be run".format(sys.version_info.major, sys.version_info.minor))
    with ProcessPoolExecutor(max_workers=max_cpu) as executor:
        all_task = [executor.submit(cr8_dfs_frm_parq, file, DEBUG) for file in parqs_lst]
        for future in as_completed(all_task):
            dfs_pr_parq = future.result()
            # Concatenate pivot tables and drop duplicate ECIDs, keeping the
            # first occurrence of each index value.
            pvt_df_cncat = pd.concat([pvt_df_cncat, dfs_pr_parq.pvt_df], axis=0, copy=False)
            pvt_df_cncat = pvt_df_cncat.iloc[np.unique(pvt_df_cncat.index.values, return_index=True)[1]]
            # Concatenate test times to calculate mean time across parquets.
            # TODO: revisit this concat (duplicates are not dropped here).
            ts_tt_df_cncat = pd.concat([ts_tt_df_cncat, dfs_pr_parq.ts_tt_df], copy=False)
            if DEBUG:
                print('Pivot table shape currently is - {}'.format(pvt_df_cncat.shape))
                print('Test time dataframe shape currently is - {}'.format(ts_tt_df_cncat.shape))
    # Return a named tuple with both dataframes for easier passing between functions.
    cnct_dfs = dfs(pvt_df_cncat, ts_tt_df_cncat)
    if DEBUG:
        print('Pivot table with ECID as index final shape is - {}'.format(cnct_dfs.pvt_df.shape))
        print('Test time dataframe final shape is - {}'.format(cnct_dfs.ts_tt_df.shape))
    return cnct_dfs
これにより、以下に示すようにエラーが発生します Traceback (most recent call last): TypeError: cannot pickle '_io.TextIOWrapper' object
この単体テストを実行すると、以下に示すエラーが発生します。
def test04_cr8_dfs_parl(self):
    """Checks that parquet files are processed in parallel and that the
    resulting dataframe shapes are as expected.
    """
    # Copy the old parquets into a fresh directory so their timestamps are
    # recent enough for get_parquet_list's week filter.
    shutil.copytree(self.inp_dir, self.copy_dir, copy_function=shutil.copy)
    parqs_lst = dut.get_parquet_list(parquet_dir=self.copy_dir, no_of_weeks=1)
    self.dfs = dut.cr8_dfs_parl(max_cpu=6, parqs_lst=parqs_lst, DEBUG=self.debug)
    # BUG FIX: re-enabled the shape assertions (the test previously asserted
    # nothing) and corrected the variable name from the undefined `dut_dfs`
    # to `self.dfs`.
    with self.subTest('shape1'):
        self.assertEqual(self.dfs.pvt_df.shape, (5509, 2660))
    with self.subTest('shape2'):
        self.assertEqual(self.dfs.ts_tt_df.shape, (7570, 2))
    # BUG FIX: removed the manual self.tearDownClass() call — unittest
    # invokes tearDownClass automatically after the last test in the class,
    # so calling it here ran the class teardown twice.
以下はスタック トレースです。
"""
Traceback (most recent call last):
File "/usr/lib/python3.9/multiprocessing/queues.py", line 245, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr2/nandeepd/PTE_CHAR/effective_coverage_tool/jaccard_similarity/test/test_jaccard_similarity.py", line 135, in test04_cr8_dfs_parl
self.dfs =dut.cr8_dfs_parl(max_cpu=6,parqs_lst=parqs_lst,DEBUG=self.debug)
File "/usr2/nandeepd/PTE_CHAR/effective_coverage_tool/jaccard_similarity/jaccard_similarity.py", line 163, in cr8_dfs_parl
dfs_pr_parq = future.result()
File "/usr/lib/python3.9/concurrent/futures/_base.py", line 438, in result
return self.__get_result()
File "/usr/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
raise self._exception
File "/usr/lib/python3.9/multiprocessing/queues.py", line 245, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object
----------------------------------------------------------------------
Ran 1 test in 0.181s