0

実行中のビヘイビアを並行して処理するカスタム ランナーを使用して、ガーキン ベースのテスト スイートを実行するためにビヘイビアを使用します。

これはローカル (Windows 8.1) マシンで完全に機能し、os.environ.update を使用してサブプロセス内の環境変数を変更できます。

これは、Ubuntu 14.04 サーバーでは失敗し、実行する各テストのデータベース名と一致する環境変数を変更できません。私がやっていることのコードを取り除いたものは次のとおりです。

def create_database(name):
    #create a postgres database, this works.    
    return "our_test_database_%s" % name

def drop_database(name):
    #drop a postgres database, also works
    return name

def get_features():
    return [feature for feature in os.listdir(features) if feature.endswith(".feature")

def main():
    manager = multiprocessing.Manager()
    databases = manager.Queue()
    cpu_count = multiprocessing.cpu_count()

    for i in range(cpu_count):
        databases.put(create_database(str(i)))

    pool = multiprocessing.Pool(processes=cpu_count, maxtaskperchild=1)
    results = pool.map(run_test, (feature, databases for feature in features), chunksize=1)

    while database = databases.get_nowait():
        drop_database(database)

def run_test(feature, databases):
    database = databases.get(block=True)
    os.environ.update({
        'DATABASE_URL': database
    })

    config = behave.configuration.Configuration(("--no-logcapture", "--tags=~@skip", "-f", "plain", feature))
    runner = behave.runner.Runner(config)
    failed = runner.run()

    databases.put(database)

内部では、Flask アプリケーションのテストにデータベースを使用します。Flask は、実行時に設定された環境変数を見つけることができません。

編集:何が変わったのかわかりません。サーバーと私のマシンで同じバージョンの Python を使用しており、既知のすべての使用済みパッケージの同じバージョンを使用しています。環境変数が適切に更新されていないため、後のコードでアクセスできません。

4

2 に答える 2

1

キーワード引数 toを使用して、すべてのワーカーに法線を渡すことで、 a の使用multiprocessing.Manager()を完全に避けることができます。initializermultiprocessing.Poolmultiprocessing.Queue

def main():
    databases = multiprocessing.Queue()
    cpu_count = multiprocessing.cpu_count()

    for i in range(cpu_count):
        databases.put(create_database(str(i)))

    pool = multiprocessing.Pool(processes=cpu_count, maxtasksperchild=1, 
                                initializer=init, initargs=(databases,))
    results = pool.map(run_test, features, chunksize=1)

    while database = databases.get_nowait():
        drop_database(database)

def init(dbs):
    global databases
    databases = dbs

def run_test(feature):
    database = databases.get(block=True)  # databases will be defined in the global namespace
    os.environ.update({
        'DATABASE_URL': database
    })

    config = behave.configuration.Configuration(("--no-logcapture", "--tags=~@skip", "-f", "plain", feature))
    runner = behave.runner.Runner(config)
    failed = runner.run()

これは、 で起こっていることに実際に対処するものではありませんがManager、問題を回避できます (これManagerが本当の根本原因であると仮定します)。

于 2014-08-26T01:16:04.033 に答える