6

Play を使用してアプリをデプロイする予定で、「ジョブ」を使用したことがありません。私のデプロイは、さまざまな Play サーバーの負荷分散を必要とするほど大きくなりますが、私の計算は、hadoop/storm/others を必要とするほど大きくはありません。

私の質問は、Play でこのシナリオをどのように処理するのですか? Play でジョブを毎分実行するように設定した場合、すべてのサーバーがまったく同じことを同時に実行することは望ましくありません。

この答えしか見つかりませんでしたが、これらのオプションはどれも好きではありません。

では、ジョブを調整するためのツールやベスト プラクティスはありますか?

4

3 に答える 3

0

データベースのテーブルを使用してジョブロックを保存できますが、このロックを別のトランザクションでチェック/更新する必要があります (これには JPA.newEntityManager を使用する必要があります)。

私の JobLock クラスは LockMode 列挙型を使用します

package enums;

public enum LockMode {
    FREE, ACQUIRED;
}

ここにJobLockクラスがあります

package models;

import java.util.Date;
import java.util.List;

import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Version;

import play.Logger;
import play.Play;
import play.data.validation.Required;
import play.db.jpa.JPA;
import play.db.jpa.Model;
import utils.Parser;
import enums.LockMode;
import exceptions.ServiceException;

/**
 * Technical class that allows to manage a lock in the database thus we can
 * synchronize multiple instances that thus cannot run the same job at the same
 * time
 * 
 * @author sebastien
 */
@Entity
public class JobLock extends Model {

    private static final Long MAX_ACQUISITION_DELAY = Parser.parseLong(Play.configuration.getProperty(
            "job.lock.acquisitiondelay", "10000"));

    @Required
    public String jobName;

    public Date acquisitionDate;

    @Required
    @Enumerated(EnumType.STRING)
    public LockMode lockMode;

    @Version
    public int version;

    // STATIC METHODS
    // ~~~~~~~~~~~~~~~

    /**
     * Acquire the lock for the type of job identified by the name parameter.
     * Acquisition of the lock is done on a separate transaction thus is
     * transaction is as small as possible and other instances will see the lock
     * acquisition sooner.
     * <p>
     * If we do not do that, the other instances will be blocked until the
     * instance that acquired the lock have finished is businees transaction
     * which could be long on a job.
     * </p>
     * 
     * @param name
     *            the name that identifies a job category, usually it is the job
     *            simple class name
     * @return the lock object if the acquisition is successfull, null otherwise
     */
    public static JobLock acquireLock(String name) {
        EntityManager em = JPA.newEntityManager();
        try {
            em.getTransaction().begin();
            List<JobLock> locks = em.createQuery("from JobLock where jobName=:name", JobLock.class)
                    .setParameter("name", name).setMaxResults(1).getResultList();
            JobLock lock = locks != null && !locks.isEmpty() ? locks.get(0) : null;
            if (lock == null) {
                lock = new JobLock();
                lock.jobName = name;
                lock.acquisitionDate = new Date();
                lock.lockMode = LockMode.ACQUIRED;
                em.persist(lock);
            } else {
                if (LockMode.ACQUIRED.equals(lock.lockMode)) {
                    if ((System.currentTimeMillis() - lock.acquisitionDate.getTime()) > MAX_ACQUISITION_DELAY) {
                        throw new ServiceException(String.format(
                                "Lock is held for too much time : there is a problem with job %s", name));
                    }
                    return null;
                }
                lock.lockMode = LockMode.ACQUIRED;
                lock.acquisitionDate = new Date();
                lock.willBeSaved = true;
            }
            em.flush();
            em.getTransaction().commit();
            return lock;
        } catch (Exception e) {
            // Do not log exception here because it is normal to have exception
            // in case of multi-node installation, this is the way to avoid
            // multiple job execution
            if (em.getTransaction().isActive()) {
                em.getTransaction().rollback();
            }
            // Maybe we have to inverse the test and to define which exception
            // is not problematic : exception that denotes concurrency in the
            // database are normal
            if (e instanceof ServiceException) {
                throw (ServiceException) e;
            } else {
                return null;
            }
        } finally {
            if (em.isOpen()) {
                em.close();
            }
        }
    }

