0

dspace 1.8.x での作業 適切なコンシューマを登録するなど、コンシューマ ポーデューサのメカニズムを理解しようとしています。

しかし、クラス BasicDispatcher で奇妙なことが起こっています。

理由がわからないループで消費者の実行を繰り返します。

ここで問題のある行とコードは以下にあります

while (!vnts.isEmpty() && dispatchLoop < ConfigurationManager.getIntProperty("maxDispatchLoops", 3))

すぐ上に。

/**
 * Dispatch all events added to this Context according to configured
 * consumers.
 * 
 * @param ctx
 *            the execution context
 */
public void dispatch(Context ctx)
{
    if (!consumers.isEmpty())
    {
        List<Event> vnts = ctx.getEvents();
        int dispatchLoop=0;

        while (!vnts.isEmpty() && dispatchLoop < ConfigurationManager.getIntProperty("maxDispatchLoops", 3)) {
            List<Event> events=new LinkedList<Event>(vnts);
            log.debug(dispatchLoop + " - " + ctx.getEvents().size());
            vnts.clear();
            log.debug(ctx.getEvents().size());

            if (events == null)
            {
                return;
            }

            if (log.isDebugEnabled())
            {
                log.debug("Processing queue of "
                        + String.valueOf(events.size()) + " events.");
            }

            // transaction identifier applies to all events created in
            // this context for the current transaction. Prefix it with
            // some letters so RDF readers don't mistake it for an integer.
            String tid = "TX" + Utils.generateKey();

            for (Event event : events)
            {
                event.setDispatcher(getIdentifier());
                event.setTransactionID(tid);

                if (log.isDebugEnabled())
                {
                    log.debug("Iterating over "
                            + String.valueOf(consumers.values().size())
                            + " consumers...");
                }

                for (Iterator ci = consumers.values().iterator(); ci.hasNext();)
                {
                    ConsumerProfile cp = (ConsumerProfile) ci.next();

                    if (event.pass(cp.getFilters()))
                    {
                        if (log.isDebugEnabled())
                        {
                            log.debug("Sending event to \"" + cp.getName()
                                    + "\": " + event.toString());
                        }

                        try
                        {
                            consume(cp, event, ctx);
                        }
                        catch (Exception e)
                        {
                            log.error("Consumer(\"" + cp.getName()
                                    + "\").consume threw: " + e.toString(), e);
                        }
                    }

                }
            }

            // Call end on the consumers that got synchronous events.
            for (Iterator ci = consumers.values().iterator(); ci.hasNext();)
            {
                ConsumerProfile cp = (ConsumerProfile) ci.next();
                if (cp != null)
                {
                    if (log.isDebugEnabled())
                    {
                        log.debug("Calling end for consumer \"" + cp.getName()
                                + "\"");
                    }

                    try
                    {
                        if (cp.isAsynchronous()) {
                            if (thread == null || !thread.isAlive()) {
                                thread = new ConsumerThread();
                                thread.queue.add(new ConsumerStuff(cp));
                                thread.start();
                            } else
                              thread.queue.add(new ConsumerStuff(cp));
                        } else {
                            cp.getConsumer().end(ctx);
                        }
                    }
                    catch (Exception e)
                    {
                        log.error("Error in Consumer(\"" + cp.getName()
                                + "\").end: " + e.toString(), e);
                    }
                }
            }
            dispatchLoop++;
            if(ctx.getEvents()!=null)
            vnts = Collections.synchronizedList(ctx.getEvents());

        }
    }
}

誰かがその理由について考えを持っていますか。また、誰かが maxDispatchLoops をどのように変更できますか?発生率はどうなりますか?

コンシューマーを 1 回だけ実行することはできますか?

4

1 に答える 1