この質問は、ThreadLocal の使用方法に関するものではありません。私の質問は、ThreadLocal コントラクトを破る ForkJoinTask.compute() の ForkJoinPool 継続の副作用についてです。
でForkJoinTask.compute()
、任意の静的 ThreadLocal をプルします。
値は任意のステートフル オブジェクトですが、compute()
呼び出しの終了後はステートフルではありません。つまり、スレッドローカルのオブジェクト/状態を準備し、それを使用してから破棄します。
原則として、その状態を ForkJoinTasK に入れますが、このスレッド ローカル値がサード パーティのライブラリにあると仮定して、変更することはできません。したがって、静的スレッドローカルは、すべてのタスク インスタンスが共有するリソースであるためです。
もちろん、単純な ThreadLocal は一度だけ初期化されることを予想し、テストし、証明しました。これは、ForkJoinTask.join()
呼び出しの下でのスレッドの継続により、compute()
メソッドが終了する前に再度呼び出される可能性があることを意味します。これにより、以前の計算呼び出しで使用されていたオブジェクトの状態が明らかになり、多くのスタックフレームが高くなります。
その望ましくない露出の問題をどのように解決しますか?
私が現在見ている唯一の方法は、呼び出しごとに新しいスレッドを確保することですcompute()
が、それは F/J プールの継続を無効にし、危険なほどスレッド数を爆発させる可能性があります。
最初の ForkJoinTask 以降に変更された TL をバックアップし、すべての task.compute がスレッドで最初に実行されるかのようにスレッドローカル マップ全体を元に戻すために、JRE コアで何かすることはありませんか?
ありがとう。
package jdk8tests;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
public class TestForkJoin3 {
static AtomicInteger nextId = new AtomicInteger();
static long T0 = System.currentTimeMillis();
static int NTHREADS = 5;
static final ThreadLocal<StringBuilder> myTL = ThreadLocal.withInitial( () -> new StringBuilder());
static void log(Object msg) {
System.out.format("%09.3f %-10s %s%n", new Double(0.001*(System.currentTimeMillis()-T0)), Thread.currentThread().getName(), " : "+msg);
}
public static void main(String[] args) throws Exception {
ForkJoinPool p = new ForkJoinPool(
NTHREADS,
pool -> {
int id = nextId.incrementAndGet(); //count new threads
log("new FJ thread "+ id);
ForkJoinWorkerThread t = new ForkJoinWorkerThread(pool) {/**/};
t.setName("My FJThread "+id);
return t;
},
Thread.getDefaultUncaughtExceptionHandler(),
false
);
LowercasingTask t = new LowercasingTask("ROOT", 3);
p.invoke(t);
int nt = nextId.get();
log("number of threads was "+nt);
if(nt > NTHREADS)
log(">>>>>>> more threads than prescribed <<<<<<<<");
}
//=====================
static class LowercasingTask extends RecursiveTask<String> {
String name;
int level;
public LowercasingTask(String name, int level) {
this.name = name;
this.level = level;
}
@Override
protected String compute() {
StringBuilder sbtl = myTL.get();
String initialValue = sbtl.toString();
if(!initialValue.equals(""))
log("!!!!!! BROKEN ON START!!!!!!! value = "+ initialValue);
sbtl.append(":START");
if(level>0) {
log(name+": compute level "+level);
try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
List<LowercasingTask> tasks = new ArrayList<>();
for(int i=1; i<=9; i++) {
LowercasingTask lt = new LowercasingTask(name+"."+i, level-1);
tasks.add(lt);
lt.fork();
}
for(int i=0; i<tasks.size(); i++) { //this can lead to compensation threads due to l1.join() method running lifo task lN
//for(int i=tasks.size()-1; i>=0; i--) { //this usually has the lN.join() method running task lN, without compensation threads.
tasks.get(i).join();
}
log(name+": returning from joins");
}
sbtl.append(":END");
String val = sbtl.toString();
if(!val.equals(":START:END"))
log("!!!!!! BROKEN AT END !!!!!!! value = "+val);
sbtl.setLength(0);
return "done";
}
}
}