3

RentrantLockを使用してエンティティの特定のインスタンス(キーで識別される)を変更する非同期の試行をブロックすることにより、Javaアプリケーションで並行性を強制するクラスを実装しようとしています。目標は、前のスレッドが完了するまで、オブジェクトの特定のインスタンスを変更する複数の同時試行をブロック/キューに入れることです。クラスはこれを一般的な方法で実装し、コードの任意のブロックがロックを取得して解放できるようにします(RentrantLockセマンティクスと同じ)。オブジェクトの同じインスタンスを変更しようとするスレッドのみをブロックするユーティリティが追加されます(識別されたとおり)。キーによって)コードのブロックに入るすべてのスレッドをブロックするのではなく。

このクラスは、エンティティの1つのインスタンスに対してのみコードのブロックを同期できるようにするための単純な構造を提供します。たとえば、ID 33のユーザーからのすべてのスレッドに対してコードのブロックを同期させたいが、他のユーザーのスレッドをスレッドサービスユーザー33によってブロックしたくない場合です。

クラスは次のように実装されます

public class EntitySynchronizer {
  private static final int DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS = 300; // 5 minutes
  private Object mutex = new Object();
  private ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<Object, ReentrantLock>();
  private static ThreadLocal<Object> keyThreadLocal = new ThreadLocal<Object>();
  private int maximumLockDurationSeconds;
  public EntitySynchronizer() {
    this(DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS);
  }
  public EntitySynchronizer(int maximumLockDurationSeconds) {
    this.maximumLockDurationSeconds = maximumLockDurationSeconds;
  }
  /**
   * Initiate a lock for all threads with this key value
   * @param key the instance identifier for concurrency synchronization
   */
  public void lock(Object key) {
    if (key == null) {
      return;
    }
    /*
     * returns the existing lock for specified key, or null if there was no existing lock for the
     * key
     */
    ReentrantLock lock;
    synchronized (mutex) {
      lock = locks.putIfAbsent(key, new ReentrantLock(true));
      if (lock == null) {
        lock = locks.get(key);
      }
    }
    /*
     * Acquires the lock and returns immediately with the value true if it is not held by another
     * thread within the given waiting time and the current thread has not been interrupted. If this
     * lock has been set to use a fair ordering policy then an available lock will NOT be acquired
     * if any other threads are waiting for the lock. If the current thread already holds this lock
     * then the hold count is incremented by one and the method returns true. If the lock is held by
     * another thread then the current thread becomes disabled for thread scheduling purposes and
     * lies dormant until one of three things happens: - The lock is acquired by the current thread;
     * or - Some other thread interrupts the current thread; or - The specified waiting time elapses
     */
    try {
      /*
       * using tryLock(timeout) instead of lock() to prevent deadlock situation in case acquired
       * lock is not released normalRelease will be false if the lock was released because the
       * timeout expired
       */
      boolean normalRelease = lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS);
      /*
       * lock was release because timeout expired. We do not want to proceed, we should throw a
       * concurrency exception for waiting thread
       */
      if (!normalRelease) {
        throw new ConcurrentModificationException(
            "Entity synchronization concurrency lock timeout expired for item key: " + key);
      }
    } catch (InterruptedException e) {
      throw new IllegalStateException("Entity synchronization interrupted exception for item key: "
          + key);
    }
    keyThreadLocal.set(key);
  }
  /**
   * Unlock this thread's lock. This takes care of preserving the lock for any waiting threads with
   * the same entity key
   */
  public void unlock() {
    Object key = keyThreadLocal.get();
    keyThreadLocal.remove();
    if (key != null) {
      ReentrantLock lock = locks.get(key);
      if (lock != null) {
        try {
          synchronized (mutex) {
            if (!lock.hasQueuedThreads()) {
              locks.remove(key);
            }
          }
        } finally {
          lock.unlock();
        }
      } else {
        synchronized (mutex) {
          locks.remove(key);
        }
      }
    }
  }
}

このクラスは次のように使用されます。

private EntitySynchronizer entitySynchronizer = new EntitySynchronizer();
entitySynchronizer.lock(userId);  // 'user' is the entity by which i want to differentiate my synchronization
try {
  //execute my code here ...
} finally {
  entitySynchronizer.unlock();
}

問題は、それが完全に機能しないことです。同時実行性の負荷が非常に高い場合でも、同じキーを持つ複数のスレッドが同期されていない場合があります。私はかなり徹底的にテストしましたが、なぜ/どこでこれが発生するのか理解できません。

並行性の専門家はいますか?

4

3 に答える 3

4

修正する必要があることの1つはこれです:

ReentrantLock lock;
synchronized (mutex) {
  lock = locks.putIfAbsent(key, new ReentrantLock(true));
  if (lock == null) {
    lock = locks.get(key);
  }
}

これは、並行マップの全体のポイントを見逃します。なぜあなたはそれをこのように書かなかったのですか?

ReentrantLock lock = new ReentrantLock(true);
final ReentrantLock oldLock = locks.putIfAbsent(key, lock);
lock = oldLock != null? oldLock : lock;
于 2012-05-03T08:43:33.403 に答える
2

あなたのソリューションは原子性の欠如に悩まされています。次のシナリオを検討してください。

  • スレッドAが入りlock()、マップから既存のロックを取得します。
  • スレッドBはunlock()同じキーを入力し、ロックを解除してマップからロックを解除します(スレッドAはまだ呼び出していないtryLock()ため)。
  • スレッドAは正常に。を呼び出しますtryLock()

考えられるオプションの1つは、マップから「チェックアウト」されたロックの数を追跡することです。

public class EntitySynchronizer {
    private Map<Object, Token> tokens = new HashMap<Object, Token>();
    private ThreadLocal<Token> currentToken = new ThreadLocal<Token>();
    private Object mutex = new Object();

    ...

    public void lock(Object key) throws InterruptedException {
        Token token = checkOut(key);
        boolean locked = false;
        try {
            locked = token.lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS));
            if (locked) currentToken.set(token);
        } finally {
            if (!locked) checkIn(token);
        }
    }

    public void unlock() {
        Token token = currentToken.get();
        if (token != null) {
            token.lock.unlock();
            checkIn(token);
            currentToken.remove();
        }
    }

    private Token checkOut(Object key) {
        synchronized (mutex) {
            Token token = tokens.get(key);
            if (token != null) {
                token.checkedOutCount++;
            } else {
                token = new Token(key); 
                tokens.put(key, token);
            }
            return token;
        }
    }

    private void checkIn(Token token) {
        synchronized (mutex) {
            token.checkedOutCount--;
            if (token.checkedOutCount == 0)
                tokens.remove(token.key);
        }
    }

    private class Token {
        public final Object key;
        public final ReentrantLock lock = new ReentrantLock();
        public int checkedOutCount = 1;

        public Token(Object key) {
            this.key = key;
        }
    }
}

とにかくそのメソッドは同期ブロックで呼び出されるため、そうであるtokens必要はないことに注意してください。ConcurentHashMap

于 2012-05-03T09:15:42.220 に答える
1

次のように、クラスを実際に使用していないと想定しています。

private EntitySynchronizer entitySynchronizer = new EntitySynchronizer();
entitySynchronizer.lock(userId);  // 'user' is the entity by which i want to differentiate my synchronization
try {
  //execute my code here ...
} finally {
  entitySynchronizer.unlock();
}

しかし、EntitySynchronizerのシングルトンインスタンスがありますか?そうでなければ、それがあなたの問題だからです。

于 2012-05-03T09:04:36.570 に答える