1

編集:答えは、私がすべてのメモリを消費していたため、OSがプロセスを軸にしているということでした

コアで平均負荷を1:1に保つのに十分なサブプロセスを生成していますが、1時間以内のある時点で、このスクリプトは数日間実行される可能性があり、3つのプロセスが実行されます。

tipu   14804  0.0  0.0 328776   428 pts/1    Sl   00:20   0:00 python run.py
tipu   14808 64.4 24.1 2163796 1848156 pts/1 Rl   00:20  44:41 python run.py
tipu   14809  8.2  0.0      0     0 pts/1    Z    00:20   5:43 [python] <defunct>
tipu   14810 60.3 24.3 2180308 1864664 pts/1 Rl   00:20  41:49 python run.py
tipu   14811 20.2  0.0      0     0 pts/1    Z    00:20  14:04 [python] <defunct>
tipu   14812 22.0  0.0      0     0 pts/1    Z    00:20  15:18 [python] <defunct>
tipu   15358  0.0  0.0 103292   872 pts/1    S+   01:30   0:00 grep python

なぜこれが起こっているのか分かりません。マスターとスレーブが接続されています。必要に応じてmysql/pgラッパーを添付することもできますが、何か提案はありますか?

slave.py

from boto.s3.key import Key
import multiprocessing
import gzip
import os
from  mysql_wrapper import MySQLWrap
from pgsql_wrapper import PGSQLWrap
import boto
import re

class Slave:

    CHUNKS = 250000

    BUCKET_NAME = "bucket"
    AWS_ACCESS_KEY = ""
    AWS_ACCESS_SECRET = ""
    KEY = Key(boto.connect_s3(AWS_ACCESS_KEY, AWS_ACCESS_SECRET).get_bucket(BUCKET_NAME))
    S3_ROOT = "redshift_data_imports"
    COLUMN_CACHE = {}
    DEFAULT_COLUMN_VALUES = {}

    def __init__(self, job_queue):
        self.log_handler = open("logs/%s" % str(multiprocessing.current_process().name), "a");
        self.mysql = MySQLWrap(self.log_handler)
        self.pg = PGSQLWrap(self.log_handler)
        self.job_queue = job_queue


    def do_work(self):
        self.log(str(os.getpid()))
        while True:

            #sample job in the abstract: mysql_db.table_with_date-iteration
            job = self.job_queue.get()

            #queue is empty
            if job is None:
                self.log_handler.close()
                self.pg.close()
                self.mysql.close()
                print("good bye and good day from %d" % (os.getpid()))
                self.job_queue.task_done()
                break

            #curtail iteration
            table = job.split('-')[0]

            #strip redshift table from job name
            redshift_table = re.sub(r"(_[1-9].*)", "", table.split(".")[1])

            iteration = int(job.split("-")[1])
            offset = (iteration - 1) * self.CHUNKS

            #columns redshift is expecting
            #bad tables will slip through and error out, so we catch it
            try:
                colnames = self.COLUMN_CACHE[redshift_table]
            except KeyError:
                self.job_queue.task_done()
                continue

            #mysql fields to use in SELECT statement
            fields = self.get_fields(table)

            #list subtraction determining which columns redshift has that mysql does not
            delta = (list(set(colnames) - set(fields.keys())))

            #subtract columns that have a default value and so do not need padding
            if delta:
                delta = list(set(delta) - set(self.DEFAULT_COLUMN_VALUES[redshift_table]))

            #concatinate columns with padded \N
            select_fields = ",".join(fields.values()) + (",\\N" * len(delta))

            query = "SELECT %s FROM %s LIMIT %d, %d" % (select_fields, table,
                    offset, self.CHUNKS)

            rows = self.mysql.execute(query)

            self.log("%s: %s\n" % (table, len(rows)))

            if not rows:
                self.job_queue.task_done()
                continue

            #if there is more data potentially, add it to the queue
            if len(rows) == self.CHUNKS:
                self.log("putting %s-%s" % (table, (iteration+1)))
                self.job_queue.put("%s-%s" % (table, (iteration+1)))

            #various characters need escaping
            clean_rows = []
            redshift_escape_chars = set( ["\\", "|", "\t", "\r", "\n"] )
            in_chars = ""

            for row in rows:
                new_row = []
                for value in row:
                    if value is not None:
                        in_chars = str(value)
                    else:
                        in_chars = ""

                    #escape any naughty characters
                    new_row.append("".join(["\\" + c if c in redshift_escape_chars else c for c in in_chars]))
                new_row = "\t".join(new_row)
                clean_rows.append(new_row)

            rows = ",".join(fields.keys() + delta)
            rows += "\n" + "\n".join(clean_rows)

            offset = offset + self.CHUNKS

            filename = "%s-%s.gz" % (table, iteration) 
            self.move_file_to_s3(filename, rows)

            self.begin_data_import(job, redshift_table, ",".join(fields.keys() +
               delta))

            self.job_queue.task_done()


    def move_file_to_s3(self, uri, contents):

        tmp_file = "/dev/shm/%s" % str(os.getpid())

        self.KEY.key = "%s/%s" % (self.S3_ROOT, uri)
        self.log("key is %s" % self.KEY.key )

        f = gzip.open(tmp_file, "wb")
        f.write(contents)
        f.close()

        #local saving allows for debugging when copy commands fail
        #text_file = open("tsv/%s" % uri, "w")
        #text_file.write(contents)
        #text_file.close()

        self.KEY.set_contents_from_filename(tmp_file, replace=True)

    def get_fields(self, table):
        """
            Returns a dict used as: 
                {"column_name": "altered_column_name"}
            Currently only the debug column gets altered
        """
        exclude_fields = ["_qproc_id", "_mob_id", "_gw_id", "_batch_id", "Field"]

        query = "show columns from %s" % (table)
        fields = self.mysql.execute(query)

        #key raw field, value mysql formatted field
        new_fields = {}

        #for field in fields:
        for field in [val[0] for val in fields]:
            if field in exclude_fields:
                continue
            old_field = field

            if "debug_mode" == field.strip():
                field = "IFNULL(debug_mode, 0)"

            new_fields[old_field] = field

        return new_fields

    def log(self, text):
        self.log_handler.write("\n%s" % text)

    def begin_data_import(self, table, redshift_table, fields):
        query = "copy %s (%s) from 's3://bucket/redshift_data_imports/%s' \
            credentials 'aws_access_key_id=%s;aws_secret_access_key=%s' delimiter '\\t' \
            gzip NULL AS '' COMPUPDATE ON ESCAPE IGNOREHEADER 1;" \
            % (redshift_table, fields, table, self.AWS_ACCESS_KEY, self.AWS_ACCESS_SECRET)
        self.pg.execute(query)

