1
Have written a service which reads events from events hub, in local system its working but when deployed as an App service to cloud not able to reads the events.
Below is the stack trace while reading events from Azure eventhub.

> 2020-04-07 09:42:59.021  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.BaseLinkHandler     : closeSession for
> clientName[cbs], linkName[cbs:sender], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.022  INFO 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler     :
> onLinkLocalClose clientName[cbs], linkName[cbs:receiver],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.022 
> INFO 54245 --- [pool-4-thread-4]
> c.m.azure.eventhubs.impl.SessionHandler  : onSessionLocalClose
> connectionId[cbs-session], entityName[MF_7a461b_1586238177759],
> condition[Error{condition=null, description='null', info=null}]
> 2020-04-07 09:42:59.061  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.BaseLinkHandler     : onLinkRemoteClose
> clientName[cbs], linkName[cbs:sender], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.061  INFO 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler     :
> processOnClose clientName[cbs], linkName[cbs:sender],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.061 
> INFO 54245 --- [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler 
> : onLinkRemoteClose clientName[cbs], linkName[cbs:receiver],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.061 
> INFO 54245 --- [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler 
> : processOnClose clientName[cbs], linkName[cbs:receiver],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.061 
> INFO 54245 --- [pool-4-thread-4] c.m.a.e.impl.RequestResponseOpener   
> : requestResponseChannel.onClose complete
> clientId[MF_7a461b_1586238177759], session[cbs-session], link[cbs],
> endpoint[$cbs] 2020-04-07 09:42:59.062  INFO 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.MessagingFactory    :
> messagingFactory[MF_7a461b_1586238177759],
> hostName[products-dev.servicebus.windows.net], info[cbsChannel closed]
> 2020-04-07 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionRemoteClose
> hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.062  WARN 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.MessagingFactory    :
> onConnectionError messagingFactory[MF_7a461b_1586238177759],
> hostname[products-dev.servicebus.windows.net], error[null] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.ConnectionHandler   : onTransportClosed
> hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], error[n/a] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.CustomIOHandler     : onTransportClosed
> name[MF_7a461b_1586238177759],
> hostname[products-dev.servicebus.windows.net:5671] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionUnbound
> hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], state[CLOSED],
> remoteState[CLOSED] 2020-04-07 09:42:59.062  INFO 54245 ---
> [pool-4-thread-4] c.m.azure.eventhubs.impl.SessionHandler  :
> onSessionFinal connectionId[MF_7a461b_1586238177759],
> entityName[cbs-session], condition[null], description[null] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.azure.eventhubs.impl.SessionHandler  : onSessionFinal
> connectionId[MF_7a461b_1586238177759], entityName[products],
> condition[null], description[null] 2020-04-07 09:42:59.062  INFO 54245
> --- [pool-4-thread-4] c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionFinal hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.062  WARN 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.MessagingFactory    :
> messagingFactory[MF_7a461b_1586238177759],
> hostName[products-dev.servicebus.windows.net], message[stopping the
> reactor because thread was interrupted or the reactor has no more
> events to process.]


Code:

イベント プロセッサ クラスを EventProcessorHost のインスタンスに登録すると、イベント処理が開始されます。ホスト インスタンスは、イベント ハブの一部のパーティションでリースを取得し、他のホスト インスタンスから一部を盗む可能性があります。これにより、すべてのホスト インスタンスでパーティションが均等に分散されます。リースされたパーティションごとに、ホスト インスタンスは提供されたイベント プロセッサ クラスのインスタンスを作成し、そのパーティションからイベントを受信して​​イベント プロセッサ インスタンスに渡します。

EventProcessorHost には 2 つのエラー通知システムがあります。レシーバーの障害など、特定のパーティションに関連するエラーの通知は、onError メソッドを介してそのパーティションのイベント プロセッサ インスタンスに配信されます。初期化の失敗など、特定のパーティションに関連付けられていないエラーの通知は、EventProcessorOptions オブジェクトを介して指定された一般的な通知ハンドラーに配信されます。このような通知ハンドラを提供する必要はありませんが、提供しないと、特定のエラーが発生したことを認識できない場合があります。

@RestController public class ReceiveEventsController {

プライベート静的最終ロガー logger = LoggerFactory.getLogger(ReceiveEventsController.class);

@Value("${spring.cloud.azure.eventhub.namespace}")
private String namespaceName;

@Value("${spring.cloud.azure.eventhub.name}")
private String eventHubName;

@Value("${spring.cloud.azure.eventhub.sas.key.name}")
private String sasKeyName;

@Value("${spring.cloud.azure.eventhub.sas.key.value}")
private String sasKey;

@Value("${spring.cloud.stream.bindings.input.group}")
private String consumerGroupName;

@Value("${spring.cloud.azure.eventhub.storage.connection.string}")
private String storageConnectionString;

@Value("${spring.cloud.azure.eventhub.checkpoint-container}")
private String storageContainerName;

@Value("${spring.cloud.azure.eventhub.storage.hostname.prefix}")
private String hostNamePrefix;

@PostMapping("/receive/events")
public String postMessage() throws EventHubException, IOException, InterruptedException, ExecutionException, URISyntaxException {

    URI uri = new URI("sb://products-dev.servicebus.windows.net");
    ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
            .setEndpoint(uri)
            .setEventHubName(eventHubName)
            .setSasKeyName(sasKeyName)
            .setSasKey(sasKey);
    EventProcessorHost host = EventProcessorHost.EventProcessorHostBuilder
            .newBuilder(EventProcessorHost.createHostName(hostNamePrefix), consumerGroupName)
            .useAzureStorageCheckpointLeaseManager(storageConnectionString, storageContainerName, null)
            .useEventHubConnectionString(eventHubConnectionString.toString(), eventHubName)
            .build();

    logger.info("Registering host named " + host.getHostName()+ "Endpoint " + eventHubConnectionString.getEndpoint());
    EventProcessorOptions options = new EventProcessorOptions();
    options.setExceptionNotification(new ErrorNotificationHandler());

    host.registerEventProcessor(EventProcessor.class, options)
    .whenComplete((unused, e) ->
    {
        if (e != null)
        {
            logger.info("Failure while registering: " + e.toString());
            if (e.getCause() != null)
            {
                logger.info("Inner exception: " + e.getCause().toString());
            }
        }
    })
    .thenAccept((unused) ->
    {
        logger.info("Press enter to stop.");
        try 
        {
            System.in.read();
        }
        catch (Exception e)
        {
            logger.info("Keyboard read failed: " + e.toString());
        }
    })
    .thenCompose((unused) ->
    {
        return host.unregisterEventProcessor();
    })
    .exceptionally((e) ->
    {
        logger.info("Failure while unregistering: " + e.toString());
        if (e.getCause() != null)
        {
            logger.info("Inner exception: " + e.getCause().toString());
        }
        return null;
    })
    .get();

    logger.info("End of PRODUCT");
    return "Event Received";
}

}

4

0 に答える 0