私はマイクロサービス監視アプリを開発しています。このアプリは、コンシューマーが新しいレコードを受信するたびに、それに応じて GUI を更新する想定です。具体的には、新しいレコードを受信したときに次の処理を行います:
1) そのレコードが表すリクエストが正当なフローの一部であるかどうか、およびそのフローがすでに GUI に表示されているかどうかを確認します。ここでいう「表示」とは、フロー全体を表す一連の円のことです。たとえば、トランザクション (MS1 がリクエストを受信) を受け取った場合、それは正当なフロー 1 番 (MS1 → MS2 → MS3) に該当するため、GUI は 2 つの灰色の円 (MS1 → MS2 と MS2 → MS3) を含むテーブル列を追加します。その後、MS1 から MS2 へのリクエストに対応するレコードが消費されたら、最初の円を緑色に塗ります。
私の問題は、以下に示す Amazon の KCL コードを、この目的にどう利用すればよいのかがわからないことです。つまり、レコードが消費されたときに JavaFX GUI 側でイベントを発生させ、それに応じて GUI を更新する方法がわかりません。
助けていただければ幸いです。
package com.kinesisdataconsumer;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.DATA_STATUS;
import com.DataBase;
import com.MonitoringLogicImpl;
import com.kinesisdataproducer.Producer;
import com.Transaction;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
/**
 * Kinesis Client Library record-processor factory for the monitoring application.
 *
 * <p>Every consumed record is expected to carry a payload of the form
 * {@code <sequenceNumber>#<transaction JSON>} and a numeric partition key that is
 * treated as a run timestamp. For each record the transaction is stored through
 * {@link #dataBase} and its status is pushed to the user interface through
 * {@link #monitoringLogic}. Sequence numbers of the most recent run are collected so
 * that {@link #logResults()} can report gaps.
 */
public class Consumer implements IRecordProcessorFactory {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    /** Store that each consumed transaction is written to. */
    public DataBase dataBase;
    /** Transactions supplied by the caller; stored as given and not read inside this class. */
    public ArrayList<Transaction> transactionList;
    /** Computes a transaction's status and forwards it to the user interface. */
    public MonitoringLogicImpl monitoringLogic;

    /** Largest partition-key timestamp seen so far; only updated while holding {@link #lock}. */
    private final AtomicLong largestTimestamp = new AtomicLong(0);
    /** Sorted sequence numbers belonging to the run identified by {@link #largestTimestamp}. */
    private final List<Long> sequenceNumbers = new ArrayList<>();
    /** Guards {@link #largestTimestamp} and {@link #sequenceNumbers}. */
    private final Object lock = new Object();

    /**
     * Creates a consumer bound to the given collaborators.
     *
     * @param database store that receives every consumed transaction
     * @param transactions transaction list kept on the instance for callers
     * @param monitoringLogicImplementation status/UI bridge invoked for every transaction
     */
    public Consumer(DataBase database, ArrayList<Transaction> transactions, MonitoringLogicImpl monitoringLogicImplementation){
        dataBase = database;
        transactionList = transactions;
        monitoringLogic = monitoringLogicImplementation;
    }

    /** Processor created by this factory; it shares the enclosing consumer's state. */
    private class RecordProcessor implements IRecordProcessor {
        @Override
        public void initialize(String shardId) {}

        /**
         * Parses each record, hands its transaction to a handler thread, merges the
         * batch's sequence numbers into the shared state and then checkpoints.
         *
         * @param records batch of records delivered to this processor
         * @param checkpointer used to record progress after the batch was dispatched
         */
        @Override
        public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
            long timestamp = 0;
            List<Long> seqNos = new ArrayList<>();
            for (Record r : records) {
                timestamp = Math.max(timestamp, Long.parseLong(r.getPartitionKey()));
                try {
                    byte[] b = new byte[r.getData().remaining()];
                    r.getData().get(b);
                    // Limit the split to two parts so a '#' inside the JSON does not
                    // truncate the transaction payload.
                    String[] parts = new String(b, StandardCharsets.UTF_8).split("#", 2);
                    seqNos.add(Long.parseLong(parts[0]));
                    if (parts.length < 2) {
                        log.error("Record with sequence number {} has no transaction payload after '#'",
                                parts[0]);
                    } else {
                        startTransactionHandler(parts[1]);
                    }
                } catch (Exception e) {
                    // Fail fast: an unparsable sequence number terminates the JVM.
                    log.error("Error parsing record", e);
                    System.exit(1);
                }
            }

            synchronized (lock) {
                if (largestTimestamp.get() < timestamp) {
                    log.info(String.format(
                            "Found new larger timestamp: %d (was %d), clearing state",
                            timestamp, largestTimestamp.get()));
                    largestTimestamp.set(timestamp);
                    sequenceNumbers.clear();
                }

                // Only add to the shared list if our data is from the latest run.
                if (largestTimestamp.get() == timestamp) {
                    sequenceNumbers.addAll(seqNos);
                    Collections.sort(sequenceNumbers);
                }
            }

