2

ここでマルチスレッドを実行しようとしていますが、DbHandler クラスを使用してデータベースを更新する必要があります

プログラムの実行は、メイン メソッドとスレッド プールを持つコントローラー クラスで開始されます。

public class RunnableController {
// Main method
public static void main(String[] args) throws InterruptedException {
    try {
        RunnableController controller = new RunnableController();
        controller.initializeDb();
        controller.initialiseThreads();
        System.out.println("Polling");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

   private void initialUpdate()
{
    DBhandler dbhandler = new DBhandler();
    dbhandler.updateDb(getOutgoingQueue());
}

private void initialiseThreads() {      
    try {
        threadExecutorRead = Executors.newFixedThreadPool(10);
        PollingSynchronizer read = new PollingSynchronizer(incomingQueue, dbConncetion);
        threadExecutorRead.submit(read);
    } catch (Exception e){
        e.printStackTrace();
    }   
 }
}

新しいデータをフェッチし、シミュレートして更新を行う必要がある私のポーラー クラス:

 public class PollingSynchronizer implements Runnable {
     public PollingSynchronizer(Collection<KamMessage> incomingQueue,
     Connection dbConnection) {
     super();
    this.incomingQueue = incomingQueue;
    this.dbConnection = dbConnection;
 }

  private int seqId;

   public int getSeqId() {
    return seqId;
  }

   public void setSeqId(int seqId) {
   this.seqId = seqId;
 }

  // The method which runs Polling action and record the time at which it is done
    public void run() {
     int seqId = 0;

      while (true) {
        List<KamMessage> list = null;

        try {
           list = fullPoll(seqId);

           if (!list.isEmpty()) {
             seqId = list.get(0).getSequence();
             incomingQueue.addAll(list);
             this.outgoingQueue = incomingQueue;
             System.out.println("waiting 3 seconds");
             System.out.println("new incoming message");
             Thread.sleep(3000);//at this wait I should execute run()

             //when I debug my execution stops here and throws " Class not found Exception "
             // its does not enters the message processor class 
             MessageProcessor processor = new MessageProcessor() {
              //the run method which should fetch the message processor class.
              final public void run() {
               RunnableController.setOutgoingQueue(generate(outgoingQueue));
              }
           };
          new Thread(processor).start();
        }
     } catch (Exception e1) {
        e1.printStackTrace();
     }
  }
 }
}

私のメッセージプロセッサクラス:

 public class MessageProcessor implements Runnable {
private Collection<KpiMessage> fetchedMessages;
private Connection dbConnection;
Statement st = null;
ResultSet rs = null;
PreparedStatement pstmt = null;
private Collection<KamMessage> outgoingQueue;

public Collection<KamMessage> MessageProcessor(Collection<KamMessage> outgoingQueue){
    this.outgoingQueue = outgoingQueue;
    this.dbConnection = dbConnection;
    return outgoingQueue;
}
/**
 * Method for updating new values into database in preference for dummy processing of message
 * @param outgoingQueue 
 * @return 
 */
@SuppressWarnings("javadoc")
public Collection<KamMessage> generate(Collection<KamMessage> outgoingQueue)
{
        for (KamMessage pojoClass : outgoingQueue) {
            KamMessage updatedValue = createKamMsg804(pojoClass);
            System.out.print(" " + pojoClass.getSequence());
            System.out.print(" " + pojoClass.getTableName());
            System.out.print(" " + pojoClass.getAction());
            System.out.print(" " + updatedValue.getKeyInfo1());
            System.out.print(" " + updatedValue.getKeyInfo2());
            System.out.println(" " + pojoClass.getEntryTime());
        }
        return outgoingQueue;
}

/**
 * 
 * @param pojoClass 
 * @return msg
 */
public KamMessage createKamMsg804(KamMessage pojoClass)
{
    if(pojoClass.getAction() == 804){
    pojoClass.setKeyInfo1("ENTITYKEY9");
    pojoClass.setKeyInfo2("STATUSKEY9");
    }
    return pojoClass;
}
private KamMessage convertRecordsetToPojo(ResultSet rs) throws SQLException {

    KamMessage msg = new KamMessage();
    int sequence = rs.getInt("SEQ");
    msg.setSequence(sequence);
    String tablename = rs.getString("TABLENAME");
    msg.setTableName(tablename);
    Timestamp entrytime = rs.getTimestamp("ENTRYTIME");
    Date entryTime = new Date(entrytime.getTime());
    msg.setEntryTime(entryTime);
    Timestamp processingtime=rs.getTimestamp("PROCESSINGTIME");
    if(processingtime!=null){
        Date processingTime = new Date(processingtime.getTime());
        msg.setProcessingTime(processingTime);   
    }
    String keyInfo1 = rs.getString("KEYINFO1");
    msg.setKeyInfo1(keyInfo1);
    String keyInfo2 = rs.getString("KEYINFO2");
    msg.setKeyInfo2(keyInfo2);
    return msg;
}


@Override
 public void run() {

    // TODO Auto-generated method stub

 }

  }

これは、データベースで更新を行う必要がある私の DBhandler クラスです。

         public class DBhandler {
  Connection conn = null;
Statement st = null;
ResultSet rs = null;
PreparedStatement pstmt = null;

public DBhandler(){
    super();
}

/**
 * Method to initialize the database connection
 * @return conn
 * @throws Exception 
 * 
 */
public Connection initializeDB() throws Exception {
    System.out.println("JDBC connection");
    DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver());
    conn = DriverManager.getConnection("jdbc:oracle:thin:@VM-SALES-
           MB:1521:SALESDB1","bdeuser", "edb"); // Connection for Database SALES-DB1   
    return conn;
}

 //The method for updating Database

  public void updateDb(Collection<KpiMessage> updatedQueue){
    for(KpiMessage pojoClass : updatedQueue){
       //**How the query should be used so that it gets last sequence vale and Updates into   
            Database**
           String query = "UPDATE msg_new_to_bde Set KEYINFO1= ?, KEYINFO2 = ? WHERE SEQ =  and 
           action = 804";   
      }
}
/**
 * Method for Closing the connection
 * @throws Exception 
 *
 */

     public void closeDB() throws Exception { 
    st.close();
    conn.close();
   }

    }

コントローラークラスでupdatedQueueを呼び出して、このクラス(DbHAndler)で更新クエリを使用してデータベースを更新するだけです。

私のプログラム フロー - 3 つのクラスがあります: 1.Controller 2.PollerSynchro 3.Msgprocessor

POJO 形式に変換され、コレクションに格納されるデータベース レコードがあります。これらの POJO を使用して、私のクラスはマルチプロセッシングと更新を一度に実行しようとします。

コントローラ - スレッド プールを持ち、poll メソッドでポーラー クラスを開始 - 完了

ポーラー - 新しい受信メッセージをポーリングし、受信キューに格納する必要があります - 完了

MsgProcessor - 新しい受信メッセージを探して、送信キューから受信キューに渡す必要があります - これも完了

DbHandler- データベースで更新する必要があります。

問題:

今私の問題は

ポーリング スレッドが 3 秒間スリープしている間に、この更新を実装する必要があります -Done

Poller クラスの 2 番目の void run() メソッドのコードでは、送信キューが渡されず、更新のためにメッセージ プロセッサ クラスに供給されます。私の実行フローは、最初の実行メソッドにループバックするだけで、Class exception-Resolved を取得しています

Dbhanler クラスのデータベースでこれを更新する方法

これらの問題を解決するのを手伝ってください。

4

1 に答える 1

4

例外はこの行から発生しているようです (これは MessageProcessor.java の 38 行目ですか?)

return (KpiMsg804) fetchedMessages;

このfetchedMessages時点で は のようですArrayList

于 2013-01-22T09:56:27.593 に答える