これは私が以前に問題を解決しようとした方法です。基本的に、ここにあるようなプロデューサースレッドがあり、ファイルを読み取り、アイテムをキューに入れます。次に、キューから物事を読み取り、それらを処理するワーカースレッドがあります。コードは以下のとおりですが、基本的にはあなたがしていることと同じように見えます。私が見つけたのは、ディスクの読み取りに比べて、行ごとに実行する必要のある処理が非常に速いため、これによって速度がほとんど向上しないことです。あなたがしなければならない解析がかなり集中的であるか、チャンクがかなり大きい場合、あなたはこの方法でそれをすることをいくらかスピードアップするのを見ることができます。ただし、プロセスがIOバウンドであるため、それがごくわずかである場合は、パフォーマンスの向上をあまり期待しないでください。このような状況では、ディスクアクセスを並列化する必要がありますが、これは1台のマシンでは実際には実行できません。
public static LinkedBlockingQueue<Pair<String, String>> mappings;
public static final Pair<String, String> end =
new Pair<String, String>("END", "END");
public static AtomicBoolean done;
public static NpToEntityMapping mapping;
public static Set<String> attested_nps;
public static Set<Entity> possible_entities;
public static class ProducerThread implements Runnable {
private File f;
public ProducerThread(File f) {
this.f = f;
}
public void run() {
try {
BufferedReader reader = new BufferedReader(new FileReader(f));
String line;
while ((line = reader.readLine()) != null) {
String entities = reader.readLine();
String np = line.trim();
mappings.put(new Pair<String, String>(np, entities));
}
reader.close();
for (int i=0; i<num_threads; i++) {
mappings.put(end);
}
} catch (InterruptedException e) {
System.out.println("Producer thread interrupted");
} catch (IOException e) {
System.out.println("Producer thread threw IOException");
}
}
}
public static class WorkerThread implements Runnable {
private Dictionary dict;
private EntityFactory factory;
public WorkerThread(Dictionary dict, EntityFactory factory) {
this.dict = dict;
this.factory = factory;
}
public void run() {
try {
while (!done.get()) {
Pair<String, String> np_ent = mappings.take();
if (np_ent == end) {
done.set(false);
continue;
}
String entities = np_ent.getRight();
String np = np_ent.getLeft().toLowerCase();
if (attested_nps == null || attested_nps.contains(np)) {
int np_index = dict.getIndex(np);
HashSet<Entity> entity_set = new HashSet<Entity>();
for (String entity : entities.split(", ")) {
Entity e = factory.createEntity(entity.trim());
if (possible_entities != null) {
possible_entities.add(e);
}
entity_set.add(e);
}
mapping.put(np_index, entity_set);
}
}
} catch (InterruptedException e) {
System.out.println("Worker thread interrupted");
}
}
}
編集:
プロデューサースレッドとワーカースレッドを開始するメインスレッドのコードは次のとおりです。
Thread producer = new Thread(new ProducerThread(f), "Producer");
producer.start();
ArrayList<Thread> workers = new ArrayList<Thread>();
for (int i=0; i<num_threads; i++) {
workers.add(new Thread(new WorkerThread(dict, factory), "Worker"));
}
for (Thread t : workers) {
t.start();
}
try {
producer.join();
for (Thread t : workers) {
t.join();
}
} catch (InterruptedException e) {
System.out.println("Main thread interrupted...");
}
また、プロデューサースレッドで行われる作業をメインスレッドで行うだけで、メインコード内の別のスレッドを開始して結合する必要がなくなります。ただし、ファイルを確認する前に必ずワーカースレッドを開始し、作業が完了したらワーカースレッドに参加してください。ただし、その方法とここでの方法のパフォーマンスの違いについてはよくわかりません。