            // NOTE(review): this checkpoint runs after the handler threads were started,
            // not after they finished, so progress may be recorded before a transaction
            // reaches the database - confirm that this is acceptable.
            try {
                checkpointer.checkpoint();
            } catch (Exception e) {
                log.error("Error while trying to checkpoint during ProcessRecords", e);
            }
        }

        /**
         * Stores one transaction and reports its status to the UI on a new thread.
         *
         * @param json JSON text of the transaction (the part of the payload after the first '#')
         */
        private void startTransactionHandler(final String json) {
            Thread handler = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Transaction transaction = Transaction.convertJsonToTransaction(new JSONObject(json));
                        // Add the transaction to the database.
                        dataBase.addTransactionToDB(transaction);
                        // Update the user interface about the last transaction in the system.
                        // NOTE(review): this runs on a background thread. If updateUI touches
                        // JavaFX nodes directly it presumably has to hand off with
                        // javafx.application.Platform.runLater - confirm in MonitoringLogicImpl.
                        DATA_STATUS transactionStatus = monitoringLogic.getStatus(transaction);
                        monitoringLogic.updateUI(transaction.getUuid(), transaction.getSender(),
                                transaction.getReceiver(), transactionStatus);
                    } catch (Exception e) {
                        // Thread boundary: log every failure instead of letting the thread
                        // die silently, and keep the interrupt flag if we were interrupted.
                        if (e instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        log.error("Error handling transaction record", e);
                    }
                }
            });
            handler.start();
        }

        /**
         * Checkpoints on shutdown unless the lease was lost.
         *
         * @param checkpointer used to record final progress
         * @param reason why the processor is being shut down
         */
        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            log.info("Shutting down, reason: " + reason);
            if (reason == ShutdownReason.ZOMBIE) {
                // The lease is gone; KCL rejects checkpoints from a zombie processor,
                // so attempting one would only produce a spurious error.
                return;
            }
            try {
                checkpointer.checkpoint();
            } catch (Exception e) {
                log.error("Error while trying to checkpoint during Shutdown", e);
            }
        }
    }

    /**
     * Log a message indicating the current state: the number of gaps in the sequence
     * numbers collected for the latest run, plus the lowest and highest number seen.
     * Logs nothing until at least one record with a positive timestamp was processed.
     */
    public void logResults() {
        synchronized (lock) {
            if (largestTimestamp.get() == 0) {
                return;
            }

            if (sequenceNumbers.isEmpty()) {
                log.info("No sequence numbers found for current run.");
                return;
            }

            // The producer assigns sequence numbers starting from 1, so we
            // start counting from one before that, i.e. 0.
            long last = 0;
            long gaps = 0;
            for (long sn : sequenceNumbers) {
                if (sn - last > 1) {
                    gaps++;
                }
                last = sn;
            }

            log.info(String.format(
                    "Found %d gaps in the sequence numbers. Lowest seen so far is %d, highest is %d",
                    gaps, sequenceNumbers.get(0), sequenceNumbers.get(sequenceNumbers.size() - 1)));
        }
    }

    /**
     * Creates a processor that reports into this consumer's shared state.
     *
     * @return a new record processor bound to this consumer
     */
    @Override
    public IRecordProcessor createProcessor() {
        return this.new RecordProcessor();
    }

    /**
     * Builds a KCL worker for {@code Producer.STREAM_NAME} and runs it on the calling
     * thread, with this instance as the processor factory. While the worker runs,
     * {@link #logResults()} is invoked once per second after an initial 10 second delay;
     * the reporting thread is stopped when the worker's {@code run()} returns or throws.
     */
    public void consumeData() {
        KinesisClientLibConfiguration config =
                new KinesisClientLibConfiguration(
                        "KinesisProducerLibSampleConsumer",
                        Producer.STREAM_NAME,
                        new DefaultAWSCredentialsProviderChain(),
                        "KinesisProducerLibSampleConsumer")
                        .withRegionName(Producer.REGION)
                        .withInitialPositionInStream(InitialPositionInStream.LATEST);

        // Report on this instance so that the state the processors fill in is the same
        // state a caller sees when invoking logResults() directly.
        final Consumer consumer = this;

        ScheduledExecutorService reporter = Executors.newScheduledThreadPool(1);
        reporter.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                consumer.logResults();
            }
        }, 10, 1, TimeUnit.SECONDS);

        try {
            new Worker.Builder()
                    .recordProcessorFactory(consumer)
                    .config(config)
                    .build()
                    .run();
        } finally {
            // Do not leave the reporting thread running once the worker is done.
            reporter.shutdownNow();
        }
    }
}