0

タイトルが少し曖昧な場合はご容赦ください。私が達成しようとしていることをもう少しよく説明しようと思います。

私が実装した外部インターフェースの一部であるparsebytesと呼ばれる関数があります。バイトの配列と長さが必要です。この特定のプログラムのすべての解析は単一のスレッドで実行されるため、データをできるだけ早くparsebytesから取得して、より多くのデータをオフラインで取得できるようにしたいと考えています。擬似コードでの私の方法論はこれです:外部で実行されているスレッド(ParserThreadClass)を作成します。parsebytesが呼び出されるたびに、すべてのバイトをループしてbyteQueue.add(bytes [i])を実行することにより、バイトをParserThreadClassのキューに入れます。このコードは、synchronized(byteQueue)に囲まれています。これにより、事実上、parsebytesが解放され、戻ってより多くのデータを取得できるようになります。

それが起こっている間、私のParserThreadClassも実行されています。これはrun()関数のコードです

while (!shutdown) //while the thread is still running
    {
        synchronized (byteQueue) 
        {
            bytes.addAll(byteQueue);  //an arraylist
            byteQueue.clear();
        }

        parseMessage();   //this will take the bytes arraylist and build an xml message.
    }

私はここで過度に非効率的ですか?もしそうなら、誰かが私がこれにどのように取り組むべきかについての考えを私に与えることができますか?

4

1 に答える 1

2

これは私が以前に問題を解決しようとした方法です。基本的に、ここにあるようなプロデューサースレッドがあり、ファイルを読み取り、アイテムをキューに入れます。次に、キューから物事を読み取り、それらを処理するワーカースレッドがあります。コードは以下のとおりですが、基本的にはあなたがしていることと同じように見えます。私が見つけたのは、ディスクの読み取りに比べて、行ごとに実行する必要のある処理が非常に速いため、これによって速度がほとんど向上しないことです。あなたがしなければならない解析がかなり集中的であるか、チャンクがかなり大きい場合、あなたはこの方法でそれをすることをいくらかスピードアップするのを見ることができます。ただし、プロセスがIOバウンドであるため、それがごくわずかである場合は、パフォーマンスの向上をあまり期待しないでください。このような状況では、ディスクアクセスを並列化する必要がありますが、これは1台のマシンでは実際には実行できません。

public static LinkedBlockingQueue<Pair<String, String>> mappings;
public static final Pair<String, String> end =
    new Pair<String, String>("END", "END");
public static AtomicBoolean done;
public static NpToEntityMapping mapping;
public static Set<String> attested_nps;
public static Set<Entity> possible_entities;

public static class ProducerThread implements Runnable {
    private File f;

    public ProducerThread(File f) {
        this.f = f;
    }

    public void run() {
        try {
            BufferedReader reader = new BufferedReader(new FileReader(f));
            String line;
            while ((line = reader.readLine()) != null) {
                String entities = reader.readLine();
                String np = line.trim();
                mappings.put(new Pair<String, String>(np, entities));
            }
            reader.close();
            for (int i=0; i<num_threads; i++) {
                mappings.put(end);
            }
        } catch (InterruptedException e) {
            System.out.println("Producer thread interrupted");
        } catch (IOException e) {
            System.out.println("Producer thread threw IOException");
        }
    }
}

public static class WorkerThread implements Runnable {
    private Dictionary dict;
    private EntityFactory factory;

    public WorkerThread(Dictionary dict, EntityFactory factory) {
        this.dict = dict;
        this.factory = factory;
    }

    public void run() {
        try {
            while (!done.get()) {
                Pair<String, String> np_ent = mappings.take();
                if (np_ent == end) {
                    done.set(false);
                    continue;
                }
                String entities = np_ent.getRight();
                String np = np_ent.getLeft().toLowerCase();
                if (attested_nps == null || attested_nps.contains(np)) {
                    int np_index = dict.getIndex(np);
                    HashSet<Entity> entity_set = new HashSet<Entity>();
                    for (String entity : entities.split(", ")) {
                        Entity e = factory.createEntity(entity.trim());
                        if (possible_entities != null) {
                            possible_entities.add(e);
                        }
                        entity_set.add(e);
                    }
                    mapping.put(np_index, entity_set);
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Worker thread interrupted");
        }
    }
}

編集:

プロデューサースレッドとワーカースレッドを開始するメインスレッドのコードは次のとおりです。

    Thread producer = new Thread(new ProducerThread(f), "Producer");
    producer.start();
    ArrayList<Thread> workers = new ArrayList<Thread>();
    for (int i=0; i<num_threads; i++) {
        workers.add(new Thread(new WorkerThread(dict, factory), "Worker"));
    }
    for (Thread t : workers) {
        t.start();
    }
    try {
        producer.join();
        for (Thread t : workers) {
            t.join();
        }
    } catch (InterruptedException e) {
        System.out.println("Main thread interrupted...");
    }

また、プロデューサースレッドで行われる作業をメインスレッドで行うだけで、メインコード内の別のスレッドを開始して結合する必要がなくなります。ただし、ファイルを確認する前に必ずワーカースレッドを開始し、作業が完了したらワーカースレッドに参加してください。ただし、その方法とここでの方法のパフォーマンスの違いについてはよくわかりません。

于 2012-11-19T14:49:24.140 に答える