master.py

from slave import Slave as Slave 
import multiprocessing
from mysql_wrapper import MySQLWrap as MySQLWrap
from pgsql_wrapper import PGSQLWrap as PGSQLWrap


class Master:

    SLAVE_COUNT = 5

    def __init__(self):
        self.mysql = MySQLWrap()
        self.pg = PGSQLWrap()

    def do_work(table):
        pass

    def get_table_listings(self):
        """Gathers a list of MySQL log tables needed to be imported"""

        query = 'show databases'
        result = self.mysql.execute(query)

        #turns list[tuple] into a flat list
        databases = list(sum(result, ()))

        #overriding during development
        databases = ['db1', 'db2', 'db3']]

        exclude = ('mysql', 'Database', 'information_schema')
        scannable_tables = []

        for database in databases:
            if database in exclude:
                continue

            query = "show tables from %s" % database
            result = self.mysql.execute(query)

            #turns list[tuple] into a flat list
            tables = list(sum(result, ()))

            for table in tables:
                exclude = ("Tables_in_%s" % database, "(", "201303", "detailed", "ltv")

                #exclude any of the unfavorables
                if any(s in table for s in exclude):
                    continue

                scannable_tables.append("%s.%s-1" % (database, table))

        return scannable_tables

    def init(self):
        #fetch redshift columns once and cache
        #get columns from redshift so we can pad the mysql column delta with nulls
        tables = ('table1', 'table2', 'table3')

        for table in tables:

            #cache columns
            query = "SELECT column_name FROM information_schema.columns WHERE \
            table_name = '%s'" % (table)
            result = self.pg.execute(query, async=False, ret=True)
            Slave.COLUMN_CACHE[table] = list(sum(result, ()))

            #cache default values
            query = "SELECT column_name FROM information_schema.columns WHERE \
            table_name = '%s' and column_default is not \
            null" % (table)

            result = self.pg.execute(query, async=False, ret=True)

            #turns list[tuple] into a flat list
            result = list(sum(result, ()))

            Slave.DEFAULT_COLUMN_VALUES[table] = result

    def run(self):
        self.init()

        job_queue = multiprocessing.JoinableQueue()
        tables = self.get_table_listings()
        for table in tables:
            job_queue.put(table)

        processes = []
        for i in range(Master.SLAVE_COUNT):
            process = multiprocessing.Process(target=slave_runner, args=(job_queue,))
            process.daemon = True
            process.start()
            processes.append(process)

        #blocks this process until queue reaches 0
        job_queue.join()

        #signal each child process to GTFO
        for i in range(Master.SLAVE_COUNT):
            job_queue.put(None)

        #blocks this process until queue reaches 0
        job_queue.join()

        job_queue.close()

        #do not end this process until child processes close out
        for process in processes:
            process.join()

        #toodles !
        print("this is master saying goodbye")


