0

Flume-ng は初めてです。テキスト ファイルを他のプログラム (エージェント) に転送できるプログラムを作成する必要があります。エージェント、つまりホスト IP、ポート番号などについて知っておく必要があることはわかっています。次に、ソース、シンク、およびチャネルを定義する必要があります。ログファイルをサーバーに転送したいだけです。私のクライアントコードは次のとおりです。public class MyRpcClientFacade {

public class MyClient{

  private RpcClient client;
  private String hostname;
  private int port;

  public void init(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);

      }

      public void sendDataToFlume(String data) {
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        try {
          client.append(event);
        } catch (EventDeliveryException e) {
          client.close();
          client = null;
          client = RpcClientFactory.getDefaultInstance(hostname, port);
        }
      }

      public void cleanUp() {
        client.close();
      }
}

上記のコードはString、指定されたプロセスにデータのみを送信できます。しかし、私はファイルを送信する必要があります。Source,Channel and Sinkさらに、サーバーに書き込む必要があるかどうか教えてください。もしそうなら、これら3つの設定方法と書き方。私を助けてください。の小さなサンプルを与えるSource,Sink And Channel

4

1 に答える 1

0

実際には、各ノードで Flume クライアントを取得するだけです。次に、その動作に関する情報を提供する構成ファイルを提供します。たとえば、ノードがファイルを読み取り (新しい行をそれぞれ読み取り、それらをイベントとして channel に送信する)、RPC ソケットを介してファイルの内容を送信する場合。構成は次のようになります。

  # sources/sinks/channels list
  <Agent>.sources = <Name Source1>
  <Agent>.sinks = <Name Sink1>
  <Agent>.channels = <Name Channel1> 
  # Channel attribution to a source
  <Agent>.sources.<Name Source1>.channels = <Name Channel1>
  # Channel attribution to sink
  <Agent>.sinks.<Name Sink1>.channels = <Name Channel1>
  # Configuration (sources,channels and sinks)
  # Source properties : <Name Source1>
  <Agent>.sources.<Name Source1>.type = exec
  <Agent>.sources.<Name Source1>.command = tail -F test
  <Agent>.sources.<Name Source1>.channels = <Name Channel1>
  # Channel properties : <Name Channel1>
  <Agent>.channels.<Name Channel1>.type = memory
  <Agent>.channels.<Name Channel1>.capacity = 1000
  <Agent>.channels.<Name Channel1>.transactionCapacity = 1000
  # Sink properties : <Name Sink1>
  <Agent>.sinks.<Nom Sink1>.type = avro
  <Agent>.sinks.<Nom Sink1>.channel = <Nom Channel1>
  <Agent>.sinks.<Nom Sink1>.hostname = <HOST NAME or IP>
  <Agent>.sinks.<Nom Sink1>.port = <PORT NUMBER>

次に、エージェントを設定する必要があります。エージェントは、同じポートで avro ソースを読み取り、イベントを保存したい方法で処理します。役に立てば幸いです;)

于 2013-10-01T09:58:55.943 に答える