4

次のシナリオを想定します。

XmlParser1メソッドを公開するXMLパーサーなど、バイトストリームを消費するライブラリのJavaクラスがありますxmlParser1.parse(inputStream)。このメソッドは通常、1 回の呼び出しですべてのバイトを消費し、最終的にブロックします。XmlParser2また、別のライブラリから、実装が 異なる、似たようなことを行う別のクラスもありますxmlParser2.parse(inputStream)。ここで、両方のパーサーで単一のストリームを解析したいと考えています。

私の最初の答えは次のとおりです。各クラスがストリームをどのように消費するかを制御することはできないため、できることはすべてのバイトをメモリ内または一時ファイルにバッファリングする (または可能であれば開く/再度開く) ことだけです。これらのコンシューマーの API は、本質的に非協調的です。

ここで、XmlParser1(実装と署名) を制御し、呼び出し元が上記の動作を合理的かつ効率的な方法で実装できるように、より柔軟で協調的な方法でコーディングしたいとします... 何を提案しますか?

私が検討しているいくつかの代替案:

1) をXmlParser1実装FilterInputStreamして、一部のクラス ( XmlParser1) がそこからバイトを読み込もうとすると、必要なものを内部的に解析し (反復的に、おそらく合理的なバッファリングを使用して)、生のバイトを返すようにします。FilterInputStream(これはコンセプトと正確には一致しません)。このようにして、クライアント コードはパーサーを単純に連鎖させることができます。

   public class XmlParser1 extends FilterInputStream {
       public XmlParser1(InputStream rawInputStream) { ... } 
       public int read(byte[] b, int off, int l) throws IOException {
           // this would invoke the underlying stream read, parse internall the read bytes,
           // and leave them in the buffer
       }
   }

   XmlParser1 parser1 = new XmlParser1(inputstream);
   XmlParser2 parser2 = new XmlParser2(parse); 
   parser2.parse(); // parser2 consumes all the input stream, which causes parser1 to read an parse it too

XmlParser12) at をバイトの消費者と見なす代わりに、シンクと見なします。バイト自体を食べさせず、スプーンで供給します。つまり、... を渡す代わりに、 をxmlParser1.parse(inputStream)渡し ます。これにより、クライアントは、バイトを透過的にクラスに渡し、同時に呼び出しを行う TeeInputStream を作成できます。xmlParser1.write(byte[])InputStreamOutputStreamXmlParser2XmlParser1.write()

いずれの場合も個別のスレッドは必要ないことに注意してください。

より良い代替案があるかどうかについて、どちらが概念的に好ましいかはわかりません。すでに議論されているはずの設計上の問題のように思えますが、必ずしもJavaに限定されているわけではありません。意見や参考文献を歓迎します。

4

5 に答える 5

1

スレッドが同じサーバー上にある場合、InputStreamsを分割するという考えは意味がありません。データを取得するために引き続き1つのInputStreamと1つのBufferedInputStreamのみを使用するため、InputStreamsからオブジェクトを作成してから、2つの異なる実行中のスレッドでそれらのオブジェクトを使用します。結論:JavaではいつでもInputStreamをブロックする必要はありません。ブロックすると、バッファ(またはパイプ)がオーバーフローするとどうなるので、私はそれを有害だとさえ考えます。キューオーバーフロー!

編集:ストリームを停止したい場合は、送信者にデータを送信しないように指示する必要があります。または、YouTubeのように、ビデオを部分にスライスし(つまり、1分間に1つの部分)、それらのみをプリロードします。一度にパーツを作成するため、ビデオを停止してもプリロードにはまったく影響しません。タイムラインの特定の位置(45秒、1分45秒、2分45秒など)に到達した場合にのみプリロードされるためです。 、これは実際にはプリロード手法であり、実際のストリーミングではありません。そのため、Youtubeはパケットドロップをいじる必要がありません。)

ただし、クライアントであるあなたのために、まだ数行の擬似コードがあります。

