具体的には、私が探している動作は次のとおりです。読み取り操作は同時に発生し、保留中のすべての書き込み操作が終了すると実行されます。書き込み操作は、他のすべての読み取り/書き込み操作が完了するまで常に待機します。クローズ操作は、他のすべての読み取り/書き込み操作が完了するまで常に待機します。つまり、これらの操作はキューに入れられる必要があります。
NIO FileLocksの公式ドキュメントでは、この動作は指定されていません。実際、次のように述べています。
ファイルロックは、Java仮想マシン全体に代わって保持されます。同じ仮想マシン内の複数のスレッドによるファイルへのアクセスを制御するのには適していません。
新しいI/Oリクエストを送信する前に、すべてのリクエストを手動でクエリし、すべての未処理のFutureでget()を呼び出すというアイデアを試しましたが、これが良いアイデアであるかどうかはわかりません。
どうすればこの動作を実現できますか?
編集:fgeの洞察のおかげで、私は自分の問題の基本的な解決策を見つけることができました:
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ChannelAccessFactory {
public static final ExecutorService IO_THREADS = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private final Path file;
private final ReadWriteLock lock;
public ChannelAccessFactory (Path file){
this.file = file;
this.lock = new ReentrantReadWriteLock();
}
public ReadWriteLock getLock(){
return lock;
}
public ChannelAccess newAccess() throws Exception{
return new ChannelAccess(file, lock);
}
}
ラップされたチャネルクラス:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
public class ChannelAccess implements AutoCloseable{
private final ReadWriteLock lock;
private final AsynchronousFileChannel channel;
protected ChannelAccess (Path file, ReadWriteLock lock) throws Exception{
this.lock = lock;
this.channel = AsynchronousFileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE);
}
public Future<Integer> read(final ByteBuffer buffer, final long position){
return ChannelAccessFactory.IO_THREADS.submit(new Callable<Integer>(){
@Override
public Integer call() throws InterruptedException, ExecutionException{
lock.readLock().lock();
try{
return channel.read(buffer, position).get();
}
finally {
lock.readLock().unlock();
}
}
});
}
public Future<Integer> write(final ByteBuffer buffer, final long position){
return ChannelAccessFactory.IO_THREADS.submit(new Callable<Integer>(){
@Override
public Integer call() throws InterruptedException, ExecutionException {
lock.writeLock().lock();
try{
return channel.write(buffer, position).get();
}
finally {
lock.writeLock().unlock();
}
}
});
}
public long size() throws Exception{
lock.readLock().lock();
try{
return channel.size();
}
finally{
lock.readLock().unlock();
}
}
@Override
public void close() {
lock.readLock().lock();
try{
channel.close();
}
catch (IOException e){}
finally{
lock.readLock().unlock();
}
}
}