9

Apache Camel SQL バッチ挿入プロセスを使用しています。

  1. 私のアプリケーションは、約 2000 のチケットを含む Active MQ からチケットを読み取っています。

  2. バッチを 100 として更新しました。

  3. 私が発砲しているクエリは次のとおりです。

    sql.subs.insertCdr= insert into subscription_logs(master_id,request_type,req_desc,msisdn,amount,status,resp_code,resp_desc,channel,transaction_id,se_mode,be_mode,sub_type,sub_timeleft,srv_name,srv_id,start_date,end_date,operator,circle,country,time_offset,retry_count,user_status,previous_state,se_reqrecvtime,se_respsenttime,be_reqsenttime,be_resprecvtime,cp_id,cp_name,sub_srvname,sub_srvid,msg_senderid,msg_text,call_back_url,call_back_resp,client_ip,se_sysIp,language,cp_callbackurlhittime,action,alert,notification_url,notification_resp) values(:#masterId, :#requestType,:#reqDesc,:#msisdnCdr,:#price,:#status,:#responseCode,:#reason,:#channel,:#transactionId,:#seMode,:#beMode,:#subType,:#subTimeLeft,:#serviceName,:#serviceId,:#subStartDate,:#cdrEndDate,:#operator,:#circle,:#country,:#timeOffset,:#retryCount,:#userStatus,:#previousState,:#seReqRecvTime,:#seRespSentTime,:#beReqSentTime,:#beRespRecvTime,:#cpId,:#cpName,:#subServiceName,:#subServiceId,:#shortCode,:#message,:#callBackUrl,:#callBackResp,:#clientIp,:#seSysIp,:#language,:#cpCallbackUrlHitTime,:#action,:#alert,:#notificationUrl,:#notificationResponse)

  4. SQL バッチ ルートは次のように定義されます。

    <pipeline>
       <log message="Going to insert in database"></log>
       <transform>
          <method ref="insertionBean" method="subsBatchInsertion"></method>
       </transform>
       <choice>
           <when>
               <simple>${in.header.subsCount} == ${properties:batch.size}</simple>
               <to uri="sql:{{sql.subs.insertCdr}}?batch=true"></to>
               <log message="Inserted rows ${body}"></log>
           </when>
       </choice>
    </pipeline>
    
  5. 以下は私のJavaコードです:

    public List<Map<String, Object>> subsBatchInsertion(Exchange exchange) {
    if (subsBatchCounter > batchSize) {
        subsPayLoad.clear();
        subsBatchCounter = 1;
    }
    subsPayLoad.add(generateInsert(exchange.getIn().getBody(SubscriptionCdr.class)));
    exchange.getIn().setHeader("subsCount", subsBatchCounter);
    subsBatchCounter++;
    return subsPayLoad;
    }
    
    public Map<String, Object> generateInsert(Cdr cdr) {
    Map<String, Object> insert = new HashMap<String, Object>();
    try {
        insert = BeanUtils.describe(cdr);
    } catch (Exception e) {
        Logger.sysLog(LogValues.error, this.getClass().getName()+" | "+Thread.currentThread().getStackTrace()[1].getMethodName(), coreException.GetStack(e));
    } 
    for (String name : insert.keySet()) {
        Logger.sysLog(LogValues.APP_DEBUG, this.getClass().getName(), name + ":"+ insert.get(name) + "\t");
    }
    return insert;
    }
    

ここでの問題は、ActiveMQ に約 120 のチケットがある場合、SQL バッチが値をデータベースに挿入することを開始する必要があることです。しかし、それにはもっと時間がかかります。ActiveMQ に約 500 のチケットがあると、挿入プロセスが開始されます。挿入プロセスの最適化に役立つ人はいますか? または他のアプローチ?

4

1 に答える 1

6

問題は、ActiceMQ コンシューマー番号にありました。

コンシューマー数を 1 に戻すと、バッチは時間通りに更新されます。

実際、コンシューマー数が 10 のとき、チケットは並行して消費されました。つまり、10 人のコンシューマーで activemq から消費された 100 のチケットの場合、各コンシューマーで約 10 のチケットがあり、より多くの時間が追加されます。コンシューマーのいずれかが 100 チケットを取得すると、バッチが更新されます。

そのため、コンシューマ カウントを 1 に変更すると、すべてのチケットが単一のコンシューマによって処理されるようになり、バッチ更新が正常に実行されます。

于 2016-11-07T07:15:33.310 に答える