1

BigData DB (私の場合は Cassandra) に列名 col1、col2、col3、val1、val2 の行があります。

SQLアプローチでは、col1、col2またはcol2、col1またはその他の可能な方法でグループ化できます。このようにして、ツリー階層を簡単に形成できます。

しかし、現在、グループ化をサポートしていないデータを保存するために Cassandra を使用しています。したがって、グループ化と集計を行うために Storm を使用したいと考えています。集計とグループ化を行うサンプルコードをいくつか書きましたが、それを達成できるかどうかについて意見を述べることができません。

データはこんな感じ

col1,col2,col3,val1,val2
------------------------
a1,b1,c1,10,20
a1,b1,c2,11,13
a1,b2,c1,9,15
a1,b2,c3,13,88
a2,b1,c1,30,44
a2,b3,c2,22,33
a4,b4,c4,99,66

Excel ピボットのように、階層 root->child1->child2->child3-val1,val2 を構築したい場合、階層が col1->col2->col3 の場合、次のようになります。

a1          {43,136}
    --b1        {21,33}
        --c1    10,20
        --c2    11,13
    --b2        {22,103}
        --c1    9,15
        --c3    13,88
a2          {52,77}
    --b1        {30,44}
        --c1    30,44
    --b3        {22,33}
    --c2    22,33
a4          {99,66}
    --b4        {99,66}
        --c4    99,66

この場合、データは次のようになります

c1          {49,79}
    --a1        {19,35}
        --b1    10,20
        --b2    9,15
    --a2        {30,44}
        --b1    30,44
c2          {11,13}
    --a1        {11,13}
        --b1    11,13
    --a2        {22,33}
        --b3    22,33
c3          {13,88}
    --a1        {13,88}
        --b2    13,88
c4          {99,66}
    --a4        {99,66}
        --b4    99,66

私のトライデント コードのいくつかの行はこのように見えますが、期待どおりに動作していません。

topology.newStream("aggregation", spout)
.groupBy(new Fields("col1","col2","col3","val1","val2"))
.aggregate(new Fields("val1","val2"), new Sum(), new Fields("val1sum","val2sum"))
.each(new Fields("col1","col2","col3","val1sum","val2sum"), new Utils.PrintFilter());

上記の変換を行うために、Trident API サポートの有無にかかわらず、Storm を使用したいと考えています。誰でもそれを達成する方法を教えてもらえますか? プログラムのアイデアは大歓迎です。

4

1 に答える 1

0

groupBy にはディメンション (col1、col2、および col3) のみを含め、メジャー (val1、val2) は含めないでください。また、複数のメジャーを集計する必要がある場合は、chainedAgg() コンストラクトを使用する必要があります。以下は、ユース ケースの変更されたトポロジ コードです。

            topology.newStream("aggregation", spout)
    .groupBy(new Fields("col1","col2"))
    .chainedAgg()
    .aggregate(new Fields("val1"), new Sum(), new Fields("val1sum"))
    .aggregate(new Fields("val2"), new Sum(), new Fields("val2sum"))
    .chainEnd()
    .each(new Fields("col1","col2","val1sum", "val2sum"), new Utils.PrintFilter());

期待どおり、次の出力が生成されます。

PartitionId=0、[a1、b1、21、33]

PartitionId=0、[a1、b2、22、103]

PartitionId=0、[a4、b4、99、66]

PartitionId=0、[a2、b1、30、44]

PartitionId=0、[a2、b3、22、33]

乾杯!

MK

于 2014-02-07T19:01:31.013 に答える