BufferedOutputStream bos = new BufferedOutputStream(/*yourBasicInputStream*/);
ObjectOutputStream oos = new ObjectOutputStream(bos);  //Or use another wrapper
oos.writeObject(yourObjectToSend);      //Or use another parser: Look into the API: ObjectInputStream

メインスレッドコントローラー(別名サーバー)内のクラス変数:

Thread thread1;  //e.g. a GUI controller
Thread thread2;  //e.g. a DB controller

サーバー(またはサーバーによって開始された別のサーバースレッド、両方のスレッドをパラメーターとして使用):

BufferedInputStream bis = new BufferedInputStream(/*yourBasicInputStream*/);
ObjectInputStream ois = new ObjectInputStream(bis);   //Or use another wrapper
//now we use an interface MyNetObject implementing the method getTarget(), but
//also an abstract class would be possible (with a more complex getTarget-method):
MyNetObject netObject = (MyNetObject) ois.readObject();   //Or use another parser...
if(netObject.getTarget()=="Thread1ClassANDThread2Class"){
    thread1.activateSync(netObject);        //notify...  
    thread2.activateSync(netObject);        //...both threads!
}
else if(netObject.getTarget()=="Thread1Class"){
    thread1.activate(netObject);        //give thread1 a notification
}
else if(netObject.getTarget()=="Thread2Class"){
    thread2.activate(netObject);        //give thread2 a notification
}
else {//do something else...}

「activateSync(netObject)」メソッドを同期することを忘れないでください。ただし、オブジェクトに変更を加えたい場合に限ります(読み取りを同期する必要はなく、書き込みのみを同期する必要があります)。

public void activateSync(MyNetObject netObject){
    synchronize(netObject){
        //do whatever you wanna do with the object...the other thread will always get the actual object!
    }
}

それは簡単で、速く、一貫性があります...そして完全にオブジェクト指向です。あなたがアイデアを得るといいのですが。;)

アップデート:

ストリームまたはリーダーも実際には「パーサー」であることを理解することが重要です。重要な違いが1つあります。ストリームは(通常)ネットワーク駆動型のクラスであり、charを除くあらゆる種類のデータの書き込みと読み取りに使用されます。リーダーはあらゆる種類のテキスト/文字を読み取るために使用されますが。したがって、適切な実装は次のようになります。いくつかのストリームで着信パケットを読み取り、データを適切なオブジェクトに格納するだけです。次に、あらゆる種類のリーダーで使用できる汎用オブジェクトがあります。読み取る画像だけがある場合はreadUTF()、クラスObjectInputStreamhttp://docs.oracle.com/javase/1.4.2/docs/api/java/io/ObjectInputStream.html)でパーサーを試して、文字列を生成できます。 :

BufferedInputStream bis = new BufferedInputStream(/*yourBasicInputStream*/);
ObjectInputStream ois = new ObjectInputStream(bis);
String string = ois.readUTF();   //Or another usable parser/method
XmlParser1.read(string);      //for reads there is...
XmlParser2.read(string);      //...no synchronisation needed!

さて、残っているのは、その文字列の読み方をパーサーに教えることだけです。オブジェクト文字列自体は「シンク」と見なすことができますが。これがうまくいかない場合は、別のパーサー/メソッドを見つけて「シンク」オブジェクトを作成してください。

