プログラムの N インスタンスがマスターを選択し、すべてのクライアントがこのマスターの IP を取得できるように、JGroups を使用してリーダー選出プロトコルを作成しようとしています。現在の実装は多かれ少なかれ、各インスタンスが lock-channel のロックを取得しようとすることに依存しており、このチャネルを正常に取得するとマスターになり、他のすべてのインスタンスはクライアントに切り替わります。
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.*;
import org.jgroups.*;
import org.jgroups.blocks.locking.LockService;
public class AutoDiscovery
{
static org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AutoDiscovery.class); //used for logging purposes (see log4j library)
/* this variable indicates whether I have become the master or I'm just a client*/
public volatile AtomicBoolean becomeMaster = new AtomicBoolean(false);
/* The address of the server if we are a client or of ourself if we are
* server */
public String serverAddress;
/* A channel on which to acquire a lock, so that only one can become server */
private JChannel lockChannel;
/* A shared channel ffor communication between client and master*/
private JChannel communicationChannel;
private LockService lockService;
/* A thread which tries to acquire a lock */
private Thread acquiringThread;
/* A thread which listens for the server ip which may change */
private Thread listeningThread;
/* A thread which lists the status and initializes the acquiring thread*/
private Thread statusThread;
private String name;
/* If we pass from being a client to being a server we must stop the listening
* thread however we cannot call listeningThread.stop() but instead we change
* the stopListening boolean to true */
private boolean stopListening = false;
/* This lock communicates I have finally become either master or client so
* the serverAddress and becomeMaster variables are correctly set */
public final Object finishedLock = new Object();
public static void main(String[] args) throws Exception
{
Thread.currentThread().setName("MyMainThread");
Random rand = new Random();
AutoDiscovery master = new AutoDiscovery("Node" + rand.nextInt(10));
master.lockChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
master.lockChannel.connect("lock-channel");
master.communicationChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
master.communicationChannel.connect("communication-channel");
master.lockService = new LockService(master.lockChannel);
master.startStatusPrinterThread();
}
public AutoDiscovery(String name)
{
this.name = name;
}
public AutoDiscovery()
{
try
{
Thread.currentThread().setName("MyMainThread");
Random rand = new Random();
this.name = ("Node" + rand.nextInt(10));
lockChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
lockChannel.connect("lock-channel");
communicationChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
communicationChannel.connect("communication-channel");
lockService = new LockService(lockChannel);
startStatusPrinterThread();
}
catch (Exception ex)
{
Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void startAcquiringThread()
{
acquiringThread = new Thread()
{
@Override
public void run()
{
while (true)
{
//if you have become Master send your ip every now and then
if (becomeMaster.get())
{
try
{
communicationChannel.send(new Message(null, null, "serverip " + serverAddress));
}
catch (Exception ex)
{
Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
}
}
else
{
try
{
Thread.currentThread().setName(name + "AcquiringThread");
Lock lock = lockService.getLock("serverLock");
if (lock.tryLock(4, TimeUnit.SECONDS))
{
becomeMaster.set(true);
stopListening = true;
/* Now that I'm server I must find out my own ip address on which to listen */
Enumeration<NetworkInterface> networkInterfaces;
try
{
networkInterfaces = NetworkInterface.getNetworkInterfaces();
for (NetworkInterface netint : Collections.list(networkInterfaces))
{
Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
for (InetAddress inetAddress : Collections.list(inetAddresses))
{
if (isIPAddress(inetAddress.getHostAddress())
&& !inetAddress.getHostAddress().equals("127.0.0.1"))
{
serverAddress = inetAddress.getHostAddress();
}
}
}
/* I notify to the rest of the program I have correctly initialized
* becomeMaster and serverAddress */
synchronized (finishedLock)
{
finishedLock.notify();
}
}
catch (Exception e)
{
Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, e);
System.exit(0);
}
log.info(Thread.currentThread().getName() + ": I acquired lock! will become master! my ip is " + serverAddress);
}
else
{
becomeMaster.set(false);
stopListening = false;
if (listeningThread == null || !listeningThread.isAlive())
{
if (!stopListening) //??? this codnition might be useless
{
startListeningThread();
}
}
}
}
catch (Exception e)
{
e.printStackTrace();
}
}
try
{
sleep(5000L);
}
catch (InterruptedException ex)
{
Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
};
acquiringThread.setDaemon(true);
acquiringThread.start();
}
public void startListeningThread()
{
listeningThread = new Thread()
{
@Override
public void run()
{
try
{
while (true)
{
Thread.currentThread().setName(name + "ListeningThread");
communicationChannel.setReceiver(new ReceiverAdapter()
{
@Override
public void receive(Message msg)
{
if (msg.getObject() != null)
{
String leaderServerAddress = (msg.getObject().toString().substring(9));
if (isIPAddress(leaderServerAddress))
{
serverAddress = leaderServerAddress;
log.info(name + " Master server has ip" + serverAddress);
/* I notify to the rest of the program I have correctly initialized
* becomeMaster and serverAddress */
synchronized (finishedLock)
{
finishedLock.notify();
}
}
else
{
log.info(name + ": discarded message " + msg.getObject().toString());
}
}
}
});
sleep(10000L);
if (stopListening)
{
return;
}
}
}
catch (Exception e)
{
e.printStackTrace();
}
}
};
listeningThread.setDaemon(true);
listeningThread.start();
}
private void startStatusPrinterThread()
{
statusThread = new Thread()
{
@Override
public void run()
{
Thread.currentThread().setName(name + "StatusPrinterThread");
startAcquiringThread();
while (true)
{
try
{
if (becomeMaster.get())
{
log.info(name + " startStatusPrinterThread(): I am happily a Master!");
}
else
{
if (!acquiringThread.isAlive())
{
startAcquiringThread();
}
}
sleep(5000L);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
};
statusThread.setDaemon(true);
statusThread.start();
}
private static boolean isIPAddress(String str)
{
Pattern ipPattern = Pattern.compile("^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
+ "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
+ "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
+ "([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
return ipPattern.matcher(str).matches();
}
}
今私の現在のudp.xmlは
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
<UDP
mcast_port="${jgroups.udp.mcast_port:45588}"
tos="8"
ucast_recv_buf_size="20M"
ucast_send_buf_size="640K"
mcast_recv_buf_size="25M"
mcast_send_buf_size="640K"
loopback="true"
level="WARN"
log_discard_msgs="false"
max_bundle_size="64K"
max_bundle_timeout="30"
ip_ttl="${jgroups.udp.ip_ttl:8}"
enable_diagnostics="true"
thread_naming_pattern="cl"
timer_type="new"
timer.min_threads="4"
timer.max_threads="10"
timer.keep_alive_time="3000"
timer.queue_max_size="500"
thread_pool.enabled="true"
thread_pool.min_threads="2"
thread_pool.max_threads="8"
thread_pool.keep_alive_time="5000"
thread_pool.queue_enabled="true"
thread_pool.queue_max_size="10000"
thread_pool.rejection_policy="discard"
oob_thread_pool.enabled="true"
oob_thread_pool.min_threads="1"
oob_thread_pool.max_threads="8"
oob_thread_pool.keep_alive_time="5000"
oob_thread_pool.queue_enabled="false"
oob_thread_pool.queue_max_size="100"
oob_thread_pool.rejection_policy="Run"/>
<PING timeout="2000"
num_initial_members="3"/>
<MERGE2 max_interval="30000"
min_interval="10000"/>
<FD_SOCK/>
<FD_ALL/>
<VERIFY_SUSPECT timeout="1500" />
<BARRIER />
<pbcast.NAKACK exponential_backoff="300"
xmit_stagger_timeout="200"
use_mcast_xmit="false"
discard_delivered_msgs="true"/>
<UNICAST />
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
max_bytes="4M"/>
<pbcast.GMS print_local_addr="true" join_timeout="3000"
view_bundling="true"/>
<UFC max_credits="2M"
min_threshold="0.4"/>
<MFC max_credits="2M"
min_threshold="0.4"/>
<FRAG2 frag_size="60K" />
<pbcast.STATE_TRANSFER />
<CENTRAL_LOCK />
<!-- pbcast.FLUSH /-->
</config>
上記は、同じマシンでプログラムの N インスタンスを実行すると機能します (N-1 メンバーがクライアントになり、1 メンバーがマスターになります)。同じ LAN に接続された 2 つの異なるマシンで実行すると、明らかに、各メンバーで同じクラスター名を指定して JChannel.connect() を呼び出した後、各メンバーはチャネルを作成し、共通のクラスターは作成されません。その結果、クライアントにメッセージを送信すると、他のマスターは同じクラスター名に対して異なる物理アドレスを認識し、すべてのメッセージがドロップされます。
したがって、次のような警告が表示されます。
7683 [Incoming-1,communication-channel,pc-home-41714] WARN org.jgroups.protocols.pbcast.NAKACK - [JGRP00011] pc-home-41714: dropped message 293 from non-member cf8b4ea6-8cc8-cb21-538f-b03f3fa7413d (view=[pc-home-41714|0] [pc-home-41714])
1207996 [TransferQueueBundler,communication-channel,pc-home-5280] WARN org.jgroups.protocols.UDP - pc-home-5280: no physical address for cf8b4ea6-8cc8-cb21-538f-b03f3fa7413d, dropping message
1209526 [TransferQueueBundler,lock-channel,pc-home-59082] WARN org.jgroups.protocols.UDP - pc-home-59082: no physical address for efbe6408-0e21-d119-e2b8-f1d5762d9b45, dropping message
udp.xml loopback="true" を loopback="false" に変更すると、両方が同じクラスターに接続されますが、次のようなエラーが発生します。
55539 [Node0StatusPrinterThread] INFO plarz.net.planningig.autodiscovery.AutoDiscovery - Node0 startStatusPrinterThread(): I am happily a Master!
59077 [TransferQueueBundler,lock-channel,pc-test-6919] ERROR org.jgroups.protocols.UDP - pc-test-6919: exception sending bundled msgs: java.lang.Exception: dest=/fe80:0:0:0:226:18ff:fece:6ccc%2:43109 (130 bytes):, cause: java.io.IOException: Network is unreachable
59505 [TransferQueueBundler,communication-channel,pc-test-35303] ERROR org.jgroups.protocols.UDP - pc-test-35303: exception sending bundled msgs: java.lang.Exception: dest=/fe80:0:0:0:226:18ff:fece:6ccc%2:55053 (139 bytes):, cause: java.io.IOException: Network is unreachable