5

モデルを使用してバッチで予測を行い、Python で並行して行う必要があります。モデルをロードし、通常の for ループでデータ フレームを作成し、予測関数を使用すると、問題なく動作します。Python でマルチプロセッシングを使用してばらばらなデータ フレームを並列に作成し、予測関数を使用すると、for ループが無期限にフリーズします。動作が発生するのはなぜですか?

ここに私のコードのスニペットがあります:

with open('models/model_test.pkl', 'rb') as fin:
    pkl_bst = pickle.load(fin)

def predict_generator(X):

    df = X

    print(df.head())
    df = (df.groupby(['user_id']).recommender_items.apply(flat_map)
          .reset_index().drop('level_1', axis=1))
    df.columns = ['user_id', 'product_id']

    print('Merge Data')
    user_lookup = pd.read_csv('data/user_lookup.csv')
    product_lookup = pd.read_csv('data/product_lookup.csv')
    product_map = dict(zip(product_lookup.product_id, product_lookup.name))

    print(user_lookup.head())

    df = pd.merge(df, user_lookup, on=['user_id'])
    df = pd.merge(df, product_lookup, on=['product_id'])
    df = df.sort_values(['user_id', 'product_id'])

    users = df.user_id.values
    items = df.product_id.values
    df.drop(['user_id', 'product_id'], axis=1, inplace=True)

    print('Prediction Step')

    prediction = pkl_bst.predict(df, num_iteration=pkl_bst.best_iteration)
    print('Prediction Complete')

    validation = pd.DataFrame(zip(users, items, prediction),
                              columns=['user', 'item', 'prediction'])
    validation['name'] = (validation.item
                          .apply(lambda x: get_mapping(x, product_map)))
    validation = pd.DataFrame(zip(validation.user,
                              zip(validation.name,
                                  validation.prediction)),
                              columns=['user', 'prediction'])
    print(validation.head())

    def get_items(x):

        sorted_list = sorted(list(x), key=lambda i: i[1], reverse=True)[:20]
        sorted_list = random.sample(sorted_list, 10)
        return [k for k, _ in sorted_list]

    relevance = validation.groupby('user').prediction.apply(get_items)
    return relevance.reset_index()

これは機能しますが、非常に遅いです:

results = []
for d in df_list_sub:
    r = predict_generator(d)
    results.append(r)

これは壊れます:

from multiprocessing import Pool
import tqdm
pool = Pool(processes=8)
results = []
for x in tqdm.tqdm(pool.imap_unordered(predict_generator, df_list_sub), total=len(df_list_sub)):
    results.append(x)
    pass
pool.close()
pool.join()

誰かが私を助けてくれたらとてもありがたいです。

4

1 に答える 1