割り当ての問題があり、SO コミュニティに、spark データフレーム (spark 3.1+ を使用) にこれを実装するための最良の方法を尋ねたいと思いました。最初に問題を説明してから、実装に移ります。
問題は次のとおりです。最大 N 個のタスクと最大 N 個の個人があります (この問題の場合、N=10)。各個人には、各タスクを実行するためのコストがかかります。最小コストは 0 ドル、最大コストは 10 ドルです。これは、いくつかの注意点があるハンガリーのアルゴリズムの問題のようなものです。
- 10 個未満のタスクおよび/または 10 個未満の個人が存在し、誰かにタスクが割り当てられなくても (またはタスクが個人に割り当てられなくても) よい場合があります。
- [より複雑なエッジケース/私が問題を抱えているもの] - リストには、フラグを持つタスクが 1 つある可能性があります
multiTask=True
(複数存在することはできず、存在しないmultiTask
可能性もあります)。ワーカーのコストが multiTask よりも低い場合、ワーカーx
は自動的に multiTask に割り当てられ、multiTask は最適化中に取得されたと見なされます。- いくつかの例を紹介します。この例では、マルチ タスクに割り当てられる x 値は 1 です。
- 10 人中 1 人のワーカーの multiTask コストが 0.25 の場合、そのワーカーは multiTask に割り当てられ、残りの 9 人のワーカーは他の 9 つのタスクに割り当てられます。
- 10 人のうち 2 人のワーカーのコストが multiTask で 1 未満の場合、その両方が multiTask に割り当てられ、残りの 9 つのタスクのうち 8 つに他の 8 人のワーカーが割り当てられます。1 つのタスクは誰にも割り当てられません。
- 10 個のワーカーすべてのコストが multiTask で 1 未満の場合、それらすべてが multiTask に割り当てられます。これは非常にまれですが、可能です。
- multiTask のコストが 1 未満のワーカーがいない場合、コストを最小限に抑えるために、最適化中に multiTask は 1 人の担当者にのみ割り当てられます。
- いくつかの例を紹介します。この例では、マルチ タスクに割り当てられる x 値は 1 です。
Spark データフレームは次のようになります。注: 簡単にするために、N=3 (3 タスク、3 人) の例を示しています。
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([
Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=220, cost=1.50, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=110, cost=2.90, isMultiTask=True),
Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=190, cost=0.80, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=220, cost=1.80, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=110, cost=0.90, isMultiTask=True),
Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=190, cost=9.99, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=220, cost=1.20, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=110, cost=0.25, isMultiTask=True),
Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=190, cost=4.99, isMultiTask=False)
])
df = spark.createDataFrame(rdd)
日付/場所のグループ化ごとにこの割り当ての問題を解決する必要があるため、日付/場所があることがわかります。これを解決するために、各ワーカーとタスクに ID に基づいて「インデックス」を割り当てdense_rank()
、パンダ UDF を使用し、インデックスに基づいて N x N numpy 配列を設定し、linear_sum_assignment
関数を呼び出すことで解決することを計画していました。ただし、マルチタスクでレイアウトした2番目のエッジケースのために、この計画がうまくいくとは思いません。
worker_order_window = Window.partitionBy("date", "locationId").orderBy("workerId")
task_order_window = Window.partitionBy("date", "locationId").orderBy("taskId")
# get the dense_rank because will use this to assign a worker ID an index for the np array for linear_sum_assignment
# dense_rank - 1 as arrays are 0 indexed
df = df.withColumn("worker_idx", dense_rank().over(worker_order_window) - 1)
df = df.withColumn("task_idx", dense_rank().over(task_order_window) - 1)
def linear_assignment_udf(pandas_df: pd.DataFrame) -> pd.DataFrame:
df_dict = pandas_df.to_dict('records')
# in case there are less than N rows/columns
N = max(pandas_df.shape[0], pandas_df.shape[1])
arr = np.zeros((N,N))
for row in df_dict:
# worker_idx will be the row number, task idx will be the col number
worker_idx = row.get('worker_idx')
task_idx = row.get('task_idx')
arr[worker_idx][task_idx] = row.get('cost')
rids, cids = linear_sum_assignment(n)
return_list = []
# now want to return a dataframe that says which task_idx a worker has
for r, c in zip(rids, cids):
for d in df_dict:
if d.get('worker_idx') == r:
d['task_assignment'] = c
return_list.append(d)
return pd.DataFrame(return_list)
schema = StructType.fromJson(df.schema.jsonValue()).add('task_assignment', 'integer')
df = df.groupBy("date", "locationId").applyInPandas(linear_assignment_udf, schema)
df = df.withColumn("isAssigned", when(col("task_assignment") == col("task_idx"), True).otherwise(False))
ご覧のとおり、このケースは multiTask をまったくカバーしていません。これを可能な限り最も効率的な方法で解決したいので、pandas udf や scipy に縛られません。