1

クライアントに多数の電子メールを送信するアプリケーションを修正しようとしています。送信はマルチスレッドの Java アプリ (Producer-Consumer モデル) によって行われ、Producer はデータベースから送信対象メッセージのリストを取得し、Consumer は Python スクリプトを呼び出してメールを送信します。

ある日突然、上司からプログラムが動かなくなったと言われました... 今では、以前のような 1000 件規模ではなく、1 時間あたり 2 ~ 3 件のメッセージしか送信しなくなっています。

元の開発者にはもう頼れないので、自分で修正する必要があります。

データベースをクリアしました。データベースには「過去」に関するデータしか含まれておらず、不要であり、巨大でした... 6GB のデータ、7M 行、非常に低速でした。しかし、問題は残りました。

今は、Java アプリのログを画面に出力させています。該当するのは次の行です。

try
    {
      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" is sleeping for "+consumerSleepTime+" ms ::");
      Thread.sleep(consumerSleepTime);
      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" sleeptime over in try block ::");
    }
    catch (InterruptedException e)
    {
      e.printStackTrace();
      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
      Thread.currentThread().interrupt();
      break main_while_loop;
    }

スリープしたまま「止まって」いるときは、次のように表示されます:

11:24:15:: consumer_thread_1 is sleeping for 100 ms ::

他には何も起こりません。「sleeptime over」のメッセージもスタックトレースも出力されず、スレッドのスリープが永遠に終わらないように見えます... ただし、プロセスを実行し続けると、不定の時間が経過した後にスレッドは処理を再開し、再びスリープに入ります... それが何時間も続きます。

何かアイデアはありますか?

ここにいくつかのファイルがあります。全体の構造を理解するために必要かもしれません...

runjava

#!/bin/sh
java -cp /usr/share/java/mysql-connector-java.jar:. newthread

newthread.java

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.sql.SQLException;
import java.sql.Driver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.io.Closeable;
import java.io.IOException;


class ReentrantLockTest
{
  int maxSize = 100;
  ArrayList boundedBuffer = new ArrayList( maxSize );

  ReentrantLock lock = new ReentrantLock(false);
  Condition telire_var = lock.newCondition(),
            uresre_var = lock.newCondition();

  volatile boolean shutdown = false;
  long producerSleepTime = 5000;
  long consumerSleepTime = 100;

  long mainDriverWaitTime = 1000;

  int totalProduerThreads = 1;

  int totalConsumerThreads = 2;
  long criticalSectionDelay = 0;
  int currentIndex = -1;
  String sqlhost = "localhost";
  String sqluser = "user";
  String sqlpassword = "pass";
  String sqldb = "db";
  String mysqlUrl = "";

  public ReentrantLockTest()
  {
    System.out.println(sqluser);
    java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);

    ArrayList<Thread> threads = new ArrayList<Thread>();
    ThreadGroup tg = new ThreadGroup("ReentrantLockTest Thread Group");
    // get sql
    mysqlUrl = "jdbc:mysql://"+ sqlhost +"/"+ sqldb +"?autoReconnect=true";

    try
    {
      Class.forName("com.mysql.jdbc.Driver").newInstance();
    }
    catch (Exception e)
    {
      System.out.println(e);
      return;
    }

    for (int i = 1; i<=totalProduerThreads; i++)
    {
      threads.add( new Thread( tg , new ReentrantLockTest.Producer() , "producer_thread_"+i) );
    }
    for (int i = 1; i<=totalConsumerThreads; i++)
    {
      threads.add( new Thread( tg , new ReentrantLockTest.Consumer() , "consumer_thread_"+i) );
    }
    for (Thread t: threads)
      t.start();

    try
    {
      while (true)
      {
        for (Thread t: threads)
          t.resume();
          Thread.sleep(mainDriverWaitTime);
      }
    }
    catch (InterruptedException e)
    {
      e.printStackTrace();
    }
    finally
    {
      shutdown = true;
      System.out.println(shortTime.format(new java.util.Date()) + ":: ReentrantLockTest - setting shutdown to false ::");
    }

