0

一方がポーリングしているときに、もう一方が処理後に新しい受信データを更新する必要があるスレッド プールを処理する方法。プログラムの実行は、メイン メソッドとスレッド プールを持つコントローラー クラスで行われます。

メインクラスのコントローラー

   public static void main(String[] args) throws InterruptedException {
    RunnableController controller = new RunnableController();
    Accumulator acque = new Accumulator();
        controller.initializeDb();
        controller.initialiseThreads(acque);
        controller.initialUpdate(acque);    

}

Polling クラスの Run メソッド:

     public void run() {
    int seqId = 0;
    List<KpiMessage> list = null;
    while(true) {
        try{
            list = fullPoll(seqId);
            if (!list.isEmpty()) {
            accumulator.manageIngoing(list);            
            }
        } catch (Exception e){
            e.printStackTrace();                
        }
    }
}

  public List<KpiMessage> fullPoll(int lastSeq) throws Exception {
    Statement st = dbConnection.createStatement();
    System.out.println("Polling");
 ResultSet rs = st.executeQuery("Select * from msg_new_to_bde where ACTION = 804 and SEQ >" + 
   lastSeq + "order by SEQ DESC");  

    return pojoCol;
}

処理の実行方法:

     public void run() {

    try {
        generate(accumulator.outgoingQueue);
        accumulator.manageOutgoing(accumulator.outgoingQueue, dbConnection);
         } catch (Exception e) {
        e.printStackTrace();
    }
   }
  } 

データベースへの更新方法

 public void updateDb(Collection<KpiMessage> updatedQueue, Connection dbConnection) throws  
  SQLException{ 
    for(KpiMessage pojoClass : updatedQueue){
            Statement stmtupd = dbConnection.createStatement();
        System.out.println("Updating");
    String query = "UPDATE msg_new_to_bde SET KEYINFO1= 'Processed', KEYINFO2 = 'Updated'
   WHERE ACTION = 804"; 

           stmtupd.executeUpdate(query);**My Execution stops here**

最後に、これらすべてのキューを維持するためのアキュムレータ クラス:

   public boolean isUsed = false;
    public synchronized void manageIngoing(List<KpiMessage> list){

    if(this.isUsed){                
        try {
            wait(); 
            System.out.println("first wait");
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }
    System.out.println("recived pass after update");
    this.getIncomingQueue().addAll(list);
     //incoming queue copied to outgoing queue
    this.setOutgoingQueue(this.getIncomingQueue());             
    System.out.println("waiting");
    System.out.println("new incoming message");
    this.isUsed = false;
    notifyAll();

}

/**
 * Method which handles synchronization using wait and notify for outgoing messages after   
  polling
 * @param outgoingQueue
 * @param dbConnection 
 */

  public synchronized void manageOutgoing(Collection<KpiMessage> outgoingQueue, Connection 
dbConnection){
    if(!this.isUsed)
    {
        try {
            System.out.println("second wait");
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    this.isUsed = true;
        DBhandler dbhandler = new DBhandler();
    try {
        dbhandler.updateDb(getOutgoingQueue(), dbConnection);
    } catch (SQLException e) {
        e.printStackTrace();
    }
    notifyAll();
}
 }

私の仕事と質問は:

1.コントローラーはスレッドの両方を処理する必要があります ポーラーとプロセッサーとアキュムレーターは受信キューと送信キューを処理し、最終的に処理後に DB を更新するために更新されたキューにフィードされます

2.ここの私のクラスはポーリングを1回だけ行い、更新できず、実行が停止します

3.私のwait()、notifyALL()ハンドルはここで正しいですか。

ここでポーリングと更新を繰り返すにはどうすればよいですか?

4

1 に答える 1

3

5 つの異なる質問があるこの複雑な設定では、すべてに対する完全な答えはない可能性があります。それらを待っている間、 java.util.concurrent が提供するもの、特に読み取りと書き込みのブロックをサポートする並行コレクションについて読む必要があります。wait()andnotify()は、JDK クラスが十分でない場合にのみ使用してください。

于 2013-02-15T10:49:36.290 に答える