クライアントに多数の電子メールを送信するアプリケーションを修正しようとしています。送信はマルチスレッドの Java アプリ (Producer-Consumer モデル) によって行われ、Producer はデータベースからメッセージのリストを呼び出し、Consumer は Python スクリプトを呼び出してメールを送信します。
ある日から別の日に、私の上司は私に、プログラムが動かなくなったと言いました。 .. そして今では、以前の 1000 件の数ではなく、1 時間あたり 2 ~ 3 件のメッセージしか送信しません。
開発者は利用できなくなったので、自分で修正する必要があります。
データベースをクリアしました。データベースには「過去」に関するデータしか含まれておらず、不要であり、巨大でした... 6GB のデータ、7M 行、非常に低速でした。しかし、問題は残りました。
今、私は Java アプリで画面にログオンしています。次の行が表示されます。
try
{
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" is sleeping for "+consumerSleepTime+" ms ::");
Thread.sleep(consumerSleepTime);
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" sleeptime over in try block ::");
}
catch (InterruptedException e)
{
e.printStackTrace();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
Thread.currentThread().interrupt();
break main_while_loop;
}
眠りに「とどまる」と、私は見る:
11:24:15:: consumer_thread_1 は 100 ミリ秒スリープしています ::
他に何も起こりません。「スリープタイムオーバー」もスタックデータもありません。スレッドのスリープに永遠にかかるように見えます...ただし、プロセスを実行し続けると、ランダムな時間が数回経過した後、スレッドは継続し、再びスリープします...何時間も。
何か案は?
ここにいくつかのファイルがあります。全体の構造を理解するために必要かもしれません...
runjava
#!/bin/sh
java -cp /usr/share/java/mysql-connector-java.jar:. newthread
newthread.java
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.sql.SQLException;
import java.sql.Driver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.io.Closeable;
import java.io.IOException;
class ReentrantLockTest
{
int maxSize = 100;
ArrayList boundedBuffer = new ArrayList( maxSize );
ReentrantLock lock = new ReentrantLock(false);
Condition telire_var = lock.newCondition(),
uresre_var = lock.newCondition();
volatile boolean shutdown = false;
long producerSleepTime = 5000;
long consumerSleepTime = 100;
long mainDriverWaitTime = 1000;
int totalProduerThreads = 1;
int totalConsumerThreads = 2;
long criticalSectionDelay = 0;
int currentIndex = -1;
String sqlhost = "localhost";
String sqluser = "user";
String sqlpassword = "pass";
String sqldb = "db";
String mysqlUrl = "";
public ReentrantLockTest()
{
System.out.println(sqluser);
java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);
ArrayList<Thread> threads = new ArrayList<Thread>();
ThreadGroup tg = new ThreadGroup("ReentrantLockTest Thread Group");
// get sql
mysqlUrl = "jdbc:mysql://"+ sqlhost +"/"+ sqldb +"?autoReconnect=true";
try
{
Class.forName("com.mysql.jdbc.Driver").newInstance();
}
catch (Exception e)
{
System.out.println(e);
return;
}
for (int i = 1; i<=totalProduerThreads; i++)
{
threads.add( new Thread( tg , new ReentrantLockTest.Producer() , "producer_thread_"+i) );
}
for (int i = 1; i<=totalConsumerThreads; i++)
{
threads.add( new Thread( tg , new ReentrantLockTest.Consumer() , "consumer_thread_"+i) );
}
for (Thread t: threads)
t.start();
try
{
while (true)
{
for (Thread t: threads)
t.resume();
Thread.sleep(mainDriverWaitTime);
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
shutdown = true;
System.out.println(shortTime.format(new java.util.Date()) + ":: ReentrantLockTest - setting shutdown to false ::");
}
System.out.println(":: ReentrantLockTest - signalling interrupt and waiting for "+tg.activeCount()+" threads to die ::");
for (Thread t: threads)
{
System.out.println(shortTime.format(new java.util.Date()) + ":: Interrupting "+t.getName()+" ::");
try
{
t.interrupt();
}
catch (Exception e)
{
e.printStackTrace();
}
}
for (Thread t: threads)
{
try
{
StringBuilder sb = new StringBuilder( t.getName() );
System.out.println(shortTime.format(new java.util.Date()) + ":: Waiting for "+sb+" to die ::");
t.join();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+sb+" is dead ::" );
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
public class Consumer implements Runnable
{
java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);
private void close(Closeable c)
{
if (c != null)
{
try
{
c.close();
}
catch (IOException e)
{
// ignored
}
}
}
public void run()
{
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been started ::");
main_while_loop: while( shutdown == false && !Thread.currentThread().isInterrupted() )
{
try
{
lock.lock();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" acquired the lock ::");
Thread.sleep(criticalSectionDelay);
if (currentIndex == -1)
{
uresre_var.signal();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" signalling waiting_on_full_buffer ::");
}
while(currentIndex == -1)
{
System.out.println(shortTime.format(new java.util.Date()) + ":: buffer is empty - "+Thread.currentThread().getName()+" is going to wait ::");
telire_var.await();
}//while condition waiting_on_empty_buffer
// sendmail
String mail_id = boundedBuffer.remove(currentIndex).toString();
currentIndex--;
Process proc = null;
System.out.println(shortTime.format(new java.util.Date()) + ":: buffer size: "+ currentIndex);
lock.unlock();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" released the lock ::");
try
{
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName() +" starts python ("+ mail_id +")");
//this randomwaiting on sending is not needed
/*Random generator = new Random();
int randomWait = generator.nextInt(1000) + 100;
Thread.sleep(randomWait);
System.out.println(" => "+ randomWait);*/
proc = Runtime.getRuntime().exec("python ./new_mail.py "+ mail_id);
proc.waitFor();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName() +" ends python");
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
if (proc != null)
{
close(proc.getOutputStream());
close(proc.getInputStream());
close(proc.getErrorStream());
proc.destroy();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName() +" close python proc");
}
}
}
catch (InterruptedException e)
{
e.printStackTrace();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
Thread.currentThread().interrupt();
break main_while_loop;
}
try
{
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" is sleeping for "+consumerSleepTime+" ms ::");
Thread.sleep(consumerSleepTime);
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" sleeptime over in try block ::");
}
catch (InterruptedException e)
{
e.printStackTrace();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
Thread.currentThread().interrupt();
break main_while_loop;
}
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" sleeptime over ::");
}//end while: main_while_loop
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been shutdown ::");
}//end run
}
public class Producer implements Runnable
{
java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);
public void run()
{
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been started ::");
main_while_loop: while( shutdown == false && !Thread.currentThread().isInterrupted() )
{
try
{
lock.lock();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" acquired the lock ::");
while (currentIndex != -1)
{
System.out.println(shortTime.format(new java.util.Date()) + ":: a bufferben van valamennyi - "+Thread.currentThread().getName()+" varakozik ::");
uresre_var.await();
}//while condition waiting_on_full_buffer
Thread.sleep(criticalSectionDelay);
Connection con = null;
Statement st = null;
ResultSet rs = null;
try
{
con = DriverManager.getConnection(mysqlUrl, sqluser, sqlpassword);
st = con.createStatement();
rs = st.executeQuery("SELECT mail_id FROM mail_queue WHERE status = 0 OR (status BETWEEN 2 AND 100 AND proof_counter <= 3 AND UNIX_TIMESTAMP() - proof_stamp > 15*60 ) ORDER BY status, mail_id DESC LIMIT "+ maxSize);
boundedBuffer = new ArrayList(maxSize);
currentIndex = -1;
while(rs.next())
{
String mail_id = Integer.toString(rs.getInt(1));
boundedBuffer.add(mail_id);
currentIndex++;
}
con.close();
}
catch (Exception err)
{
System.out.println(err);
break;
}
telire_var.signal();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" signalling waiting_on_empty_buffer ::");
}
catch (InterruptedException e)
{
e.printStackTrace();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
Thread.currentThread().interrupt();
break main_while_loop;
}
finally
{
lock.unlock();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" released the lock ::");
}
try
{
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" is sleeping for "+producerSleepTime+" ms ::");
Thread.sleep(producerSleepTime);
}
catch (InterruptedException e)
{
e.printStackTrace();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
Thread.currentThread().interrupt();
break main_while_loop;
}
}
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been shutdown ::");
}
}
}
class newthread
{
public static void main(String[] args)
{
new ReentrantLockTest();
}
}
どんな助けでも大歓迎です!!
編集:
スレッド ダンプは次のとおりです。
2012-07-05 12:15:06
Full thread dump OpenJDK 64-Bit Server VM (14.0-b16 mixed mode):
"SIGHUP handler" daemon prio=10 tid=0x00007f7bb0001000 nid=0x838 runnable [0x00007f7beb205000]
java.lang.Thread.State: RUNNABLE
at java.lang.Terminator$1.handle(Terminator.java:52)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:636)
"consumer_thread_2" prio=10 tid=0x00007f7d940f4800 nid=0x68c runnable [0x00007f7beb51b000]
java.lang.Thread.State: RUNNABLE
at java.io.PrintStream.write(PrintStream.java:446)
- locked <0x00007f7d137703b0> (a java.io.PrintStream)
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:220)
at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:290)
at sun.nio.cs.StreamEncoder.flushBuffer(StreamEncoder.java:103)
- locked <0x00007f7d137704a8> (a java.io.OutputStreamWriter)
at java.io.OutputStreamWriter.flushBuffer(OutputStreamWriter.java:185)
at java.io.PrintStream.write(PrintStream.java:494)
- locked <0x00007f7d137703b0> (a java.io.PrintStream)
at java.io.PrintStream.print(PrintStream.java:636)
at java.io.PrintStream.println(PrintStream.java:773)
- locked <0x00007f7d137703b0> (a java.io.PrintStream)
at ReentrantLockTest$Consumer.run(newthread.java:151)
at java.lang.Thread.run(Thread.java:636)
"consumer_thread_1" prio=10 tid=0x00007f7d940f4000 nid=0x68b waiting on condition [0x00007f7beb61c000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00007f7d137acee0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:871)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
at ReentrantLockTest$Consumer.run(newthread.java:150)
at java.lang.Thread.run(Thread.java:636)
"producer_thread_1" prio=10 tid=0x00007f7d940e5000 nid=0x68a waiting on condition [0x00007f7beb71d000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00007f7d137acf38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at ReentrantLockTest$Producer.run(newthread.java:250)
at java.lang.Thread.run(Thread.java:636)
"Low Memory Detector" daemon prio=10 tid=0x00007f7d940af800 nid=0x688 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"CompilerThread1" daemon prio=10 tid=0x00007f7d940ac800 nid=0x687 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"CompilerThread0" daemon prio=10 tid=0x00007f7d940aa800 nid=0x686 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" daemon prio=10 tid=0x00007f7d940a8800 nid=0x685 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" daemon prio=10 tid=0x00007f7d94087800 nid=0x683 in Object.wait() [0x00007f7bebd36000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00007f7d13761210> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:133)
- locked <0x00007f7d13761210> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:149)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:177)
"Reference Handler" daemon prio=10 tid=0x00007f7d94085800 nid=0x682 in Object.wait() [0x00007f7bebe37000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00007f7d13761078> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
- locked <0x00007f7d13761078> (a java.lang.ref.Reference$Lock)
"main" prio=10 tid=0x00007f7d94007000 nid=0x673 sleeping[0x00007f7d99285000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at ReentrantLockTest.<init>(newthread.java:81)
at newthread.main(newthread.java:320)
"VM Thread" prio=10 tid=0x00007f7d94080800 nid=0x681 runnable
"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f7d94011800 nid=0x674 runnable
"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f7d94013000 nid=0x675 runnable
"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f7d94015000 nid=0x676 runnable
"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f7d94017000 nid=0x677 runnable
"GC task thread#4 (ParallelGC)" prio=10 tid=0x00007f7d94018800 nid=0x678 runnable
"GC task thread#5 (ParallelGC)" prio=10 tid=0x00007f7d9401a800 nid=0x679 runnable
"GC task thread#6 (ParallelGC)" prio=10 tid=0x00007f7d9401c800 nid=0x67a runnable
"GC task thread#7 (ParallelGC)" prio=10 tid=0x00007f7d9401e000 nid=0x67b runnable
"GC task thread#8 (ParallelGC)" prio=10 tid=0x00007f7d94020000 nid=0x67c runnable
"GC task thread#9 (ParallelGC)" prio=10 tid=0x00007f7d94022000 nid=0x67d runnable
"GC task thread#10 (ParallelGC)" prio=10 tid=0x00007f7d94023800 nid=0x67e runnable
"GC task thread#11 (ParallelGC)" prio=10 tid=0x00007f7d94025800 nid=0x67f runnable
"GC task thread#12 (ParallelGC)" prio=10 tid=0x00007f7d94027800 nid=0x680 runnable
"VM Periodic Task Thread" prio=10 tid=0x00007f7d940b2000 nid=0x689 waiting on condition
JNI global references: 1000
Heap
12:15:06:: consumer_thread_2 acquired the lock :: PSYoungGen total 112448K, used 17376K [0x00007f7d13760000, 0x00007f7d1b4e0000, 0x00007f7d91000000)
eden space 96384K, 18% used [0x00007f7d13760000,0x00007f7d148581c0,0x00007f7d19580000)
from space 16064K, 0% used [0x00007f7d1a530000,0x00007f7d1a530000,0x00007f7d1b4e0000)
to space 16064K, 0% used
[0x00007f7d19580000,0x00007f7d19580000,0x00007f7d1a530000)
PSOldGen total 257152K, used 0K [0x00007f7c18600000, 0x00007f7c28120000, 0x00007f7d13760000)
object space 257152K, 0% used [0x00007f7c18600000,0x00007f7c18600000,0x00007f7c28120000)
PSPermGen total 21248K, used 7359K [0x00007f7c0de00000, 0x00007f7c0f2c0000, 0x00007f7c18600000)
object space 21248K, 34% used [0x00007f7c0de00000,0x00007f7c0e52fe38,0x00007f7c0f2c0000)