def slave_runner(queue):
    slave = Slave(queue)
    slave.do_work()
4

1 に答える 1

6

確かに十分な情報はありませんがSlave.do_work、未処理の例外が発生していることが問題である可能性が非常に高いです。(さまざまな異なる条件でそれを実行できるコード行が多数あります。)

これを行うと、子プロセスは終了します。

POSIX システムでは…詳細は少し複雑ですが、単純なケース (ここにあるもの) では、終了する子プロセスは、リープさ<defunct>れるまでプロセスとして残ります (親がそのwait上にあるため、または終了します)。親コードはキューが終了するまで子を待機しないため、まさにそれが起こります。

したがって、簡単なダクトテープの修正があります。

def do_work(self):
    self.log(str(os.getpid()))
    while True:
        try:
            # the rest of your code
        except Exception as e:
            self.log("something appropriate {}".format(e))
            # you may also want to post a reply back to the parent

tryまた、問題が発生する可能性のあるさまざまな段階をすべて区別できるように、大規模なものをさまざまなものに分割することもできます (特に、返信が必要であることを意味するものと、返信が必要でないことを意味するものがある場合)。


ただし、あなたがやろうとしていることは の動作を正確に複製しているように見えますがmultiprocessing.Pool、いくつかの場所でバーを逃しています。これは疑問を提起します: そもそもなぜ使用Poolしないのですか? mapその後、ファミリ メソッドの 1 つを使用して、物事をさらに単純化/最適化できます。たとえば、全体を次のMaster.runように減らすことができます。

self.init()
pool = multiprocessing.Pool(Master.SLAVE_COUNT, initializer=slave_setup)
pool.map(slave_job, tables)
pool.join()

これにより、例外が処理され、後で必要になった場合に値/例外を返すことができlogging、独自のライブラリを構築する代わりに組み込みライブラリを使用できるようになります。へのわずか数十行の小さなコード変更Slaveだけで済みます。


ジョブ内から新しいジョブを送信したい場合、これを行う最も簡単な方法は、おそらく -Futureベースの API を使用することです (これにより、将来の結果に焦点が当てられ、プール/エグゼキュータがそれらを提供する愚かなものになり、代わりにプールに焦点を当て、結果をそれが返すばかげたものにします)が、Pool同様にそれを行う方法は複数あります。たとえば、現在、各ジョブから何も返していないため、tables実行するリストを返すだけで済みます。これを行う方法を示す簡単な例を次に示します。

import multiprocessing

def foo(x):
    print(x, x**2)
    return list(range(x))

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    jobs = [5]
    while jobs:
        jobs, oldjobs = [], jobs
        for job in oldjobs:
            jobs.extend(pool.apply(foo, [job]))
    pool.close()
    pool.join()

明らかに、ループ全体を、たとえば に供給されるリスト内包表記に置き換えることで、これを少し要約できますitertools.chain。また、「サブミッター」オブジェクトを各ジョブに渡し、それを返す代わりに追加することで、見栄えをよくすることができます。新しい仕事のリストなど。しかし、それがどれほど少ないかを示すために、できるだけ明確にしたかったのです。


いずれにしても、明示的なキューの方が理解しやすく、管理しやすいと思われる場合は、それを選択してください。multiprocessing.workerおよび/またはのソースを見て、concurrent.futures.ProcessPoolExecutor自分で何をする必要があるかを確認してください。それほど難しいことではありませんが、間違っている可能性があることは十分にあります (個人的には、このようなことを自分でやろうとすると、常に少なくとも 1 つのエッジ ケースを忘れてしまいます)、コードを見て正しく理解するのはうまくいきます。


あるいは、ここで使用できない唯一の理由は、プロセスごとの状態 ( 、など)concurrent.futures.ProcessPoolExecutorを初期化する必要があるためのようです。これは、おそらく非常に適切なキャッシングの理由によるものです。(これに Web サービス クエリやデータベース接続などが含まれる場合、タスクごとに 1 回は実行したくないでしょう!) しかし、これにはいくつかの方法があります。boto.s3.key.KeyMySqlWrap

しかし、文書化されていない関数をサブクラス化ProcessPoolExecutorしてオーバーライドし(その単純さについてはソース_adjust_process_countを参照してください)、setup 関数を渡すことができます。そして… 必要なのはそれだけです。

または、組み合わせて使用​​することもできます。fromをFuturefromで囲みます。concurrent.futuresAsyncResultmultiprocessing

于 2013-03-26T21:38:04.177 に答える