10

割り当ての問題があり、SO コミュニティに、spark データフレーム (spark 3.1+ を使用) にこれを実装するための最良の方法を尋ねたいと思いました。最初に問題を説明してから、実装に移ります。

問題は次のとおりです。最大 N 個のタスクと最大 N 個の個人があります (この問題の場合、N=10)。各個人には、各タスクを実行するためのコストがかかります。最小コストは 0 ドル、最大コストは 10 ドルです。これは、いくつかの注意点があるハンガリーのアルゴリズムの問​​題のようなものです。

  1. 10 個未満のタスクおよび/または 10 個未満の個人が存在し、誰かにタスクが割り当てられなくても (またはタスクが個人に割り当てられなくても) よい場合があります。
  2. [より複雑なエッジケース/私が問題を抱えているもの] - リストには、フラグを持つタスクが 1 つある可能性がありますmultiTask=True(複数存在することはできず、存在しないmultiTask可能性もあります)。ワーカーのコストが multiTask よりも低い場合、ワーカーxは自動的に multiTask に割り当てられ、multiTask は最適化中に取得されたと見なされます。
    • いくつかの例を紹介します。この例では、マルチ タスクに割り当てられる x 値は 1 です。
      • 10 人中 1 人のワーカーの multiTask コストが 0.25 の場合、そのワーカーは multiTask に割り当てられ、残りの 9 人のワーカーは他の 9 つのタスクに割り当てられます。
      • 10 人のうち 2 人のワーカーのコストが multiTask で 1 未満の場合、その両方が multiTask に割り当てられ、残りの 9 つのタスクのうち 8 つに他の 8 人のワーカーが割り当てられます。1 つのタスクは誰にも割り当てられません。
      • 10 個のワーカーすべてのコストが multiTask で 1 未満の場合、それらすべてが multiTask に割り当てられます。これは非常にまれですが、可能です。
      • multiTask のコストが 1 未満のワーカーがいない場合、コストを最小限に抑えるために、最適化中に multiTask は 1 人の担当者にのみ割り当てられます。

Spark データフレームは次のようになります。: 簡単にするために、N=3 (3 タスク、3 人) の例を示しています。

from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=220, cost=1.50, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=110, cost=2.90, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=190, cost=0.80, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=220, cost=1.80, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=110, cost=0.90, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=190, cost=9.99, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=220, cost=1.20, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=110, cost=0.25, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=190, cost=4.99, isMultiTask=False)
])

df = spark.createDataFrame(rdd)

日付/場所のグループ化ごとにこの割り当ての問題を解決する必要があるため、日付/場所があることがわかります。これを解決するために、各ワーカーとタスクに ID に基づいて「インデックス」を割り当てdense_rank()、パンダ UDF を使用し、インデックスに基づいて N x N numpy 配列を設定し、linear_sum_assignment関数を呼び出すことで解決することを計画していました。ただし、マルチタスクでレイアウトした2番目のエッジケースのために、この計画がうまくいくとは思いません。

worker_order_window = Window.partitionBy("date", "locationId").orderBy("workerId")
task_order_window = Window.partitionBy("date", "locationId").orderBy("taskId")

# get the dense_rank because will use this to assign a worker ID an index for the np array for linear_sum_assignment
# dense_rank - 1 as arrays are 0 indexed
df = df.withColumn("worker_idx", dense_rank().over(worker_order_window) - 1) 
df = df.withColumn("task_idx", dense_rank().over(task_order_window) - 1)


def linear_assignment_udf(pandas_df: pd.DataFrame) -> pd.DataFrame:
  df_dict = pandas_df.to_dict('records')
  # in case there are less than N rows/columns
  N = max(pandas_df.shape[0], pandas_df.shape[1])
  arr = np.zeros((N,N))
  for row in df_dict: 
    # worker_idx will be the row number, task idx will be the col number
    worker_idx = row.get('worker_idx')
    task_idx = row.get('task_idx')
    arr[worker_idx][task_idx] = row.get('cost')
  rids, cids = linear_sum_assignment(n)
  
  return_list = []
  # now want to return a dataframe that says which task_idx a worker has 
  for r, c in zip(rids, cids):
    for d in df_dict: 
      if d.get('worker_idx') == r:
        d['task_assignment'] = c
        return_list.append(d)
  return pd.DataFrame(return_list)
      
  
  
schema = StructType.fromJson(df.schema.jsonValue()).add('task_assignment', 'integer')
df = df.groupBy("date", "locationId").applyInPandas(linear_assignment_udf, schema)

df = df.withColumn("isAssigned", when(col("task_assignment") == col("task_idx"), True).otherwise(False))

ご覧のとおり、このケースは multiTask をまったくカバーしていません。これを可能な限り最も効率的な方法で解決したいので、pandas udf や scipy に縛られません。

4

1 に答える 1