0

私はteradataからデータを取得しようとしており、次にprefectタスクを使用して寄木細工のファイルに読み書きしています

def fetch_data(host,db_name,user,password,query):
    'logic'

@task(name="Write dask dataframe into GPFS parquet file")
def write_data(dask_dataframe,file_name):
    'logic'

@task(name="Read data from GPFS parquet file into dask dataframe")
def read_data(file_name):
    'logic'

with Flow("Teradata Example") as flow:
    result = fetch_data(host,db_name,user,password,query)
    write_data(dask_dataframe=result,file_name=file_name)
    read_data(file_name=file_name)

flow.run()

しかし、同じフェッチャー コードがタスクとして実行されている場合、コードは失敗します。

@task(name="Fetch sql query data from teradta data source into dask dataframe")
def fetch_data(host,db_name,user,password,query):
    'logic'

@task(name="Write dask dataframe into GPFS parquet file")
def write_data(dask_dataframe,file_name):
    'logic'

@task(name="Read data from GPFS parquet file into dask dataframe")
def read_data(file_name):
    'logic'

with Flow("Teradata Example") as flow:
    result = fetch_data(host,db_name,user,password,query)
    write_data(dask_dataframe=result,file_name=file_name)
    read_data(file_name=file_name)

flow.run()

次の Teradata ファイル コードを追加しました。

def get_partitions(num_partitions):
    list_range =[]
    initial_start=0
    for i in range(num_partitions):
        amp_range = 3240//num_partitions
        start = (i*amp_range+1)*initial_start
        end   = (i+1)*amp_range
        list_range.append((start,end))
        initial_start = 1
    return list_range

@delayed
def load(query,start,end,connString):
    return pd.read_sql(query.format(start, end),connString)

class TeradataFetch(Task):
    def __init__(
        args)

    @defaults_from_attrs("fetch", "fetch_count", "query", "commit", "charset")
    def run(
        self,
        query: str,
    ) -> Any:
        try:
            results = from_delayed([load(query,start, end,connString) for start,end in get_partitions(self.num_partitions)])
            logging.debug("Fetch Results: %s", results)
            return results

        except Exception as e:
            raise e


誰かがここで提案/助けてくれますか?

4

0 に答える 0