1

大きな CSV 遠隔ファイルをダウンロードし、すべての行を受信時に MySQL にプッシュしたいと考えていcsv.readerます。遠隔ファイルを解析するために使用します。1000 のバッチで MySQL に行を追加します。

問題は、ピアとの接続が 5 分後にタイムアウトになり、ファイルを 1 分以内にダウンロードできても、MySQL へのプッシュにはそれ以上の時間がかかることです。

ピアとの接続が mySQL の制約を待たないように、ダウンロード ジョブとプッシュ ジョブを非同期で動作させる方法はありますか?

避けたい

  1. 不要な場合はメモリ内のファイル全体をダウンロードする
  2. 最初の行がダウンロードされるとすぐに mysql にプッシュし始めます
  3. 一時ファイルをいじる必要がある

基本的に、私は自分の python スクリプトにcurl file | my_script_that_pushes_values.sh.

これが私がしていることの実例です:

csvReader = csv.reader(distantfile)
valuesBuffer = []
for row in csvReader:
  valuesBuffer.append(getValues(row))
  if len(valuesBuffer) % 1000 = 0:
    pushValuesIntoMySQL(valuesBuffer)
    valuesBuffer = []
pushValuesIntoMySQL(valuesBuffer)
4

1 に答える 1

2

LOAD DATA LOCAL INFILEcsv入力をサポートしているため、ファイル全体をサーバーにコピーしてから使用します。

LOAD DATA INFILE 'data.txt' INTO TABLE tbl_name
  FIELDS TERMINATED BY ',' ENCLOSED BY '"'
  LINES TERMINATED BY '\r\n'
  IGNORE 1 LINES;

このソリューションが気に入らない場合は、自動mysql_ping()再接続に使用できます (使用しているコネクタでサポートされていることを願っています) 。

サーバーへの接続が機能しているかどうかを確認します。接続がダウンし、自動再接続が有効になっている場合、再接続が試行されます。接続がダウンしていて、自動再接続が無効になっている場合、mysql_ping() はエラーを返します。


また、ファイルをダウンロードできるが、MySQL の遅延が原因でタイムアウトになるという問題がある場合は、2 つのスレッドで実行し、同期することができqueueます。

# Prepare queue and end signaling handler
q = queue.Queue()
done = threading.Event()

# Function that fetches items from q and puts them into db after
# certain amount is reached
def store_db():
    items=[]

    # Until we set done
    while not done.is_set():
        try:
            # We may have 500 records and thread be done... prevent deadlock
            items.append(q.get(timeout=5))
            if len(items) > 1000:
                insert_into(items)
                items = []
            q.task_done()
         # If you wait longer then 5 seconds < exception
         except queue.Empty: pass

    if items:
        insert_into(items)

# Fetch all data in a loop
def continous_reading():
    # Fetch row
    q.put(row)

# Start storer thread
t = threading.Thread(target=store_db)
t.daemon = True
t.start()

continous_reading()
q.join() # Wait for all task to be processed
done.set() # Signal store_db that it can terminate
t.join() # to make sure the items buffer is stored into the db
于 2013-04-27T21:40:25.763 に答える