4

I am using Cascading 2 to create Hadoop jobs and am trying to create a flow that starts with a single source. After a couple of functions are applied to the data I need to split the flow so that this data is used to create two separate reports (in two separate sinks).

    //SOURCE
    Scheme sourceScheme = new TextLine( new Fields( "line" ) );
    Tap source = new Hfs( sourceScheme, input );

    //REPORT1 SINK
    Scheme report1SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
    Tap report1Sink = new Hfs( report1SinkScheme, output1, SinkMode.REPLACE );

    //REPORT2 SINK
    Scheme report2SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
    Tap report2Sink = new Hfs( report2SinkScheme, output2, SinkMode.REPLACE );

    //INITIAL FUNCTIONS
    Pipe firstPipe = new Pipe("firstPipe");
    firstPipe = new Each(firstPipe, new Fields("line"), functionA);
    firstPipe = new Each(firstPipe, functionB, Fields.ALL);

    //REPORT1 FUNCTION
    report1Pipe = new Each(firstPipe, Fields.ALL, function1, Fields.RESULTS);

    //REPORT2 FUNCTION
    report2Pipe = new Each(firstPipe, Fields.ALL, function2, Fields.RESULTS);

    //CONNECT FLOW PARTS
    FlowDef flowDef = new FlowDef()
    .setName("report-flow")
    .addSource(firstPipe, source)
    .addSink(report1Pipe, report1Sink)
    .addSink(report2Pipe, report2Sink);

    new HadoopFlowConnector( properties ).connect( flowDef ).complete();

Currently this is giving me the error "java.lang.IllegalArgumentException: cannot add duplicate sink: firstPipe" but even after messing around with it for a while I get a variety of other issues to do with the flow set up.

Is it possible for someone to explain how to construct a flow of this form (one source, two sinks)? Do I need to create a Cascade instead? Or do I need an intermediate sink to hold the data before I split?

Please help!

4

2 に答える 2

5

カスケードのドキュメントに記載されている分割パターンを使用できます。次に例を示します。

public static void main(String[] args) {
    // source and sink
    Scheme sourceScheme = new TextLine(new Fields("line"));
    Tap source = new FileTap(sourceScheme, args[0]);

    Fields sinkFields = new Fields("word", "count");
    Scheme sinkScheme = new TextLine(sinkFields, sinkFields);
    Tap sink_one = new FileTap(sinkScheme, "out-one.txt");
    Tap sink_two = new FileTap(sinkScheme, "out-two.txt");

    // the pipe assembly
    Pipe assembly = new Pipe("wordcount");

    String regex = "\\w+";
    Function function = new RegexGenerator(new Fields("word"), regex);
    assembly = new Each(assembly, new Fields("line"), function);

    Aggregator count = new Count(new Fields("count"));

    // ...split into two pipes
    Pipe countOne = new Pipe("count-one", assembly);
    countOne = new GroupBy(countOne, new Fields("word"));
    countOne = new Every(countOne, count);

    Pipe countTwo = new Pipe("count-two", assembly);
    countTwo = new GroupBy(countTwo, new Fields("word"));
    countTwo = new Every(countTwo, count);

    // create the flow
    final List<Pipe> pipes = new ArrayList<Pipe>(2);
    pipes.add(countOne);
    pipes.add(countTwo);

    final Map<String, Tap> sinks = new HashMap<String, Tap>();
    sinks.put("count-one", sink_one);
    sinks.put("count-two", sink_two);

    FlowConnector flowConnector = new LocalFlowConnector();
    Flow flow = flowConnector.connect(source, sinks, pipes);

    flow.complete();
}
于 2012-09-04T00:58:05.290 に答える
4

分割パターンは、カスケーディング ユーザー ガイド (http://docs.cascading.org/cascading/2.1/userguide/htmlsingle/#N21362) にあり ます

もう 1 つの (より単純な) 例は、「せっかちな人のためのカスケーディング」のパート 5 & 6 に含まれています。

上記のコードに関する 1 つのポイントは、report1Pipeおよびの変数定義が欠落しているように見えることですreport2Pipe分割パターンを使用するには、各ブランチに名前が必要で、名前が異なる必要があります。

パイプ アセンブリの前の方から同じ名前を継承した 2 つの分岐があるため、例外がスローされます。したがって、たとえば、これらのflowDef.addSink(..)呼び出しはフロー プランナーにとってあいまいです。

そのため、「せっかちな」パート 5 では、「D」、「DF」、および「TF」ブランチが操作内でどのように命名されるかを見てください。

Cascading でこの命名を要求するのは少し直感に反するように思えるかもしれませんが、特定のブランチに失敗トラップデバッグなどを追加する場合、大規模で複雑なワークフローでは非常に重要になります。

あるいは、Clojure の Cascalog DSL ははるかに宣言的であるため、これは言語によって直接処理されます。分岐はサブクエリであり、トラップなどはサブクエリのクロージャー内で処理されます。

于 2013-01-03T05:44:40.587 に答える