1

pyspark で以下のように (R の cbind のように) 2 つの RDD を組み合わせることができることを私は知っています:

rdd3 = rdd1.zip(rdd2)

pyspark の 2 つの Dstream に対して同じことを実行したいと考えています。それは可能ですか、それとも代替手段ですか?

実際、MLlib ランダムフォレスト モデルを使用して、スパーク ストリーミングを使用して予測しています。最後に、機能 Dstream と予測 Dstream を組み合わせて、さらに下流の処理を行いたいと考えています。

前もって感謝します。

-オベイド

4

1 に答える 1

2

結局、以下を使用しています。

トリックは、「ネイティブ python マップ」と「スパーク スプリーミング トランスフォーム」を使用することです。エレガントな方法ではないかもしれませんが、うまくいきます:)。

def predictScore(texts, modelRF):
    predictions = texts.map( lambda txt :  (txt , getFeatures(txt)) ).\
     map(lambda (txt, features) : (txt ,(features.split(','))) ).\
     map( lambda (txt, features) : (txt, ([float(i) for i in features])) ).\
     transform( lambda  rdd: sc.parallelize(\
       map( lambda x,y:(x,y), modelRF.predict(rdd.map(lambda (x,y):y)).collect(),rdd.map(lambda (x,y):x).collect() )\
       )\
     )
    # in the transform operation: x=text and y=features
    # Return will be tuple of (score,'original text')
    return predictions

同じ問題に直面している誰かを助けることを願っています。誰かがより良いアイデアを持っている場合は、ここに投稿してください。

-オベイド

注:問題をsparkユーザーリストにも提出し、そこにも回答を投稿しました。

于 2016-05-31T05:36:09.693 に答える