1

csv、tsv などから行をインポートするための firehose があるデフォルトの例とは異なり、データベースからレコードをインポートして druid に挿入できるようにするためのものはありますか? 何かご意見は?

これが私が考えていたことです-

"firehose": {
    "type" : "database",
        "datasource" : {
                 "connectURI" : "jdbc:mysql://localhost:3306/test",
                 "user" : "druid",
                 "password" : "xyz123"
        },
        "query" : "select * from table"
        "frequency" : "P1M"
}

jndi データソースやその他のいくつかを介して接続を取得するように拡張できます。この種の実装に問題はありますか?

4

1 に答える 1

-1
このアイデアはどうですか?これは、jdbc 取り込み用のカスタム ファイアホースです。
この場合、1 回限りのクエリの取り込みのみがサポートされます。
https://github.com/sirpkt/druid/tree/jdbc_firehose/extensions-contrib/jdbc-firehose
これはコード スニペットです。DBI ライブラリを使用して、既存のデータベース サーバーから結果セットを取得しようとします。
  public Firehose connect(final MapInputRowParser parser) throws IOException, ParseException, IllegalArgumentException
  {
    if (columns != null) {
      verifyParserSpec(parser.getParseSpec(), columns);
    }

    final Handle handle = new DBI(
        connectorConfig.getConnectURI(),
        connectorConfig.getUser(),
        connectorConfig.getPassword()
    ).open();

    final String query = makeQuery(columns);

    final ResultIterator<InputRow> rowIterator = handle
        .createQuery(query)
        .map(
            new ResultSetMapper<InputRow>()
            {
              List<String> queryColumns = (columns == null) ? Lists.<String>newArrayList(): columns;

              @Override
              public InputRow map(
                  final int index,
                  final ResultSet r,
                  final StatementContext ctx
              ) throws SQLException
              {
                try {
                  if (queryColumns.size() == 0)
                  {
                    ResultSetMetaData metadata = r.getMetaData();
                    for (int idx = 1; idx <= metadata.getColumnCount(); idx++)
                    {
                      queryColumns.add(metadata.getColumnName(idx));
                    }
                    Preconditions.checkArgument(queryColumns.size() > 0,
                        String.format("No column in table [%s]", table));
                    verifyParserSpec(parser.getParseSpec(), queryColumns);
                  }
                  ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder();
                  for (String column: queryColumns) {
                    builder.put(column, r.getObject(column));
                  }
                  return parser.parse(builder.build());

                } catch(IllegalArgumentException e) {
                  throw new SQLException(e);
                }
              }
            }
        ).iterator();
于 2016-06-10T13:54:13.483 に答える