1

これは、クラスターを介したストリーム処理のためにノード化された処理にタプルを発行するための入力スパウトのコード スニペットです。問題は、 BlockingQueue が InterruptedException をスローしていることです。

private SpoutOutputCollector collector;
public BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();

public boolean isDistributed() {
    return true;
    }


public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context,
final SpoutOutputCollector collector) {

    this.collector=collector;

}

@Override
public void nextTuple() {


    try {
        //Utils.sleep(100);
        collector.emit(new Values("Single Temperature Reading", blockingQueue.take()));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }



}

public void readInputfile() throws IOException, InterruptedException{
    FileInputStream file = new FileInputStream("/home/529076/Desktop/Temperature");
    DataInputStream readDate=new DataInputStream(file);
    BufferedReader readText=new BufferedReader(new InputStreamReader(readDate));

    String line;
    String singleReading = null;
    while((line=readText.readLine())!=null){
         singleReading=line;
         blockingQueue.add(singleReading);

    }

}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("Single Temperature Reading"));
}

例外の説明は次のとおりです:---

java.lang.InterruptedException10930 [Thread-20] INFO backtype.storm.util - 非同期ループが中断されました!

at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1996)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at com.tcs.storm.test.InputStreamSpout.nextTuple(InputStreamSpout.java:65)
at backtype.storm.daemon.task$fn__3349$fn__3404.invoke(task.clj:413)

そして nextTuple(InputStreamSpout.java:65 は ------>

        collector.emit(new Values("Single Temperature Reading", blockingQueue.take()));

ありがとう

4

2 に答える 2

1

このエラーは、BlockingQueue が出力コレクターで初期化されていないことが原因です。

于 2012-07-30T11:30:40.090 に答える
-1

Storm では、prepare()/open() メソッドで Bolts/Spouts のフィールドを初期化することをお勧めします。

これは、トポロジのアップロード元のノードでボルト/スパウトが作成されるためです。BlockingQueue はそこに構築されます。その後、ボルト/スパウトはシリアライズされてワーカーノードに配布され、そこでデシリアライズされます。このシリアル化/逆シリアル化のプロセスでは、Bolt/Spout の構築時に設定されたフィールドのすべてのプロパティを保持できない可能性があります。prepare() または open() で初期化されたフィールドには、この問題はありません。

于 2013-01-28T18:47:05.937 に答える