私は Flink と Cassandra で作業しようとしています。どちらも超並列環境ですが、連携させるのは困難です。
現在、N 個のオブジェクトが読み取られた後にクエリを終了する可能性がある、異なるトークン範囲によって Cassandra から並列読み取りを行う操作を行う必要があります。
バッチ モードの方が適していますが、DataStreams も可能です。LongCounter (下記参照) を試しましたが、期待どおりに動作しませんでした。私は彼らとの全体的な合計を得ることができませんでした. ローカル値のみ。
この操作 CassandraRequester は、並列化が約 64 または 128 の並列コンテキストで実行されるため、非同期モードは必須ではありません。
これは私の試みです
class CassandraRequester<T> (val klass: Class<T>, private val context: FlinkCassandraContext):
RichFlatMapFunction<CassandraTokenRange, T>() {
companion object {
private val session = ApplicationContext.session!!
private var preparedStatement: PreparedStatement? = null
private val manager = MappingManager(session)
private var mapper: Mapper<*>? = null
private val log = LoggerFactory.getLogger(CassandraRequesterStateless::class.java)
public const val COUNTER_ROWS_NUMBER = "flink-cassandra-select-count"
}
private lateinit var counter: LongCounter
override fun open(parameters: Configuration?) {
super.open(parameters)
if(preparedStatement == null)
preparedStatement = session.prepare(context.prepareQuery()).setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
if(mapper == null) {
mapper = manager.mapper<T>(klass)
}
counter = runtimeContext.getLongCounter(COUNTER_ROWS_NUMBER)
}
override fun flatMap(tokenRange: CassandraTokenRange, collector: Collector<T>) {
val bs = preparedStatement!!.bind(tokenRange.start, tokenRange.end)
val rs = session.execute(bs)
val resultSelect = mapper!!.map(rs)
val iter = resultSelect.iterator()
while (iter.hasNext()) when {
this.context.maxRowsExtracted == 0L || counter.localValue < context.maxRowsExtracted -> {
counter.add(1)
collector.collect(iter.next() as T)
}
else -> {
collector.close()
return
}
}
}
}
このような場合、クエリを終了することはできますか?