ここで説明するソリューション(適切なパーサーでクラスObjectInputStreamを使用する)は、多くの場合、ビッグデータでも機能することに注意してください(ネット経由で送信する前に、1GBのファイルをいくつかの文字列/オブジェクト「パケット」にスライスするだけです。トレントと同じです)。ただし、ビデオ/オーディオストリーミングでは機能しません。パケットドロップが発生し、とにかくまったく異なるソリューションが必要になる可能性があります(これは科学そのものです:http ://www.google.ch/search?q = video + stream +パケット+ドロップ)。

于 2013-02-07T20:35:55.573 に答える
1

2 つのパーサーが 2 つの別々のスレッドで実行されていると仮定すると、次のようになります (動作するコードではありません)。

public class Test extends FilterInputStream {
    byte[] buf = new byte[8192];
    int len;
    Thread thread = null;

    @Override
    public synchronized int read(byte[] b, int off, int l) throws IOException {
        while (thread == Thread.currentThread() && len > 0) {
            thread.wait();
        }
        if (len > 0) {
            System.arraycopy(buf, 0, b, off, l);
            len = 0;
            return l;
        }
        len = super.read(b, off, l);
        System.arraycopy(b, off, buf, 0, len);
        thread = Thread.currentThread();
        notify();
        return len;
    }

つまり、#1 はバイトを読み取って buf に保存し、#2 がバッファからすべてを読み取るまで、#1 による次の試行はブロックされます。

于 2013-01-04T06:03:18.287 に答える
0

「それらの消費者のAPIは本質的に非協力的です」とあなたは言いました。だから、それらを作ろうとしないでください、隔離を保ち、彼らが望むものを彼らに与えてください。個別の入力ストリーム。

スレッドに実際の入力ストリームを読み取り、2つの出力ストリームに書き込みます。

次に、それらの出力ストリームから入力ストリームを作成します。これは、パイプストリームを使用して行うことができます。

pipedInputStream1 = new PipedInputStream(pipedOutputStream1);

ByteArrayInputStream(((ByteArrayOutputStream)byteOutputStream1).toByteArray());

于 2013-02-11T17:12:19.937 に答える
0

XmlParse1 と XmlParser2 の内部で何が起こっているのかは不明ですが、InputStream バイトではなく最終的な XML データを実際に気にしていると仮定すると、StAX XMLEvent API に切り替えます。両方のパーサーにXMLEventConsumerを実装させることができます。次に、実際のストリームを解析してイベントをコンシューマーに渡す外側のループがあります。

public void parseXml(InputStream stream) {
  XMLEventReader reader = ...; // convert stream into XMLEventReader

  XMLConsumer[] consumers = new XMLConsumer[]{new XmlParser1(), new XmlParser2()};

  while(reader.hasNext()) {
    XMLEvent event = reader.nextEvent();
    for(XMLConsumer consumer : consumers) {
      consumer.add(event));
  }
}
于 2013-02-08T19:08:54.933 に答える
0

元の入力ストリームを Apache Commons TeeInputStream にプルして、OutputStream を作成しようとしました。 http://commons.apache.org/io/api-release/org/apache/commons/io/input/TeeInputStream.html

IeeInputStream によって書き込まれる OutputStream として、Java PipedOutputStream を使用しました。

この PipedOutputStream を Java PipedInputStream に接続しました。

これにより、TeeInputStream と PipedInputStream を読み取ることができました。それがうまくいくかどうか、または少なくとも次のステップを提供するかどうかはわかりません.

ReaderThread クラスを作成して、それらを並行して読み取ることができるかどうかを確認しました。

  private static class ReaderThread extends Thread
  {
    InputStream inStream;
    int threadId;
    public ReaderThread(int threadId, InputStream inStream)
    {
      this.inStream = inStream;
      this.threadId = threadId;
    }

    @Override
    public void run()
    {
      try
      {
        int c = inStream.read();
        while (c != -1)
        {
          System.out.println("From ("+threadId+ ") "+c);
          c = inStream.read();
        }
      }
      catch (Exception e)
      {
        e.printStackTrace();
      }
    }
  }

これは、次のコードから実行されました。

InputStream inStream = new FileInputStream(fileName);
PipedInputStream pipedInStream = new PipedInputStream();
OutputStream pipedOutStream = new PipedOutputStream(pipedInStream);
TeeInputStream tin = new TeeInputStream(inStream,
  pipedOutStream);

ReaderThread firstThread = new ReaderThread(1,tin);
ReaderThread secondThread = new ReaderThread(2,pipedInStream);

firstThread.start();
secondThread.start();
于 2013-02-07T17:26:21.923 に答える