MapReduce を使用して多数のドキュメントを処理しようとしています。アイデアは、マッパーでファイルをドキュメントに分割し、リデューサー フェーズでスタンフォード coreNLP アノテーターを適用することです。
「tokenize、ssplit、pos、lemma、ner」のかなり単純な(標準)パイプラインがあり、レデューサーは、これらのアノテーターをレデューサーによって渡された値に適用し、アノテーションを返す関数を呼び出すだけです(文字列のリストとして) 、ただし、生成される出力はガベージです。
マッパー内からアノテーション関数を呼び出すと、ジョブが期待される出力を返すことを観察しましたが、それは並列処理全体に勝っています。また、レデューサーで取得した値を無視し、ダミー文字列にアノテーターを適用すると、ジョブは期待される出力を返します。
これはおそらく、プロセスにスレッド セーフの問題があることを示していますが、注釈関数が同期され、パイプラインがプライベート ファイナルである場所を特定できません。
誰かがこれを解決する方法についていくつかの指針を提供できますか?
-Angshu
編集:
これは私のレデューサーがどのように見えるかです。これがより明確になることを願っています
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
while (values.hasNext()) {
output.collect(key, new Text(se.getExtracts(values.next().toString()).toString()));
}
}
}
これは抽出を取得するためのコードです。
final StanfordCoreNLP pipeline;
public instantiatePipeline(){
Properties props = new Properties();
props.put("annotators", "tokenize, ssplit, pos, lemma, ner");
}
synchronized List<String> getExtracts(String l){
Annotation document = new Annotation(l);
ArrayList<String> ret = new ArrayList<String>();
pipeline.annotate(document);
List<CoreMap> sentences = document.get(SentencesAnnotation.class);
int sid = 0;
for(CoreMap sentence:sentences){
sid++;
for(CoreLabel token: sentence.get(TokensAnnotation.class)){
String word = token.get(TextAnnotation.class);
String pos = token.get(PartOfSpeechAnnotation.class);
String ner = token.get(NamedEntityTagAnnotation.class);
String lemma = token.get(LemmaAnnotation.class);
Timex timex = token.get(TimeAnnotations.TimexAnnotation.class);
String ex = word+","+pos+","+ner+","+lemma;
if(timex!=null){
ex = ex+","+timex.tid();
}
else{
ex = ex+",";
}
ex = ex+","+sid;
ret.add(ex);
}
}