8

StreamEx lib を使用して、以下のようにカスタム ForkJoinPool でストリームを並列化すると、後続のアクションはそのプールから並列スレッドで実行されることに気付きました。ただし、map() 操作を追加して結果のストリームを並列処理すると、プールからのスレッドが 1 つだけ使用されます。

以下は、この問題を示す最小限の作業例の完全なコード (すべてのインポートを除く) です。executeAsParallelFromList() メソッドと executeAsParallelAfterMap() メソッドの唯一の違いは、 .parallel() の前に .map(...) 呼び出しが追加されていることです。

import one.util.streamex.StreamEx;

public class ParallelExample {

private static final Logger logger = LoggerFactory.getLogger(ParallelExample.class);
private static ForkJoinPool s3ThreadPool = new ForkJoinPool(3);

public static List<String> getTestList(){
    int listSize = 10;
    List<String> testList = new ArrayList<>();
    for (int i=0; i<listSize; i++)
        testList.add("item_" + i);
    return testList;
}

public static void executeAsParallelFromList(){
    logger.info("executeAsParallelFromList():");
    List<String> testList = getTestList();
    StreamEx<String> streamOfItems = StreamEx
            .of(testList)
            .parallel(s3ThreadPool);
    logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
    streamOfItems.forEach(item -> handleItem(item));
}

public static void executeAsParallelAfterMap(){
    logger.info("executeAsParallelAfterMap():");
    List<String> testList = getTestList();
    StreamEx<String> streamOfItems = StreamEx
            .of(testList)
            .map(item -> item+"_mapped")
            .parallel(s3ThreadPool);
    logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
    streamOfItems.forEach(item -> handleItem(item));
}

private static void handleItem(String item){
    // do something with the item - just print for now
    logger.info("I'm handling item: {}", item);
}

}

両方の方法を実行する単体テスト:

public class ParallelExampleTest {

@Test
public void testExecuteAsParallelFromList() {
    ParallelExample.executeAsParallelFromList();
}

@Test
public void testExecuteAsParallelFromStreamEx() {
    ParallelExample.executeAsParallelAfterMap();
}

}

実行結果:

08:49:12.992 [main] INFO  marina.streams.ParallelExample - executeAsParallelFromList():
08:49:13.002 [main] INFO  marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.040 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_6
08:49:13.040 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_2
08:49:13.040 [ForkJoinPool-1-worker-3] INFO  marina.streams.ParallelExample - I'm handling item: item_1
08:49:13.041 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_4
08:49:13.041 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_8
08:49:13.041 [ForkJoinPool-1-worker-3] INFO  marina.streams.ParallelExample - I'm handling item: item_0
08:49:13.041 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_3
08:49:13.041 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_9
08:49:13.041 [ForkJoinPool-1-worker-3] INFO  marina.streams.ParallelExample - I'm handling item: item_5
08:49:13.041 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_7

08:49:13.043 [main] INFO  marina.streams.ParallelExample - executeAsParallelAfterMap():
08:49:13.043 [main] INFO  marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_0_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_1_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_2_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_3_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_4_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_5_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_6_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_7_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_8_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_9_mapped

ご覧のとおり、executeAsParallelFromList() の実行時には 3 つのスレッドすべてが使用されていますが、executeAsParallelAfterMap() の実行時には 1 つのスレッドしか使用されていません。

なんで?

ありがとう!

マリーナ

注: この例は意図的に単純化されています。問題を説明するために、できるだけ最小限にしようとしました。明らかに実際には、 map() 、 handleItem() などでさらに多くのことが行われており、入力データはさらに興味深いものです (AWS S3 バケット/プレフィックスを並行して処理しようとしています)。

4

2 に答える 2

3

問題は、map(...)メソッドを呼び出すとすぐに、StreamEx がその時点でのシーケンシャル/パラレル構成 (シーケンシャル) で基になる Java 8 ストリームを作成し、parallel(...)その後呼び出しても基になる Java 8 ストリームが更新されないように見えることです。

解決策は、何を達成しようとしているかによって異なります。map(...)操作を並行して実行してもよろしい場合は、操作を上に移動parallel(...)して、of(...).

ただし、いくつかの操作を並列操作の前に順次実行する場合は、2 つのストリームを使用することをお勧めします。たとえば、サンプル コードのスタイルに従います。

public static void executeAsParallelAfterMapV2() {
    logger.info("executeAsParallelAfterMapV2():");
    List<String> testList = getTestList();
    StreamEx<String> sequentialStream = StreamEx
            .of(testList)
            .map(item -> {
                logger.info("Mapping {}", item);
                return item + "_mapped";
            });
    logger.info("sequentialStream.isParallel(): {}", sequentialStream.isParallel());

    List<String> afterSequentialProcessing = sequentialStream.toList();
    StreamEx<String> streamOfItems = StreamEx.of(afterSequentialProcessing)
            .parallel(s3ThreadPool);
    logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
    streamOfItems.forEach(item -> handleItem(item));
}

これにより、次のような結果が得られます。

20:43:36.835 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapV2():
20:43:36.883 [main] INFO scott.streams.ParallelExample - sequentialStream.isParallel(): false
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_0
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_1
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_2
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_3
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_4
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_5
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_6
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_7
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_8
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_9
20:43:36.886 [main] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:43:36.889 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:43:36.889 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped

余談...

興味深いことに、(StreamEx を使用せずに) Java 8 ストリームを直接作成し、parallel()操作を の下に配置するmap(...)と、(全体の) ストリームの型が並列になるように更新されます。

public static void executeAsParallelAfterMapJava8Stream() throws InterruptedException {
    logger.info("executeAsParallelAfterMapJava8Stream():");
    List<String> testList = getTestList();

    s3ThreadPool.submit(() -> {
        Stream<String> streamOfItems = testList.stream()
                .map(item -> {
                    logger.info("Mapping {}", item);
                    return item + "_mapped";
                })
                .parallel();
        logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
        streamOfItems.forEach(item -> handleItem(item));
    }).join();
}

同様の単体テストを作成すると、次のような結果が得られます。

20:36:23.469 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapJava8Stream():
20:36:23.517 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_6
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_2
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_8
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_5
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_4
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_9
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_1
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_3
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_7
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:36:23.521 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:36:23.521 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_0
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped
于 2016-10-21T19:45:14.583 に答える