私はteradataからデータを取得しようとしており、次にprefectタスクを使用して寄木細工のファイルに読み書きしています
def fetch_data(host,db_name,user,password,query):
'logic'
@task(name="Write dask dataframe into GPFS parquet file")
def write_data(dask_dataframe,file_name):
'logic'
@task(name="Read data from GPFS parquet file into dask dataframe")
def read_data(file_name):
'logic'
with Flow("Teradata Example") as flow:
result = fetch_data(host,db_name,user,password,query)
write_data(dask_dataframe=result,file_name=file_name)
read_data(file_name=file_name)
flow.run()
しかし、同じフェッチャー コードがタスクとして実行されている場合、コードは失敗します。
@task(name="Fetch sql query data from teradta data source into dask dataframe")
def fetch_data(host,db_name,user,password,query):
'logic'
@task(name="Write dask dataframe into GPFS parquet file")
def write_data(dask_dataframe,file_name):
'logic'
@task(name="Read data from GPFS parquet file into dask dataframe")
def read_data(file_name):
'logic'
with Flow("Teradata Example") as flow:
result = fetch_data(host,db_name,user,password,query)
write_data(dask_dataframe=result,file_name=file_name)
read_data(file_name=file_name)
flow.run()
次の Teradata ファイル コードを追加しました。
def get_partitions(num_partitions):
list_range =[]
initial_start=0
for i in range(num_partitions):
amp_range = 3240//num_partitions
start = (i*amp_range+1)*initial_start
end = (i+1)*amp_range
list_range.append((start,end))
initial_start = 1
return list_range
@delayed
def load(query,start,end,connString):
return pd.read_sql(query.format(start, end),connString)
class TeradataFetch(Task):
def __init__(
args)
@defaults_from_attrs("fetch", "fetch_count", "query", "commit", "charset")
def run(
self,
query: str,
) -> Any:
try:
results = from_delayed([load(query,start, end,connString) for start,end in get_partitions(self.num_partitions)])
logging.debug("Fetch Results: %s", results)
return results
except Exception as e:
raise e
誰かがここで提案/助けてくれますか?