1

さまざまなトピックで Kafka をリッスンする単純なアプリケーションを Scala で作成しています。書き込みに関するイベントが発生すると、ElasticSearch にデータを書き留めます。ElasticSearch Java APIのラッパーとしてelastic4sを使用しています。したがって、さまざまなトピックのリッスンは、Futures で実行される並行プロセスとして実装されます。私のアプリのこのコードは次のようになります

Future { container.orangesHandler.runMessagesHandling() }
Future { container.lemonsHandler.runMessagesHandling() }
Future { container.limesHandler.runMessagesHandling() }
Future { container.mandarinsHandler.runMessagesHandling() }

ElasticSearch に書き込むために、 ElasticSearch private val client = ElasticClient.transport(uri)に接続し、データを書き込むElasticClient を含むヘルパー オブジェクトがあります。書き込みを実装するために、このオブジェクトには次のメソッドがあります。

def update(indexName: String, objectType: String, data: String) =
        checkForIndexAndTypeExistence(indexName, objectType).flatMap { response =>
            client.execute {
                index into indexName -> objectType source data
            }
        }

private def checkForIndexAndTypeExistence(indexName: String, objectType: String) =
    client.execute(indexExists(indexName)).flatMap { response =>
        if (!response.isExists)
            client.execute(create index indexName mappings (mapping(objectType) templates(dynamicTemplate)))
        else
            checkForTypeExistence(indexName, objectType)
    }

private def checkForTypeExistence(indexName: String, objectType: String) =
    client.execute(typesExist(objectType) in indexName).flatMap { response =>
        if (!response.isExists())
            client.execute(putMapping(indexName / objectType) templates(dynamicTemplate))
        else Future(true)
    }

問題は、3 つの先物を実行するとすべて正常に動作することです。しかし、4 番目の Future を追加すると機能しません。具体的には、クライアントは機能しません。完了しないPromiseで応答するだけです。1 つの興味深い詳細: クアッドコア プロセッサを搭載したコンピューターでこのアプリを実行すると、4 つの Future で問題なく動作します。

4

0 に答える 0