6

私は Scala を初めて使用するということから始めましょう。しかし、アクター ベースの同時実行モデルは興味深いものであり、比較的単純なアプリケーションで試してみました。私が直面している問題は、アプリケーションを動作させることはできますが、結果は (リアルタイム、CPU 時間、およびメモリ使用量の点で) 同等の Java ベースのソリューションよりもはるかに効率が悪いということです。 ArrayBlockingQueue からメッセージをプルするスレッドを使用します。理由が知りたいです。おそらく私の Scala の知識不足が原因であり、すべての非効率性を引き起こしているのではないかと思いますが、アプリケーションを何度も作り直そうと試みた結果、うまくいきませんでした。コミュニティに助けを求めることにしました。

私の問題はこれです:次の形式の多くの行を含むgzipファイルがあります:

SomeID comma_separated_list_of_values

例えば:

1234 12,45,82

各行を解析し、コンマ区切りリスト内の各値の出現回数の全体的なカウントを取得したいと思います。

このファイルはかなり大きい (数 GB 圧縮) かもしれませんが、ファイルごとの一意の値の数はかなり少ないです (最大で 500)。これは、Actor ベースの並行 Scala アプリケーションを作成する絶好の機会だと思いました。私のソリューションには、パーサー アクターのプールを作成するメイン ドライバーが含まれます。次に、メイン ドライバーは stdin から行を読み取り、行を解析して値のローカル カウントを保持するアクタにその行を渡します。メイン ドライバーが最後の行を読み取ると、すべての行が読み取られたことを示すメッセージが各アクターに渡されます。アクターが「完了」メッセージを受け取ると、すべてのアクターからのカウントを合計するアグリゲーターにカウントを渡します。すべてのパーサーからのカウントが集計されると、メイン ドライバーは統計を出力します。

問題: 私が直面している主な問題は、このアプリケーションの信じられないほどの非効率性です。スレッドと ArrayBlockingQueue を使用する「同等の」Java アプリケーションよりもはるかに多くの CPU とメモリを使用します。これを概観するために、1,000 万行のテスト入力ファイルについて収集した統計をいくつか示します。

Scala 1 アクター (パーサー):

    real    9m22.297s
    user    235m31.070s
    sys     21m51.420s

Java 1 スレッド (パーサー):

    real    1m48.275s
    user    1m58.630s
    sys     0m33.540s

Scala 5 アクター:

    real    2m25.267s
    user    63m0.730s
    sys     3m17.950s

Java 5 スレッド:

    real    0m24.961s
    user    1m52.650s
    sys     0m20.920s

さらに、top は、Scala アプリケーションが約 10 倍の常駐メモリ サイズを持っていることを報告しています。つまり、ここでは CPU とメモリを桁違いに増やして、パフォーマンスを桁違いに低下させることについて話しているのですが、何が原因なのかわかりません。それは GC の問題ですか、それとも私が認識しているよりもはるかに多くのオブジェクトのコピーを作成しているのですか?

重要な場合とそうでない場合がある追加の詳細:

  • scala アプリケーションは Java クラスによってラップされているため、自己完結型の実行可能 JAR ファイルを提供できます (このアプリを実行するすべてのマシンに Scala jar があるわけではありません)。
  • アプリケーションは次のように呼び出されます: gunzip -c gzFilename | java -jar StatParser.jar

コードは次のとおりです。

メインドライバー:

import scala.actors.Actor._
import scala.collection.{ immutable, mutable }
import scala.io.Source

class StatCollector (numParsers : Int ) {
    private val parsers = new mutable.ArrayBuffer[StatParser]()
    private val aggregator = new StatAggregator()

    def generateParsers {
        for ( i <- 1 to numParsers ) {
            val parser = new StatParser( i, aggregator )
            parser.start
            parsers += parser
        }
    }


    def readStdin {
        var nextParserIdx = 0
        var lineNo = 1
        for ( line <- Source.stdin.getLines() ) {
            parsers( nextParserIdx ) ! line
            nextParserIdx += 1
            if ( nextParserIdx >= numParsers ) {
                nextParserIdx = 0
            }
            lineNo += 1
        }
    }

    def informParsers {
        for ( parser <- parsers ) {
            parser ! true
        }
    }

    def printCounts {
        val countMap = aggregator.getCounts()
        println( "ID,Count" )
        /*
        for ( key <- countMap.keySet ) {
            println( key + "," + countMap.getOrElse( key, 0 ) )
            //println( "Campaign '" + key + "': " + countMap.getOrElse( key, 0 ) )
        }
        */
        countMap.toList.sorted foreach {
            case (key, value) =>
                println( key + "," + value )
        }
    }

