0

私は、特定の種類のファイルが見つかったときに、それらを別のスレッドでダウンロード、エンコード、およびアップロードするシステムを持っています。

while(true) {
    for(SftpClient c : clients) {
        try {
            filenames = c.list("*.wav", "_rdy_");
        } catch (SftpException e) {
            e.printStackTrace();
        }
        if(filenames.size() > 0) {
            //AudioThread run() method handles the download, encode, and upload
            AudioThread at = new AudioThread(filenames);
            at.setNode(c.getNode());
            Thread t = new Thread(at);
            t.start();
        }
    }
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

AudioThreadのrunメソッド

public void run() {
    System.out.println("Running...");
    this.buildAsteriskMapping();
    this.connectToSFTP();
    ac = new AudioConvert();
    this.connectToS3();

    String downloadDir = "_rough/" + getNode() + "/" + Time.getYYYYMMDDDate() + "/";
    String encodeDir = "_completed" + getNode() + "/" + Time.getYYYYMMDDDate() + "/";
    String uploadDir = getNode() + "/" + Time.getYYYYMMDDDate() + "/";

    System.out.println("Downloading...");
    try {
        sftp.get(filenames, downloadDir);
    } catch (SftpException e) {
        //download failed
        System.out.println("DL Failed...");
        e.printStackTrace();
    }

    System.out.println("Encoding...");
    try {
        ac.encodeWavToMP3(filenames, downloadDir, encodeDir);
    } catch (IllegalArgumentException | EncoderException e) {
        System.out.println("En Failed...");
        e.printStackTrace();
    }

    System.out.println("Uploading...");
    try {
        s3.upload(filenames, encodeDir, uploadDir);
    } catch (AmazonClientException e) {
        System.out.println("Up Failed...");
        e.printStackTrace();
    }

}

ダウンロード方法:

public void get(ArrayList<String> src, String dest) throws SftpException {
    for(String file : src) {
        System.out.println(dest + file);
        channel.get(file, dest + file);
    }
}

エンコード方法:

public void encodeWavToMP3(ArrayList<String> filenames, String downloadDir, String encodeDir) throws IllegalArgumentException, EncoderException {
    for(String f : filenames) {
        File wav = new File(downloadDir + f);
        File mp3 = new File(encodeDir + wav.getName().replace(".wav", ".mp3"));
        encoder.encode(wav, mp3, attrs);
    }
}

アップロード方法:

public void upload(ArrayList<String> filenames, String encodeDir, String uploadDir)  throws AmazonClientException, AmazonServiceException {
    for(String f : filenames) {
        s3.putObject(new PutObjectRequest(bucketName, uploadDir, new File(encodeDir + f)));
    }
}

問題は、すべてのスレッドに同じファイル(またはほぼ同じファイル)をダウンロードし続けることです。ダウンロード中のファイルを保持するクライアントごとに変数を追加したいのですが、この変数からリスト/ファイル名を削除する方法がわかりません。解決策は何でしょうか?上司はまた、x個のスレッドのみを実行できるようにしたいと考えています。

4

2 に答える 2

4

実際にダウンロードを行うコードが欠落しているため、問題を確認するのは難しいです:P

ただし、代わりにある種のExecutorServiceを使用します。

基本的に、各ダウンロード要求をサービスに追加し(ダウンロードするファイルへの参照と、ファイルを取得するために必要なその他の関連情報を含む「DownloadTask」にラップされます)、残りはサービスに任せます。

ダウンロードタスクは、適切と思われる既存のファイルを考慮に入れるようにコーディングできます。

要件に応じて、これはシングルスレッドまたはマルチスレッドのサービスになります。また、アップロードクエストを配置することもできます。

詳細については、エグゼキュータトレイルを確認してください

一般的な考え方は、一種の生産者/消費者パターンを使用することです。(少なくとも)ダウンロードするすべてのファイルを検索するスレッドがあり、ファイルごとに、それをエグゼキューターサービスに追加します。ファイルがダウンロードされたら、リクエストをキューに入れて同じサービスにアップロードします。

このようにして、同期とスレッド管理によるすべての混乱を回避します:D

スキャンタスクでも同じアイデアを使用できます。クライアントごとに、個別のサービスへのタスクを実行できます。

于 2012-10-12T21:09:29.033 に答える
1

whileループでAudioThreadをインスタンス化するコードに問題があります。

スレッドを作成してt.start()を実行すると、すべてのダウンロード、エンコード、およびアップロードが非同期で行われることに注意してください。したがって、スレッドを開始した後、作成した最初のスレッドがまだ最初のファイルセットを処理している間に、ループを継続してc.list(...)への別の呼び出しを実行します。呼び出しでファイルパターンを指定し、現在処理されているファイルをマークするコードがないため、後続のc.list()呼び出しでおそらく同じファイルのセットが返されます。

私のおすすめ:

  • 前の投稿で説明したように、Executors.newFixedThreadPool(int nThreads)を使用します。そして、スレッドの数をマシンのプロセッサの数に指定します。whileループの前にこれを実行します。
  • ftp s.list()から取得したファイル名ごとに、Callableクラスを作成し、ExecutorService.invokeAll(Collection <Callable <T >>タスク)を呼び出します。作成するCallableのコードは、AudioThreadコードです。一度に1つのファイルのみを処理するようにAudioThreadコードを変更します(可能な場合)。このようにして、ファイルごとに並行してダウンロード、アップロード、エンコードを行います。
  • どのファイルがすでに処理されたかをマークするコードを追加します。次のc.list()呼び出しで返されないように、処理したファイルの名前を別の名前に変更するコードを追加することをお勧めします。
  • whileループブロックの後にExecutorService.shutdown(...)を呼び出します
于 2012-10-13T04:55:09.357 に答える