0

H2 データベースを AUTO_SERVER モードで正常に使用して、データベース ファイルがネットワーク上の多数のデスクトップ クライアント間で透過的に共有されるようにしています。このようにして、クライアントの中からサーバーが選択され、他のすべてのクライアントは tcp サーバーから読み取ります。

私が見逃しているのは、クライアントまたはサーバーが他のすべてのデスクトップクライアントにデータベースで何かが変更されたことを通知する方法です。現在、JGroups チャネルを使用してすべてのクライアントが相互に通信できるようにしていますが、これは別の障害点であり、H2 と並行して実行される別のリーダー選出アルゴリズムです。

他に方法はありませんか?一部のデータベースでサポートされている JMS (Java Message Service Java API) について読みました。H2のヒントはありますか?

ありがとう

編集

次のコードは、現在の回答の適応です。最初に送信者を起動すると (引数を「送信者」として設定)、サーバーとして H2 データベースに接続し、リモート マシンで受信者を実行します (引数を「受信者」として設定します)。クライアントとして接続します。

ただし、通知を受け取るのはサーバーだけで、クライアントは何も受け取りません。

これは、私が現在知っていることから理にかなっています。トリガーはサーバーでのみ呼び出され、クライアントまたはサーバーから呼び出されたユーザー定義関数はクライアントまたはサーバーで呼び出されますが、データベースに接続されているすべてのクライアント (およびサーバー) では呼び出されません。

接続されているすべてのインスタンスにデータベースの変更を通知するように以下を適応させる方法はありますか?

import java.io.File;
import java.sql.*;
import java.util.concurrent.atomic.AtomicLong;
import org.h2.tools.TriggerAdapter;

public class TestSimpleDB2
{

    public static void main(String[] args) throws Exception
    {
        //final String url = "jdbc:h2:mem:test;multi_threaded=true";
        final String url = "jdbc:h2:" + File.separator + "mnt/testdir/PlanIGS" + File.separator
                + "persondb;create=true;AUTO_SERVER=TRUE;multi_threaded=true";
        Connection conn = DriverManager.getConnection(url);
        Statement stat = conn.createStatement();

        boolean isSender = false;
        args = new String[]
        {
            "sender"
        };
        for (String arg : args)
        {
            if (arg.contains("receiver"))
            {
                System.out.println("receiver starting");
                isSender = false;
            }
            else if (arg.contains("sender"))
            {
                System.out.println("sender starting");
                isSender = true;
            }
        }

        if (isSender)
        {
            stat.execute("create alias wait_for_change for \""
                    + TestSimpleDB2.class.getName()
                    + ".waitForChange\"");
            stat.execute("create table test(id identity)");
            stat.execute("create trigger notifier "
                    + "before insert, update, delete, rollback "
                    + "on test call \""
                    + TestSimpleDB2.Notifier.class.getName() + "\"");

            Thread.sleep(1000);
            for (int i = 0; i < 10; i++)
            {
                System.out.println("Sender: I change something...");
                stat.execute("insert into test values(null)");
                Thread.sleep(2000);
            }
        }
        else
        {
            new Thread()
            {
                public void run()
                {
                    try
                    {
                        Connection conn = DriverManager.getConnection(url);
                        for (int i = 0; i < 10; i++)
                        {
                            conn.createStatement().execute(
                                    "call wait_for_change(100000)");
                            System.out.println("Receiver: event received");
                        }
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
        conn.close();
    }

    static AtomicLong modCount = new AtomicLong();

    public static void waitForChange(long maxWaitMillis)
    {
        synchronized (modCount)
        {
            try
            {
                modCount.wait(maxWaitMillis);
            }
            catch (InterruptedException e)
            {
                // ignore
            }
        }
    }

    public static class Notifier extends TriggerAdapter
    {

        public void fire(Connection conn, ResultSet oldRow, ResultSet newRow)
                throws SQLException
        {
            modCount.incrementAndGet();
            synchronized (modCount)
            {
                modCount.notifyAll();
            }
        }
    }
}
4

1 に答える 1

2

H2 は JMS を実装していません (実際、JMS を実装しているデータベースは知りません)。ただし、次のように、トリガーとユーザー定義関数を使用して、H2 内に単純な通知メカニズムを構築できます。これには、まだ完全にテストされていない H2 のマルチスレッド モードが必要になることに注意してください。そのため、データに使用するデータベースとは別のデータベースをメッセージングに使用することが理にかなっている場合があります。

import java.sql.*;
import java.util.concurrent.atomic.AtomicLong;
import org.h2.tools.TriggerAdapter;

public class TestSimpleDb {

    public static void main(String[] args) throws Exception {
        final String url = "jdbc:h2:mem:test;multi_threaded=true";
        Connection conn = DriverManager.getConnection(url);
        Statement stat = conn.createStatement();
        stat.execute("create alias wait_for_change for \"" + 
                TestSimpleDb.class.getName() + 
                ".waitForChange\"");
        stat.execute("create table test(id identity)");
        stat.execute("create trigger notifier " + 
                "before insert, update, delete, rollback " +
                "on test call \"" + 
                TestSimpleDb.Notifier.class.getName() + "\"");
        new Thread() {
            public void run() {
                try {
                    Connection conn = DriverManager.getConnection(url);
                    for (int i = 0; i < 10; i++) {
                        conn.createStatement().execute(
                                "call wait_for_change(10000)");
                        System.out.println("Receiver: event received");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        Thread.sleep(500);
        for (int i = 0; i < 10; i++) {
            System.out.println("Sender: I change something...");
            stat.execute("insert into test values(null)");
            Thread.sleep(1000);
        }
        conn.close();
    }

    static AtomicLong modCount = new AtomicLong();

    public static void waitForChange(long maxWaitMillis) {
        synchronized (modCount) {
            try {
                modCount.wait(maxWaitMillis);
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }

    public static class Notifier extends TriggerAdapter {
        public void fire(Connection conn, ResultSet oldRow, ResultSet newRow)
                throws SQLException {
            modCount.incrementAndGet();
            synchronized (modCount) {
                modCount.notifyAll();
            }
        }
    }

}
于 2013-10-10T15:56:25.030 に答える