    def processFromStdIn {
        aggregator.start

        generateParsers

        readStdin
        process
    }

    def process {

        informParsers

        var completedParserCount = aggregator.getNumParsersAggregated
        while ( completedParserCount < numParsers ) {
            Thread.sleep( 250 )
            completedParserCount = aggregator.getNumParsersAggregated
        }

        printCounts
    }
}

パーサー アクター:

import scala.actors.Actor
import collection.mutable.HashMap
import scala.util.matching

class StatParser( val id: Int, val aggregator: StatAggregator ) extends Actor {

    private var countMap = new HashMap[String, Int]()
    private val sep1 = "\t"
    private val sep2 = ","


    def getCounts(): HashMap[String, Int] = {
        return countMap
    }

    def act() {
        loop {
            react {
                case line: String =>
                    {
                        val idx = line.indexOf( sep1 )
                        var currentCount = 0
                        if ( idx > 0 ) {
                            val tokens = line.substring( idx + 1 ).split( sep2 )
                            for ( token <- tokens ) {
                                if ( !token.equals( "" ) ) {
                                    currentCount = countMap.getOrElse( token, 0 )
                                    countMap( token ) = ( 1 + currentCount )
                                }
                            }

                        }
                    }
                case doneProcessing: Boolean =>
                    {
                        if ( doneProcessing ) {
                            // Send my stats to Aggregator
                            aggregator ! this
                        }
                    }
            }
        }
    }
}

アグリゲーター アクター:

import scala.actors.Actor
import collection.mutable.HashMap

class StatAggregator extends Actor {
    private var countMap = new HashMap[String, Int]()
    private var parsersAggregated = 0

    def act() {
        loop {
            react {
                case parser: StatParser =>
                    {
                        val cm = parser.getCounts()
                        for ( key <- cm.keySet ) {
                            val currentCount = countMap.getOrElse( key, 0 )
                            val incAmt = cm.getOrElse( key, 0 )
                            countMap( key ) = ( currentCount + incAmt )
                        }
                        parsersAggregated += 1
                    }
            }
        }
    }

    def getNumParsersAggregated: Int = {
        return parsersAggregated
    }

    def getCounts(): HashMap[String, Int] = {
        return countMap
    }
}

ここで何が起こっているのかを理解するために提供できる助けがあれば、大歓迎です。

前もって感謝します!

- - 編集 - -

多くの人から Java コードを求められたので、比較のために作成した単純な Java アプリを次に示します。これが優れた Java コードではないことはわかっていますが、Scala アプリケーションのパフォーマンスを見たときに、Java スレッドベースの実装がベースラインとしてどのように機能するかを簡単に確認するために何かを作成しました。

解析スレッド:

import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class JStatParser extends Thread
{
    private ArrayBlockingQueue<String> queue;
    private Map<String, Integer> countMap;
    private boolean done;

    public JStatParser( ArrayBlockingQueue<String> q )
    {
        super( );
        queue = q;
        countMap = new Hashtable<String, Integer>( );
        done = false;
    }

    public Map<String, Integer> getCountMap( )
    {
        return countMap;
    }

    public void alldone( )
    {
        done = true;
    }

    @Override
    public void run( )
    {
        String line = null;
        while( !done || queue.size( ) > 0 )
        {
            try
            {
                // line = queue.take( );
                line = queue.poll( 100, TimeUnit.MILLISECONDS );
                if( line != null )
                {
                    int idx = line.indexOf( "\t" ) + 1;
                    for( String token : line.substring( idx ).split( "," ) )
                    {
                        if( !token.equals( "" ) )
                        {
                            if( countMap.containsKey( token ) )
                            {
                                Integer currentCount = countMap.get( token );
                                currentCount++;
                                countMap.put( token, currentCount );
                            }
                            else
                            {
                                countMap.put( token, new Integer( 1 ) );
                            }
                        }
                    }
                }
            }
            catch( InterruptedException e )
            {
                // TODO Auto-generated catch block
                System.err.println( "Failed to get something off the queue: "
                        + e.getMessage( ) );
                e.printStackTrace( );
            }
        }
    }
}

運転者:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;

