1

javaを使用して実装された通常のパブリッシャーとサブスクライバーを作成しました。これは、コンテンツを合計サイズ5MBの1MBとして読み取り、1MBごとにサブスクライバーに公開するように機能します。データは正常に公開されています。既存のファイルのコンテンツ。最後に、ファイル内の最後の1MBのデータしか見つかりませんでした。この問題を解決する方法を教えてください。また、発行者と加入者のソースコードを添付しました。

Publisher:

public class MessageDataPublisher {
    static StringBuffer fileContent;
    static RandomAccessFile randomAccessFile ;

    public static void main(String[] args) throws IOException {
        MessageDataPublisher msgObj=new MessageDataPublisher();

        String fileToWrite="test.txt";
        msgObj.towriteDDS(fileToWrite);
    }


    public void towriteDDS(String fileName) throws IOException{

        DDSEntityManager mgr=new DDSEntityManager();
        String partitionName="PARTICIPANT";



        // create Domain Participant
        mgr.createParticipant(partitionName);

        // create Type
        BinaryFileTypeSupport binary=new BinaryFileTypeSupport();
        mgr.registerType(binary);


        // create Topic
        mgr.createTopic("Serials");

        // create Publisher
        mgr.createPublisher();

        // create DataWriter
        mgr.createWriter();

        // Publish Events

        DataWriter dwriter = mgr.getWriter();
        BinaryFileDataWriter binaryWriter=BinaryFileDataWriterHelper.narrow(dwriter);


        int bufferSize=1024*1024;


        File readfile=new File(fileName);
        FileInputStream is = new FileInputStream(readfile);
        byte[] totalbytes = new byte[is.available()];
        is.read(totalbytes);
        byte[] readbyte = new byte[bufferSize];
        BinaryFile binaryInstance;

        int k=0;
        for(int i=0;i<totalbytes.length;i++){
            readbyte[k]=totalbytes[i];
            k++;
            if(k>(bufferSize-1)){
                binaryInstance=new BinaryFile();
                binaryInstance.name="sendpublisher.txt";
                binaryInstance.contents=readbyte;
                int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
                ErrorHandler.checkStatus(status, "MsgDataWriter.write");

                ErrorHandler.checkStatus(status, "MsgDataWriter.write");

                k=0;
                }

        }
        if(k < (bufferSize-1)){
            byte[] remaingbyte = new byte[k];                   
            for(int j=0;j<(k-1);j++){
                remaingbyte[j]=readbyte[j];
            }
            binaryInstance=new BinaryFile();
            binaryInstance.name="sendpublisher.txt";
            binaryInstance.contents=remaingbyte;
            int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
            ErrorHandler.checkStatus(status, "MsgDataWriter.write");

        }       
        is.close();


        try {
            Thread.sleep(4000);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // clean up
        mgr.getPublisher().delete_datawriter(binaryWriter);
        mgr.deletePublisher();
        mgr.deleteTopic();
        mgr.deleteParticipant();

    }




}


Subscriber:


public class MessageDataSubscriber {
    static RandomAccessFile randomAccessFile ;
    public static void main(String[] args) throws IOException {
        DDSEntityManager mgr = new DDSEntityManager();
        String partitionName = "PARTICIPANT";

        // create Domain Participant
        mgr.createParticipant(partitionName);

        // create Type
        BinaryFileTypeSupport msgTS = new BinaryFileTypeSupport();
        mgr.registerType(msgTS);

        // create Topic
        mgr.createTopic("Serials");

        // create Subscriber
        mgr.createSubscriber();

        // create DataReader
        mgr.createReader();

        // Read Events
        DataReader dreader = mgr.getReader();
        BinaryFileDataReader binaryReader=BinaryFileDataReaderHelper.narrow(dreader);
        BinaryFileSeqHolder binaryseq=new BinaryFileSeqHolder();
        SampleInfoSeqHolder infoSeq = new SampleInfoSeqHolder();
        boolean terminate = false;
        int count = 0;

        while (!terminate && count < 1500) {
             // To run undefinitely
            binaryReader.take(binaryseq, infoSeq, 10,
                    ANY_SAMPLE_STATE.value, ANY_VIEW_STATE.value,ANY_INSTANCE_STATE.value);
                for (int i = 0; i < binaryseq.value.length; i++) {
                    toWrtieXML(binaryseq.value[i].contents);
                    terminate = true;
            }

            try
            {
                Thread.sleep(200);
            }
            catch(InterruptedException ie)
            {
            }
            ++count;

        }
            binaryReader.return_loan(binaryseq,infoSeq);

        // clean up

        mgr.getSubscriber().delete_datareader(binaryReader);
        mgr.deleteSubscriber();
        mgr.deleteTopic();
        mgr.deleteParticipant();

    }

    private static void toWrtieXML(byte[] bytes) throws IOException {
        // TODO Auto-generated method stub
        File Writefile=new File("samplesubscriber.txt");
        if(!Writefile.exists()){
            randomAccessFile = new RandomAccessFile(Writefile, "rw");
            randomAccessFile.write(bytes, 0, bytes.length);
            randomAccessFile.close();
            }
            else{
                randomAccessFile = new RandomAccessFile(Writefile, "rw");
                long i=Writefile.length();
                randomAccessFile.seek(i);
                randomAccessFile.write(bytes, 0, bytes.length);
                randomAccessFile.close();
            }


    }
}

前もって感謝します

4

1 に答える 1

1

あなたの問題はいくつかの異なる原因の結果である可能性があるため、あなたの質問に決定的な答えを与えることは困難です。また、問題の原因が特定されたら、おそらくそれを軽減するための複数のオプションがあります。

最初に見る場所は読者側です。コードは、take()各テイクの間に200ミリ秒の休止を伴うループで実行します。DataReaderのQoS設定によっては、アプリケーションが200ミリ秒スリープしているときに、サンプルがDataReaderで上書きされる状況に直面する場合があります。ギガビットイーサネットを介してこれを行う場合、通常のDDS製品は、そのスリープ期間内に1メガバイトの5つのチャンクを実行できます。つまり、デフォルトの1桁のバッファーは、スリープ中に4回上書きされます。

このシナリオは、のデフォルトの履歴QoS設定を使用した場合に発生する可能性があります。BinaryFileDataReaderつまりhistory.kind = KEEP_LAST、とhistory.depth = 1です。後者をより大きな値、たとえば20に増やすと、スリープ中にファイルの20チャンクを保持できるキューが作成されます。今のところそれで十分でしょう。

これで問題が解決しない場合は、他の考えられる原因を調べることができます。

于 2012-07-16T03:31:10.147 に答える