2

基本的に、3 つのクラスを作成しました。

public void run() {  
int seqId = 0;  
while(true) {  
    List<KamMessage> list = null;  
    try {  
        list = fullPoll(seqId);  
    } catch (Exception e1) {  
        e1.printStackTrace();  
    }  
    if (!list.isEmpty()) {  
        seqId = list.get(0).getSequence();  
        incomingMessages.addAll(list);  
        System.out.println("waiting 3 seconds");  
        System.out.println("new incoming message");  
    }  
    try {  
        Thread.sleep(3000);  
        System.out.println("new incoming message");  
    } catch (InterruptedException e) {  
        e.printStackTrace();  
    }   
   }  
 }  
 public List<KamMessage> fullPoll(int lastSeq) throws Exception {  
 Statement st = dbConnection.createStatement();  
 ResultSet rs = st.executeQuery("select * from msg_new_to_bde where ACTION =  804 and SEQ >" +  
 lastSeq + "order by SEQ DESC");        
 List<KamMessage> pojoCol = new ArrayList<KamMessage>();  
  while (rs.next()) {  
    KamMessage filedClass = convertRecordsetToPojo(rs);  
    pojoCol.add(filedClass);  
  }  
for (KamMessage pojoClass : pojoCol) {  
    System.out.print(" " + pojoClass.getSequence());  
    System.out.print(" " + pojoClass.getTableName());  
    System.out.print(" " + pojoClass.getAction());  
    System.out.print(" " + pojoClass.getKeyInfo1());  
    System.out.print(" " + pojoClass.getKeyInfo2());  
    System.out.println(" " + pojoClass.getEntryTime());  
   }             
return pojoCol;  
  }   

クラスは次のとおりです。 1.Poller - ポーリングを実行し、db からコントローラーに新しいデータを渡します。

2.コントローラー - このクラスにはスレッドプールがあり、ポーラーを同時に呼び出し、プロセッサーから要求される新しいデータを持っています

3.プロセッサ - このクラスは、新しいデータを探して処理し、コントローラに返す必要があります。

だから今私の問題は、第3フェーズを実装する方法です...

ここに私のコントローラクラスがあります:

public class RunnableController {  

/** Here This Queue initializes the DB and have the collection of incoming message
 *                    
 */  
  private static Collection<KpiMessage> incomingQueue = new ArrayList<KpiMessage>();  
  private Connection dbConncetion;  
  public ExecutorService threadExecutor;  
  private void initializeDb()  
  {  
    //catching exception must be adapted - generic type Exception prohibited  
    DBhandler conn = new DBhandler();  
    try {  
        dbConncetion = conn.initializeDB();  
    } catch (Exception e) {  
        // TODO Auto-generated catch block  
        e.printStackTrace();  
     }  
  }  


private void initialiseThreads()  
{         
    try {  

        threadExecutor = Executors.newFixedThreadPool(10);  
            PollingSynchronizer read = new PollingSynchronizer(incomingQueue, dbConncetion);  
        threadExecutor.submit(read);  

    }catch (Exception e){  
    e.printStackTrace();  
    }  

}  

@SuppressWarnings("unused")  
private void shutDownThreads()  
{         
    try {  
        threadExecutor.shutdown();  
        //DB handling should be moved to separate DB class  
        dbConncetion.close();  

    }catch (Exception e){  
    e.printStackTrace();  
    }  

}  

/** Here This Queue passes the messages and have the collection of outgoing message 
 *  
 */  

//private Collection<KpiMessage> outgingQueue = new ArrayList<KpiMessage>();  
//have to implement something here for future  

     public static void main(String[] args) throws InterruptedException {  
     RunnableController controller = new RunnableController();  

    System.out.println(incomingQueue.size());  

    controller.initializeDb();  
    controller.initialiseThreads();  

    Thread.sleep(3000);  
    System.out.println("Polling");  

  }  

} 
4

1 に答える 1

4

そのためには、単純なArrayListの代わりに、BlockingQueueを使用することをお勧めします。incomingQueue変数のタイプを変更するだけです。次に、別のスレッド(またはスレッドプール)に次のようなことをさせることができます

//pseudocode
while (true) {
   // it polls data from the incomingQueue that shares with the producers
    KpiMessage message = this.incomingQueue.take()

   //Then process the message and produces an output... you can put that output in a different queue as well for other part of the code to pick it up
}

BlockingQueuesの良い例は、ここhttp://www.javamex.com/tutorials/blockingqueue_example.shtmlにあります。

于 2013-01-10T15:48:42.347 に答える