    System.out.println(":: ReentrantLockTest - signalling interrupt and waiting for "+tg.activeCount()+" threads to die ::");

    for (Thread t: threads)
    {
      System.out.println(shortTime.format(new java.util.Date()) + ":: Interrupting "+t.getName()+" ::");
      try
      {
        t.interrupt();
      }
      catch (Exception e)
      {
        e.printStackTrace();
      }
    }
    for (Thread t: threads)
    {
      try
      {
        StringBuilder sb = new StringBuilder( t.getName() );
        System.out.println(shortTime.format(new java.util.Date()) + ":: Waiting for "+sb+" to die ::");
        t.join();
        System.out.println(shortTime.format(new java.util.Date()) + ":: "+sb+" is dead ::" );
      }
      catch (InterruptedException e)
      {
        e.printStackTrace();
      }
    }
  }

  public class Consumer implements Runnable
  {
    java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);
    private void close(Closeable c)
    {
      if (c != null)
      {
        try
        {
          c.close();
        }
        catch (IOException e)
        {
          // ignored
        }
      }
    }

    public void run()
    {
      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been started ::");
      main_while_loop: while( shutdown == false && !Thread.currentThread().isInterrupted() )
      {
        try
        {
          lock.lock();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" acquired the lock ::");
          Thread.sleep(criticalSectionDelay);

          if (currentIndex == -1)
          {
            uresre_var.signal();
            System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" signalling waiting_on_full_buffer ::");
          }

          while(currentIndex == -1)
          {
            System.out.println(shortTime.format(new java.util.Date()) + ":: buffer is empty - "+Thread.currentThread().getName()+" is going to wait ::");
            telire_var.await();
          }//while condition waiting_on_empty_buffer

          // sendmail
          String mail_id = boundedBuffer.remove(currentIndex).toString();
          currentIndex--;
          Process proc = null;

          System.out.println(shortTime.format(new java.util.Date()) + ":: buffer size: "+ currentIndex);

          lock.unlock();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" released the lock ::");

          try
          {
            System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName() +" starts python ("+ mail_id +")");

        //this randomwaiting on sending is not needed
        /*Random generator = new Random();
            int randomWait = generator.nextInt(1000) + 100;
            Thread.sleep(randomWait);
            System.out.println(" => "+ randomWait);*/

            proc = Runtime.getRuntime().exec("python ./new_mail.py "+ mail_id);
            proc.waitFor();
            System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName() +" ends python");
          }
          catch (Exception e)
          {
            e.printStackTrace();
          }
          finally
          {
            if (proc != null)
            {
              close(proc.getOutputStream());
              close(proc.getInputStream());
              close(proc.getErrorStream());
              proc.destroy();
              System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName() +" close python proc");
            }
          }
        }
        catch (InterruptedException e)
        {
          e.printStackTrace();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
          Thread.currentThread().interrupt();
          break main_while_loop;
        }
        try
        {
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" is sleeping for "+consumerSleepTime+" ms ::");
          Thread.sleep(consumerSleepTime);
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" sleeptime over in try block ::");
        }
        catch (InterruptedException e)
        {
          e.printStackTrace();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
          Thread.currentThread().interrupt();
          break main_while_loop;
        }
        System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" sleeptime over ::");
      }//end while: main_while_loop

      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been shutdown ::");
    }//end run
  }

  public class Producer implements Runnable
  {
    java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);
    public void run()
    {
      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been started ::");
      main_while_loop: while( shutdown == false && !Thread.currentThread().isInterrupted() )
      {
        try
        {
          lock.lock();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" acquired the lock ::");

          while (currentIndex != -1)
          {
            System.out.println(shortTime.format(new java.util.Date()) + ":: a bufferben van valamennyi - "+Thread.currentThread().getName()+" varakozik ::");
            uresre_var.await();
          }//while condition waiting_on_full_buffer

          Thread.sleep(criticalSectionDelay);

          Connection con = null;
          Statement st = null;
          ResultSet rs = null;
          try
          {
            con = DriverManager.getConnection(mysqlUrl, sqluser, sqlpassword);
            st = con.createStatement();
            rs = st.executeQuery("SELECT mail_id FROM mail_queue WHERE status = 0 OR (status BETWEEN 2 AND 100 AND proof_counter <= 3 AND UNIX_TIMESTAMP() - proof_stamp > 15*60 ) ORDER BY status, mail_id DESC LIMIT "+ maxSize);

            boundedBuffer = new ArrayList(maxSize);
            currentIndex = -1;

            while(rs.next())
            {
              String mail_id = Integer.toString(rs.getInt(1));
              boundedBuffer.add(mail_id);
              currentIndex++;
            }
            con.close();
          }
          catch (Exception err)
          {
            System.out.println(err);
            break;
          }

          telire_var.signal();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" signalling waiting_on_empty_buffer ::");
        }
        catch (InterruptedException e)
        {
          e.printStackTrace();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
          Thread.currentThread().interrupt();
          break main_while_loop;
        }
        finally
        {
          lock.unlock();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" released the lock ::");
        }

        try
        {
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" is sleeping for "+producerSleepTime+" ms ::");
          Thread.sleep(producerSleepTime);
        }
        catch (InterruptedException e)
        {
          e.printStackTrace();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
          Thread.currentThread().interrupt();
          break main_while_loop;
        }
      }

      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been shutdown ::");
    }
  }
}

