0

プロデューサーがHDFSでのファイルへの書き込みを終了すると、コンシューマープロセスがプロデューサーによって作成されたデータを読み取ることを保証したいと思います。以下は、アプリケーションで使用される1つのアプローチであり、改善しようとしています。

プロデューサー:

private void produce(String file, int sleepSeconds) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource(new Path(
                "C:\\dev\\software\\hadoop-0.22.0-src\\conf\\core-site.xml"));
        conf.set("fs.defaultFS", "hdfs://XXX:9000");
        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(file);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, false);
        }
        System.out.println("Creating file");
        FSDataOutputStream out = fileSystem.create(path);
        System.out.println("Writing data");
        out.writeUTF("--data--");
        System.out.println("Sleeping");
        Thread.sleep(sleepSeconds * 1000L);
        System.out.println("Writing data");
        out.writeUTF("--data--");
        System.out.println("Flushing");
        out.flush();
        out.close();
        fileSystem.close();
        System.out.println("Releasing lock on file");
    }

消費者:

private void consume(String file) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource(new Path(
                "C:\\dev\\software\\hadoop-0.22.0-src\\conf\\core-site.xml"));
        conf.set("fs.defaultFS", "hdfs://XXX:9000");
        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(file);
        if (fileSystem.exists(path)) {
            System.out.println("File exists");
        } else {
            System.out.println("File doesn't exist");
            return;
        }
        FSDataOutputStream fsOut = null;
        while (fsOut == null) {
            try {
                fsOut = fileSystem.append(path);
            } catch (IOException e) {
                Thread.sleep(1000);
            }
        }
        FSDataInputStream in = fileSystem.open(path);
        OutputStream out = new BufferedOutputStream(System.out);
        byte[] b = new byte[1024];
        int numBytes = 0;
        while ((numBytes = in.read(b)) > 0) {
            out.write(b, 0, numBytes);
        }
        in.close();
        out.close();
        if (fsOut != null)
            fsOut.close();
        fileSystem.close();
        System.out.println("Releasing lock on file");
    }

プロセスの実行方法の要件は次のとおりです。

  1. (スレッドではなく)プロデューサープロセスが開始されます。thread.sleepは、一連のデータベース呼び出しとビジネスロジックをシミュレートします

  2. コンシューマープロセス(スレッドではない)は、プロデューサーがロックを解放するまでブロックする別のマシンで開始されます。コンシューマーが読み取りている間、他のプロセスはデータファイルを変更しないでください

HDFS java APIを使用して、リーダーがデータを見逃さないことを保証すると同時に、このコード/デザインを改善する方法についてのアドバイスはありますか?

4

1 に答える 1

1

1つの解決策は、一時的な接尾辞/プレフィックスを付けてファイルに書き込み、書き込みが完了したらファイルの名前を変更することです。

たとえば、ファイルfile1.txtに出力します。

  • .file1.txtまたはという名前のファイルに書き込むfile1.txt.tmp
  • 完了したらファイルを閉じます
  • .file1.txtの名前を変更するかfile1.txt.tmpfile1.txt
  • その間、消費者はfile1.txt利用可能になるのを待っています
于 2012-05-30T16:27:40.307 に答える