2

Javaでの並行プログラミングは初めてです。

非常に急速に成長するログファイルを読み取り、分析し、処理する必要があるため、高速である必要があります。私の考えは、ファイルを(行ごとに)読み取ることでした。関連する行に一致すると、それらの行を別のスレッドに渡し、その行でさらに処理を行うことができます。次のコード例では、これらのスレッドを「IOThread」と呼んでいます。

私の問題は、 IOthread.run() の BufferedReader readline が明らかに返らないことです。スレッド内でストリームを読み取る有効な方法は何ですか? 以下のものよりも良いアプローチはありますか?

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

class IOThread extends Thread {
    private InputStream is;
    private int t;

    public IOThread(InputStream is, int t)  {
        this.is = is;
        this.t = t;
        System.out.println("iothread<" + t + ">.init");
    }

    public void run() {
        try {
            System.out.println("iothread<" + t + ">.run");
            String line;

            BufferedReader streamReader = new BufferedReader(new InputStreamReader(is));
            while ((line = streamReader.readLine()) != null) {
                System.out.println("iothread<" + t + "> got line " + line);
            }
            System.out.println("iothread " + t + " end run");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class Stm {
    public Stm(String filePath) {
        System.out.println("start");

        try {
            BufferedReader reader = new BufferedReader(new FileReader(filePath));

            PipedOutputStream po1 = new PipedOutputStream();
            PipedOutputStream po2 = new PipedOutputStream();
            PipedInputStream pi1 = new PipedInputStream(po1);
            PipedInputStream pi2 = new PipedInputStream(po2);
            IOThread it1 = new IOThread(pi1,1);
            IOThread it2 = new IOThread(pi2,2);

            it1.start();
            it2.start();
//          it1.join();
//          it2.join();

            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println("got line " + line);

                if (line.contains("aaa")) {
                    System.out.println("passing to thread 1: " + line);  
                    po1.write(line.getBytes());
                } else if (line.contains("bbb")) {
                    System.out.println("passing to thread 2: " + line);  
                    po2.write(line.getBytes());
                }
            }
            reader.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new Stm(args[0]);
    }

}

入力ファイルの例は次のとおりです。

line 1
line 2
line 3 aaa ...
line 4
line 5 bbb ...
line 6 aaa ...
line 7
line 8 bbb ...
line 9 bbb ...
line 10

入力ファイルのファイル名を引数として上記のコードを呼び出します。

4

2 に答える 2

4

iothreadのリーダーは、次の理由でwhileループの最初の反復の先頭に留まり続けます。STMスレッドから読み取り行の内容を渡しますが、改行文字を追加しません(\ n) 。バッファリングされたリーダーは(.readLine()のように)改行文字を待機するため、永久に待機します。次のようにコードを変更できます。

   if (line.contains("aaa")) {
                System.out.println("passing to thread 1: " + line);  
                byte[] payload = (line+"\n").getBytes();
                po1.write(payload);
            } else if (line.contains("bbb")) {
                System.out.println("passing to thread 2: " + line);  
                byte[] payload = (line+"\n").getBytes();
                po2.write(payload);
            }

しかし、これはまったく洗練されたソリューションではないことを言わなければなりません。ブロッキングキューなどを使用して、IOThreadsにコンテンツを提供することができます。このようにして、入力を文字列からバイトに変換して文字列に戻すことを回避できます(すべてのストリームを削除することはできません)。

于 2012-10-10T11:48:10.273 に答える
2

私見あなたはそれを逆に持っています。ファイルからデータを読み取るためではなく、「処理」するための複数のスレッドを作成します。ファイルからデータを読み取るときは、とにかくボトルネックになるため、複数のスレッドを使用しても違いはありません。最も簡単な解決策は、特定のスレッドでできるだけ速く行を読み取り、その行を共有キューに格納することです。このキューは、関連する処理を実行するために、任意の数のスレッドからアクセスできます。

このようにして、I/Oまたはリーダースレッドがデータの読み取り/待機でビジーである間に、実際に並行処理を行うことができます。可能であれば、リーダー スレッドの「ロジック」を最小限に抑えます。これらの行を読むだけで、ワーカー スレッドに実際の面倒な作業 (パターンのマッチング、さらなる処理など) を任せることができます。スレッドセーフキューを使用するだけで、コーシャになるはずです。

EDIT:BlockingQueue配列ベースまたはリンクリストベースのいずれかのバリアントを使用してください。

于 2012-10-10T11:37:34.687 に答える