class newthread
{
  public static void main(String[] args)
  {
    new ReentrantLockTest();
  }
}

どんな助けでも大歓迎です!!

編集:

スレッド ダンプは次のとおりです。

2012-07-05 12:15:06
Full thread dump OpenJDK 64-Bit Server VM (14.0-b16 mixed mode):

"SIGHUP handler" daemon prio=10 tid=0x00007f7bb0001000 nid=0x838 runnable [0x00007f7beb205000]
   java.lang.Thread.State: RUNNABLE
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:636)

"consumer_thread_2" prio=10 tid=0x00007f7d940f4800 nid=0x68c runnable [0x00007f7beb51b000]
   java.lang.Thread.State: RUNNABLE
        at java.io.PrintStream.write(PrintStream.java:446)
        - locked <0x00007f7d137703b0> (a java.io.PrintStream)
        at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:220)
        at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:290)
        at sun.nio.cs.StreamEncoder.flushBuffer(StreamEncoder.java:103)
        - locked <0x00007f7d137704a8> (a java.io.OutputStreamWriter)
        at java.io.OutputStreamWriter.flushBuffer(OutputStreamWriter.java:185)
        at java.io.PrintStream.write(PrintStream.java:494)
        - locked <0x00007f7d137703b0> (a java.io.PrintStream)
        at java.io.PrintStream.print(PrintStream.java:636)
        at java.io.PrintStream.println(PrintStream.java:773)
        - locked <0x00007f7d137703b0> (a java.io.PrintStream)
        at ReentrantLockTest$Consumer.run(newthread.java:151)
        at java.lang.Thread.run(Thread.java:636)

"consumer_thread_1" prio=10 tid=0x00007f7d940f4000 nid=0x68b waiting on condition [0x00007f7beb61c000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f7d137acee0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:871)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
        at ReentrantLockTest$Consumer.run(newthread.java:150)
        at java.lang.Thread.run(Thread.java:636)

"producer_thread_1" prio=10 tid=0x00007f7d940e5000 nid=0x68a waiting on condition [0x00007f7beb71d000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f7d137acf38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at ReentrantLockTest$Producer.run(newthread.java:250)
        at java.lang.Thread.run(Thread.java:636)

"Low Memory Detector" daemon prio=10 tid=0x00007f7d940af800 nid=0x688 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread1" daemon prio=10 tid=0x00007f7d940ac800 nid=0x687 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread0" daemon prio=10 tid=0x00007f7d940aa800 nid=0x686 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x00007f7d940a8800 nid=0x685 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x00007f7d94087800 nid=0x683 in Object.wait() [0x00007f7bebd36000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00007f7d13761210> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:133)
        - locked <0x00007f7d13761210> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:149)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:177)