public class JPS
{
    public static void main( String[] args )
    {
        if( args.length <= 0 || args.length > 2 || args[0].equals( "-?" ) )
        {
            System.err.println( "Usage: JPS [filename]" );
            System.exit( -1 );
        }

        int numParsers = Integer.parseInt( args[0] );
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<String>( 1000 );
        List<JStatParser> parsers = new ArrayList<JStatParser>( );

        BufferedReader reader = null;

        try
        {
            if( args.length == 2 )
            {
                reader = new BufferedReader( new FileReader( args[1] ) );
            }
            else
            {
                reader = new BufferedReader( new InputStreamReader( System.in ) );
            }

            for( int i = 0; i < numParsers; i++ )
            {
                JStatParser parser = new JStatParser( q );
                parser.start( );
                parsers.add( parser );
            }

            String line = null;
            while( (line = reader.readLine( )) != null )
            {
                try
                {
                    q.put( line );
                }
                catch( InterruptedException e )
                {
                    // TODO Auto-generated catch block
                    System.err.println( "Failed to add line to q: "
                            + e.getMessage( ) );
                    e.printStackTrace( );
                }
            }

            // At this point, we've put everything on the queue, now we just
            // need to wait for it to be processed.
            while( q.size( ) > 0 )
            {
                try
                {
                    Thread.sleep( 250 );
                }
                catch( InterruptedException e )
                {
                }
            }

            Map<String,Integer> countMap = new Hashtable<String,Integer>( );
            for( JStatParser jsp : parsers )
            {
                jsp.alldone( );
                Map<String,Integer> cm = jsp.getCountMap( );
                for( String key : cm.keySet( ) )
                {
                    if( countMap.containsKey( key ))
                    {
                        Integer currentCount = countMap.get(  key );
                        currentCount += cm.get( key );
                        countMap.put( key, currentCount );
                    }
                    else
                    {
                        countMap.put(  key, cm.get( key ) );
                    }
                }
            }

            System.out.println( "ID,Count" );
            for( String key : new TreeSet<String>(countMap.keySet( ))  )
            {
                System.out.println( key + "," + countMap.get( key ) );
            }

            for( JStatParser parser : parsers )
            {
                try
                {
                    parser.join( 100 );
                }
                catch( InterruptedException e )
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            System.exit(  0  );
        }
        catch( IOException e )
        {
            System.err.println( "Caught exception: " + e.getMessage( ) );
            e.printStackTrace( );
        }
    }
}
4

2 に答える 2

7

これが俳優にとって良いテストケースかどうかはわかりません。まず、役者同士のやり取りがほとんどありません。これは単純な map/reduce であり、並行性ではなく並列性を必要とします。

アクターのオーバーヘッドもかなり大きく、実際にいくつのスレッドが割り当てられているかわかりません。プロセッサの数によっては、Java プログラムよりもスレッド数が少ない場合があります。速度が 5 倍ではなく 4 倍であることを考えると、これは事実のようです。

アクターの記述方法は、アイドル状態のアクター、つまり数百または数千のアクターが存在する状況に最適化されていますが、実際の作業を行っているアクターは常に少数です。while/receiveの代わりにloop/を使用してアクターを記述した場合react、パフォーマンスが向上します。

アクターは、アクター オブジェクトのメソッドを呼び出しているというアクターの原則の 1 つに違反したことを除けば、多くのコンピューターにアプリケーションを簡単に配布できます。アクターでそれを行うべきではありません。実際、Akka はそうすることを妨げています。これを行うよりアクターらしい方法は、アグリゲーターが各アクターにキー セットを要求し、結合を計算してから、キーごとにすべてのアクターにそのキーのカウントを送信するように要求することです。

ただし、アクターのオーバーヘッドが表示されているものかどうかはわかりません。あなたは Java 実装に関する情報を提供しませんでしたが、変更可能なマップを使用していると思います。おそらく単一の同時変更可能なマップを使用することさえあります。これは、Scala で行っていることとは非常に異なる実装です。

また、ファイルがどのように読み取られるか (このような大きなファイルにはバッファリングの問題がある可能性があります)、または Java でどのように解析されるかについての情報もありません。ほとんどの作業はファイルの読み取りと解析であり、トークンはカウントされないため、実装の違いは他の問題を簡単に克服できます。

最後に、常駐メモリ サイズについては、Scala には (JVM がもたらすものに加えて) 9 MB のライブラリがあり、これはご覧のとおりかもしれません。もちろん、Java で単一の同時実行マップを使用しているのに対して、Scala で 6 つの不変マップを使用している場合、メモリ使用パターンに大きな違いが生じることは間違いありません。

于 2012-07-30T17:25:31.080 に答える
-1

Scala のアクターはAkka のアクターに最後の日を譲ります...そしてさらに多くのものが来ています - Viktor は最後を最高のものにするためにさらに努力しています: https://twitter.com/viktorklang/status/229694698397257728

ところで: オープンソースは大きな力です! この日は、すべての JVM ベースのコミュニティの休日です。

http://www.marketwire.com/press-release/azul-systems-announces-new-initiative-support-open-source-community-with-free-zing-jvm-1684899.htm

于 2012-07-30T17:35:01.203 に答える