    /**
     * Release the lock on the database thus another instance can take it. This
     * action change the {@link #lockMode} and set {@link #acquisitionDate} to
     * null. This is done in a separate transaction that can have visibility on
     * what happens on the database during the time of the business transaction
     * 
     * @param lock
     *            the lock to release
     * @return true if we managed to relase the lock and false otherwise
     */
    public static boolean releaseLock(JobLock lock) {
        EntityManager em = JPA.newEntityManager();

        if (lock == null || LockMode.FREE.equals(lock.lockMode)) {
            return false;
        }

        try {
            em.getTransaction().begin();
            lock = em.find(JobLock.class, lock.id);
            lock.lockMode = LockMode.FREE;
            lock.acquisitionDate = null;
            lock.willBeSaved = true;
            em.persist(lock);
            em.flush();
            em.getTransaction().commit();
            return true;
        } catch (Exception e) {
            if (em.getTransaction().isActive()) {
                em.getTransaction().rollback();
            }
            Logger.error(e, "Error during commit of lock release");
            return false;
        } finally {
            if (em.isOpen()) {
                em.close();
            }
        }
    }
}

そして、これがこのロックを使用する私の LockAwareJob です

package jobs;

import models.JobLock;
import notifiers.ExceptionMails;
import play.Logger;
import play.jobs.Job;

public abstract class LockAwareJob<V> extends Job<V> {

    @Override
    public final void doJob() throws Exception {
        String name = this.getClass().getSimpleName();
        try {
            JobLock lock = JobLock.acquireLock(name);
            if (lock != null) {
                Logger.info("Starting %s", name);
                try {
                    doJobWithLock(lock);
                } finally {
                    if (!JobLock.releaseLock(lock)) {
                        Logger.error("Lock acquired but cannot be released for %s", name);
                    }
                    Logger.info("End of %s", name);
                }
            } else {
                Logger.info("Another node is running %s : nothing to do", name);
            }
        } catch (Exception ex) {
            ExceptionMails.exception(ex, String.format("Error while executing job %s", name));
            throw ex;
        }
    }

    @Override
    public final V doJobWithResult() throws Exception {
        String name = this.getClass().getSimpleName();
        try {
            JobLock lock = JobLock.acquireLock(name);
            if (lock != null) {
                Logger.info("Starting %s", name);
                try {
                    return resultWithLock(lock);
                } finally {
                    if (!JobLock.releaseLock(lock)) {
                        Logger.error("Lock acquired but cannot be released for %s", name);
                    }
                    Logger.info("End of %s", name);
                }
            } else {
                Logger.info("Another node is running %s : nothing to do", name);
                return resultWithoutLock();
            }
        } catch (Exception ex) {
            ExceptionMails.exception(ex, String.format("Error while executing job %s", name));
            throw ex;
        }
    }

    public void doJobWithLock(JobLock lock) throws Exception {
    }

    public V resultWithLock(JobLock lock) throws Exception {
        doJobWithLock(lock);
        return null;
    }

    public V resultWithoutLock() throws Exception {
        return null;
    }
}

インスタンスがジョブ ロックの取得に失敗するたびにエラーが発生しないように、log4j.properties に特別な行を追加します。

log4j.logger.org.hibernate.event.def.AbstractFlushingEventListener=FATAL

このソリューションでは、JobLock id を使用して、このジョブに関連付けられたパラメーター (たとえば、最終実行日) を保存することもできます。

于 2012-12-10T06:52:58.277 に答える
0

ここで説明されているように、データベース フラグを使用できます: Pere Villega による 2 つのジョブに対するPlayframework 同時ジョブ管理

しかし、Memcache を使用する Google グループの Guillaume Bort のソリューションが最適だと思います。Play 2 用のモジュールがあるようです: https://github.com/mumoshu/play2-memcached

于 2012-12-07T15:52:28.033 に答える
0

個人的には、簡単にするためにジョブを実行する 1 つのインスタンスを使用します。または、実行をより細かく制御し、同時実行性と並列処理を向上させたい場合は、ジョブの代わりに Akka を使用することもできます。

于 2012-12-08T07:39:23.260 に答える