1

Java フォーク結合プールで実行する必要がある文字列パーサー (大きなテキスト BLOB を解析する) があります。プールは他のスレッドよりも高速で、正規表現と xpath の両方を使用すると、解析時間が 30 分以上短縮されました。ただし、作成されるスレッドの数が劇的に増加し、スレッド プールが複数回呼び出されるため、スレッドを終了できるようにする必要があります。4 コア システムでプールを 1 コアだけに制限せずにスレッドの増加を減らすにはどうすればよいですか?

スレッド数が 40000 を超えており、プログラムが 10 回実行されており、ユーザーの実行スレッド数が 50000 スレッドに制限されているため、5000 に近づける必要があります。

この問題は、Windows と Linux の両方で発生しています。

私は:

  • 最大プロセッサを利用可能なプロセッサの数*構成可能な数に設定します。現在は 1 です
  • get() が呼び出された後のタスクのキャンセル
  • 私は必死なので、再インスタンス化する前にforkjoinプールを必死にnullに設定します

任意のヘルプをいただければ幸いです。ありがとう。

プールを停止、取得、再起動するために使用しているコードは次のとおりです。fjp.submit(TASK) を使用して各タスクをサブミットし、シャットダウン時にすべてのタスクを呼び出していることにも注意してください。

while(pages.size()>0) { log.info("現在アクティブなスレッド: "+Thread.activeCount()); log.info("反復で見つかったページ "+j+": "+pages.size());

        if(fjp.isShutdown())
        {
            fjp=new ForkJoinPool(Runtime.getRuntime().availableProcessors()*procnum);
        }

        i=0;
        //if asked to generate a hash, due this first
        if(getHash==true){
            log.info("Generating Hash");
            int s=pages.size();
            while(i<s){
                String withhash=null;
                String str=pages.get(0);

                if(str != null){
                    jmap=Json.read(str).asJsonMap();
                    jmap.put("offenderhash",Json.read(genHash(jmap.get("offenderhash").asString()+i)));

                    for(String k:jmap.keySet()){
                        withhash=(withhash==null)?"{\""+k+"\":\""+jmap.get(k).asString()+"\"":withhash+",\""+k+"\":\""+jmap.get(k).asString()+"\"";
                    }

                    if(withhash != null){
                        withhash+=",}";
                    }

                    pages.remove(0);
                    pages.add((pages.size()-1), withhash);
                    i++;
                }
            }
            i=0;
        }

        if(singlepats != null)
        {

        log.info("Found Singlepats");
        for(String row:pages)
        {   

            String str=row;
            str=str.replaceAll("\t|\r|\r\n|\n","");
            jmap=Json.read(str).asJsonMap();

            if(singlepats.containsKey("table"))
            {
                if(fjp.isShutdown())
                {
                    fjp=new ForkJoinPool((Runtime.getRuntime().availableProcessors()*procnum));
                }

                fjp=new ForkJoinPool((Runtime.getRuntime().availableProcessors()*procnum));

                if(jmap.get(column)!=null)
                {

                    if(test){
                        System.out.println("//////////////////////HTML////////////////////////\n"+jmap.get(column).asString()+"\n///////////////////////////////END///////////////////////////\n\n");
                    }

                    if(mustcontain != null)
                    {
                        if(jmap.get(column).asString().contains(mustcontain))
                        {
                            if(cannotcontain != null)
                            {
                                if(jmap.get(column).asString().contains(cannotcontain)==false)
                                results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                            }
                            else
                            {
                                results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                            }
                        }
                    }
                    else if(cannotcontain != null)
                    {
                        if(jmap.get(column).asString().contains(cannotcontain)==false)
                        {
                            results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                        }
                    }
                    else
                    {
                        results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                    }
                }
            }

            i++;

            if(((i%commit_size)==0 & i != 0) | i==pages.size() |pages.size()==1 & singlepats != null)
            {
                log.info("Getting Regex Results");

                log.info("Shutdown");

                try {
                    fjp.awaitTermination(termtime, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }

                fjp.shutdown();
                while(fjp.isTerminated()==false)
                {
                    try{
                        Thread.sleep(5);
                    }catch(InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }


                for(Future<String> r:results)
                {
                    try {
                        add=r.get();
                        if(add.contains("No Data")==false)
                        {
                            parsedrows.add(add);
                        }

                        add=null;
                        if(r.isDone()==false)
                        {
                            r.cancel(true);
                        }

                        if(fjp.getActiveThreadCount()>0 && fjp.getRunningThreadCount()>0)
                        {
                            fjp.shutdownNow();
                        }

                        fjp=new ForkJoinPool(Runtime.getRuntime().availableProcessors()*procnum);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

                results=new ArrayList<ForkJoinTask<String>>();

                if(parsedrows.size()>=commit_size)
                {

                    if(parsedrows.size()>=SPLITSIZE)
                    {
                        sendToDb(parsedrows,true);
                    }
                    else
                    {
                        sendToDb(parsedrows,false);
                    }

                    parsedrows=new ArrayList<String>();
                }


                //hint to the gc in case it actually pays off (think if i were a gambling man)
                System.gc();
                Runtime.getRuntime().gc();
            }


        }
        }
        log.info("REMAINING ROWS TO COMMIT "+parsedrows.size());
        log.info("Rows Left"+parsedrows.size());
        if(parsedrows.size()>0)
        {


            if(parsedrows.size()>=SPLITSIZE)
            {
                sendToDb(parsedrows,true);
            }
            else
            {
                sendToDb(parsedrows,false);
            }


            parsedrows=new ArrayList<String>();
        }

        records+=i;
        i=0;

//Query for more records to parse
4

2 に答える 2

3

結果ごとに新しい ForkJoinPool を作成しているようです。本当にやりたいことは、すべてのタスクが共有する単一の ForkJoinPool を作成することです。追加のプールは、追加の並列処理を利用できるようにはしないので、問題ないはずです。実行する準備が整ったタスクを取得したら、fjp を取得して呼び出します。fjp.execute(ForkJoinTask)またはForkJoinTask.fork()、既にタスクにいる場合。

複数のプールを作成することは、簿記の悪夢のように思えます。共有されているものだけで逃げるようにしてください。

于 2014-04-02T16:52:22.757 に答える