"Reference Handler" daemon prio=10 tid=0x00007f7d94085800 nid=0x682 in Object.wait() [0x00007f7bebe37000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00007f7d13761078> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
        - locked <0x00007f7d13761078> (a java.lang.ref.Reference$Lock)

"main" prio=10 tid=0x00007f7d94007000 nid=0x673 sleeping[0x00007f7d99285000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at ReentrantLockTest.<init>(newthread.java:81)
        at newthread.main(newthread.java:320)

"VM Thread" prio=10 tid=0x00007f7d94080800 nid=0x681 runnable

"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f7d94011800 nid=0x674 runnable

"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f7d94013000 nid=0x675 runnable

"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f7d94015000 nid=0x676 runnable

"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f7d94017000 nid=0x677 runnable

"GC task thread#4 (ParallelGC)" prio=10 tid=0x00007f7d94018800 nid=0x678 runnable

"GC task thread#5 (ParallelGC)" prio=10 tid=0x00007f7d9401a800 nid=0x679 runnable

"GC task thread#6 (ParallelGC)" prio=10 tid=0x00007f7d9401c800 nid=0x67a runnable

"GC task thread#7 (ParallelGC)" prio=10 tid=0x00007f7d9401e000 nid=0x67b runnable

"GC task thread#8 (ParallelGC)" prio=10 tid=0x00007f7d94020000 nid=0x67c runnable

"GC task thread#9 (ParallelGC)" prio=10 tid=0x00007f7d94022000 nid=0x67d runnable

"GC task thread#10 (ParallelGC)" prio=10 tid=0x00007f7d94023800 nid=0x67e runnable

"GC task thread#11 (ParallelGC)" prio=10 tid=0x00007f7d94025800 nid=0x67f runnable

"GC task thread#12 (ParallelGC)" prio=10 tid=0x00007f7d94027800 nid=0x680 runnable

"VM Periodic Task Thread" prio=10 tid=0x00007f7d940b2000 nid=0x689 waiting on condition

JNI global references: 1000

Heap
12:15:06:: consumer_thread_2 acquired the lock :: PSYoungGen      total 112448K, used 17376K [0x00007f7d13760000, 0x00007f7d1b4e0000, 0x00007f7d91000000)
  eden space 96384K, 18% used [0x00007f7d13760000,0x00007f7d148581c0,0x00007f7d19580000)
  from space 16064K, 0% used [0x00007f7d1a530000,0x00007f7d1a530000,0x00007f7d1b4e0000)
  to   space 16064K, 0% used
 [0x00007f7d19580000,0x00007f7d19580000,0x00007f7d1a530000)
 PSOldGen        total 257152K, used 0K [0x00007f7c18600000, 0x00007f7c28120000, 0x00007f7d13760000)
  object space 257152K, 0% used [0x00007f7c18600000,0x00007f7c18600000,0x00007f7c28120000)
 PSPermGen       total 21248K, used 7359K [0x00007f7c0de00000, 0x00007f7c0f2c0000, 0x00007f7c18600000)
  object space 21248K, 34% used [0x00007f7c0de00000,0x00007f7c0e52fe38,0x00007f7c0f2c0000)
4

1 に答える 1

1

(コメントからコピー)

この現象は先週末から始まりましたか? もしそうなら、多くの人が遭遇したうるう秒の問題と関係があるかもしれません。(症状: Java プロセスの CPU 使用率が非常に高くなり、スレッドのスリープに通常よりも長い時間がかかることがあります)


これが実際に問題である場合、1 つの解決策は単にシステムを再起動することです。別の解決策は、次を実行することです。

date -s "`date`"

この記事で示唆されているように。

2012-07-05T10:54:44.157 に回答