
Snapshot of the traceback

I am trying to use a namedtuple together with a process pool executor, but I keep getting the error below when I run my unit test. In the code, dut refers to the Python file that contains the two functions, which I call from the unit test.
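For reference, dfs is a namedtuple defined at module level; its definition is not in the snippet below, but judging from the attribute accesses it is presumably something like this:

from collections import namedtuple

# presumed definition, inferred from the dfs_pr_parq.pvt_df / dfs_pr_parq.ts_tt_df accesses below
dfs = namedtuple('dfs', ['pvt_df', 'ts_tt_df'])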

Below is the code I have:

# Imports inferred from usage; Query and the logger come from modules not shown in this snippet
import os
import sys
import logging

import numpy as np
import pandas as pd
from concurrent.futures import ProcessPoolExecutor, as_completed

log = logging.getLogger(__name__)  # assumed setup; the original logging configuration is not shown


def cr8_dfs_frm_parq(file, DEBUG=False):
    """Function to query each parquet and create 2 dataframes from the parquet.
        1) Dataframe (pivot table) with ECID as index, testname as columns and result as data
        2) Dataframe containing 2 columns, test time and testname
        Both dataframes are required for Lean coverage analysis in function Parallel_execution

    Args:
        file (string): Path where the current parquet file is stored

    Returns:
        dfs (namedtuple): pvt_df, a pivot table dataframe with ECID as index, testname as
            columns and result as data, and ts_tt_df, a dataframe containing the list of
            tests and their associated test times
    """
    data_df = Query().get_data(user='temp', collection_or_file=file, parallel=False, max_cpu=1)
    if DEBUG:
        print("Name of the parquet file queried was - {} and its shape is {}".format(os.path.basename(file), data_df.shape))

    expected_cols = ['ecid', 'tname_tnum', 'result', 'test_time']
    if set(data_df.columns) != set(expected_cols):
        data_df = pd.DataFrame()
        pvt_df = pd.DataFrame()
        # warn the user that the current file may be corrupt or not in the format the script needs, then proceed to the next parquet
        log.error('Parquet file {} does not contain the 4 expected columns, please provide a parquet file with the columns below \n {}'.format(os.path.basename(file), expected_cols))
        # return a namedtuple on the error path as well, so callers can always use attribute access
        return dfs(pvt_df, data_df)

    data_df.drop_duplicates(subset=['tname_tnum', 'ecid'], keep='last', inplace=True)
    data_df.reset_index(drop=True, inplace=True)
    # extracting pivot table of ecid, results and testname for lean coverage analysis
    pvt_df = data_df.pivot(index='ecid', columns='tname_tnum', values='result')
    # dataframe containing only test time and testname for creating a dictionary later
    data_df = data_df[['tname_tnum', 'test_time']]
    data_df.drop_duplicates(subset=['tname_tnum', 'test_time'], keep='last', inplace=True)

    if DEBUG:
        print("Pivot table shape after processing parquet {} is - {}".format(os.path.basename(file), pvt_df.shape))
    dfs_pr_parq = dfs(pvt_df, data_df)
    return dfs_pr_parq


def cr8_dfs_parl(max_cpu, parqs_lst, DEBUG=False):
    """Function that queries parquets parallel and creates 2 dataframes for lean coverage analysis later
        1) Dataframe with ECID as index, testname as column and results as data 
        2) Dataframe with test_time and testname as columns to create a test_time look up dictionary

    Args:
        max_cpu (int): Number of CPU cores to be used in parallel
        parqs_lst (list): List of parquets that need to be merged for computing Jaccard similarity 

    Returns:
        pvt_df(df): Dataframe containing ECID's, testname and results 
        ts_tt_df_cnct(df): Dataframe containing testname and testtime
    """

    pvt_df_cncat = pd.DataFrame()
    ts_tt_df_cncat = pd.DataFrame()
     
    if not (sys.version_info.major == 3 and sys.version_info.minor >= 8):
        if DEBUG:
            print("Processing of parquets needs Python 3.8 or higher. Please upgrade your Python version from {}.{}.".format(sys.version_info.major, sys.version_info.minor))
            print("Exiting script: will not reduce parquets until the Python version has been upgraded to 3.8 or higher as indicated above")

        raise RuntimeError("Python version {}.{} is too old, please upgrade to Python 3.8 or higher to run this script".format(sys.version_info.major, sys.version_info.minor))

    with ProcessPoolExecutor(max_workers=max_cpu) as executor:
        all_task = []
        for file in parqs_lst:
            all_task.append(executor.submit(cr8_dfs_frm_parq, file, DEBUG))
        
        for future in as_completed(all_task):
            dfs_pr_parq = future.result()
            # concatenate parquets and drop duplicate ECID's, keeping the first occurrence of each index value
            pvt_df_cncat = pd.concat([pvt_df_cncat, dfs_pr_parq.pvt_df], axis=0, copy=False)
            pvt_df_cncat = pvt_df_cncat.iloc[np.unique(pvt_df_cncat.index.values, return_index=True)[1]]
            # concatenating test time to calculate mean time across different parquets
            # come back and fix this
            ts_tt_df_cncat = pd.concat([ts_tt_df_cncat, dfs_pr_parq.ts_tt_df], copy=False)
            if DEBUG:
                print('Pivot table shape currently is - {}'.format(pvt_df_cncat.shape))
                print('Test time dataframe shape currently is - {}'.format(ts_tt_df_cncat.shape))
    #returning a named tuple with 2 dataframes for easier parsing between functions
    cnct_dfs = dfs(pvt_df_cncat,ts_tt_df_cncat)
    if DEBUG:
        print('Pivot table with ECID as index final shape is - {}'.format(cnct_dfs.pvt_df.shape))
        print('Test time dataframe final shape is - {}'.format(cnct_dfs.ts_tt_df.shape))
    return cnct_dfs

This raises the error shown below:

Traceback (most recent call last):
TypeError: cannot pickle '_io.TextIOWrapper' object
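For what it's worth, the error is easy to reproduce outside my code whenever an object holding an open file handle has to cross the process boundary; here is a minimal sketch (the worker function and the file name are made up):

from concurrent.futures import ProcessPoolExecutor

def worker(handle):
    return handle.name

if __name__ == '__main__':
    handle = open('dummy.txt', 'w')  # any open text file is an _io.TextIOWrapper
    with ProcessPoolExecutor(max_workers=1) as executor:
        # the argument must be pickled to reach the worker process,
        # and open file objects are not picklable
        future = executor.submit(worker, handle)
        print(future.result())  # raises TypeError: cannot pickle '_io.TextIOWrapper' object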

The error occurs when I run the following unit test:

    def test04_cr8_dfs_parl(self):
        
        """ checks if execution of files happened in parallel and reported shape is as expected
        """
        
        # making a copy of the old parquets to a new directory to ensure that the parquets are recent
        shutil.copytree(self.inp_dir, self.copy_dir, copy_function=shutil.copy)

        parqs_lst = dut.get_parquet_list(parquet_dir=self.copy_dir, no_of_weeks=1)

        self.dfs = dut.cr8_dfs_parl(max_cpu=6, parqs_lst=parqs_lst, DEBUG=self.debug)

        # #verifying dataframe shape is correct
        # with self.subTest('shape1'):
        #     self.assertEqual(dut_dfs.pvt_df.shape, (5509, 2660))
        
        # with self.subTest('shape2'):
        #     self.assertEqual(dut_dfs.ts_tt_df.shape, (7570, 2))
        
        self.tearDownClass()

Below is the stack trace:

"""
Traceback (most recent call last):
  File "/usr/lib/python3.9/multiprocessing/queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr2/nandeepd/PTE_CHAR/effective_coverage_tool/jaccard_similarity/test/test_jaccard_similarity.py", line 135, in test04_cr8_dfs_parl
    self.dfs =dut.cr8_dfs_parl(max_cpu=6,parqs_lst=parqs_lst,DEBUG=self.debug)
  File "/usr2/nandeepd/PTE_CHAR/effective_coverage_tool/jaccard_similarity/jaccard_similarity.py", line 163, in cr8_dfs_parl
    dfs_pr_parq = future.result()
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 438, in result
    return self.__get_result()
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/usr/lib/python3.9/multiprocessing/queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object

----------------------------------------------------------------------
Ran 1 test in 0.181s
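In case it is useful, the offending object can be narrowed down by pickling each submitted argument by hand, the same way multiprocessing would; a small sketch (check_picklable is a name I made up):

import pickle

def check_picklable(**kwargs):
    # try to pickle each value the way the executor would, and report the offenders
    for name, value in kwargs.items():
        try:
            pickle.dumps(value)
        except TypeError as err:
            print('{} is not picklable: {}'.format(name, err))

check_picklable(file=parqs_lst[0], DEBUG=True)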
