dendini

JGroups not forming a cluster with UDP

I am trying to implement leader election with JGroups so that N instances of my program elect a master and all clients learn the IP address of that master. In the current implementation, each instance tries to acquire a lock through a LockService on lock-channel; the instance that acquires the lock becomes master and all the others switch to being clients.
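The election scheme described above can be illustrated in a single JVM as an analogue only. In this sketch a java.util.concurrent ReentrantLock stands in for the cluster-wide lock that LockService (backed by CENTRAL_LOCK in udp.xml) provides, and a CountDownLatch takes the place of the wait/notify handshake on finishedLock used in the code below. The class name LocalElection, the 100 ms tryLock timeout and the sample addresses are illustrative assumptions; none of this is JGroups API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Single-JVM analogue of lock-based leader election; not JGroups code. */
final class LocalElection {
    // Stands in for the cluster-wide lock from lockService.getLock("serverLock").
    private final Lock serverLock = new ReentrantLock();
    // The elected master's address, published once by the winner.
    private final AtomicReference<String> masterAddress = new AtomicReference<>();
    // Opened exactly once by the winner; unlike a bare notify(), a late waiter cannot miss it.
    private final CountDownLatch elected = new CountDownLatch(1);

    /** Try to become master; otherwise wait for the winner's address. */
    String runNode(String myAddress) throws InterruptedException {
        if (serverLock.tryLock(100, TimeUnit.MILLISECONDS)) {
            // Winner: keep holding the lock and publish our own address.
            masterAddress.set(myAddress);
            elected.countDown();
            return myAddress;
        }
        // Loser: block until the winner has published its address.
        elected.await();
        return masterAddress.get();
    }

    /** Runs one thread per node and returns the master address each node ended up with. */
    static List<String> elect(List<String> nodeAddresses) {
        LocalElection election = new LocalElection();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (String addr : nodeAddresses) {
            Thread t = new Thread(() -> {
                try {
                    seen.add(election.runNode(addr));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "node-" + addr);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // Every node should report the same master address.
        System.out.println(elect(Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3")));
    }
}
```

Because the winner never releases the lock, every other node falls through to await() and picks up the published address. With a latch, a node that starts waiting after the winner has already signalled still returns immediately, which a bare notify() without a state flag does not guarantee.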

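import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import org.jgroups.*;
import org.jgroups.blocks.locking.LockService;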
import org.jgroups.blocks.locking.LockService;

public class AutoDiscovery
{
    static org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AutoDiscovery.class); //used for logging purposes (see log4j library)
    /* this variable indicates whether I have become the master or I'm just a client*/
    public final AtomicBoolean becomeMaster = new AtomicBoolean(false);
    /* The address of the server if we are a client, or our own address if we
     * are the server; written and read by different threads */
    public volatile String serverAddress;
    /* A channel on which to acquire a lock, so that only one can become server */
    private JChannel lockChannel;
    /* A shared channel for communication between client and master */
    private JChannel communicationChannel;
    private LockService lockService;
    /* A thread which tries to acquire a lock */
    private Thread acquiringThread;
    /* A thread which listens for the server ip which may change */
    private Thread listeningThread;
    /* A thread which lists the status and initializes the acquiring thread*/
    private Thread statusThread;
    private String name;
    /* If we switch from client to server we must stop the listening thread.
     * Thread.stop() is unsafe, so instead we set the stopListening flag to true
     * and the listening thread exits on its next check */
    private volatile boolean stopListening = false;
    /* This monitor is notified once I have become either master or client,
     * i.e. once serverAddress and becomeMaster are correctly set */
    public final Object finishedLock = new Object();

    public static void main(String[] args) throws Exception
    {
        Thread.currentThread().setName("MyMainThread");
        Random rand = new Random();

        AutoDiscovery master = new AutoDiscovery("Node" + rand.nextInt(10));

        master.lockChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
        master.lockChannel.connect("lock-channel");

        master.communicationChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
        master.communicationChannel.connect("communication-channel");

        master.lockService = new LockService(master.lockChannel);
        master.startStatusPrinterThread();
    }

    public AutoDiscovery(String name)
    {
        this.name = name;
    }

    public AutoDiscovery()
    {
        try
        {
            Thread.currentThread().setName("MyMainThread");
            Random rand = new Random();

            this.name = ("Node" + rand.nextInt(10));

            lockChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
            lockChannel.connect("lock-channel");

            communicationChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
            communicationChannel.connect("communication-channel");

            lockService = new LockService(lockChannel);
            startStatusPrinterThread();
        }
        catch (Exception ex)
        {
            Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void startAcquiringThread()
    {
        acquiringThread = new Thread()
        {
            @Override
            public void run()
            {
                while (true)
                {
                    //if you have become Master send your ip every now and then
                    if (becomeMaster.get())
                    {
                        try
                        {
                            communicationChannel.send(new Message(null, null, "serverip " + serverAddress));
                        }
                        catch (Exception ex)
                        {
                            Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
                        }
                    }
                    else
                    {
                        try
                        {
                            Thread.currentThread().setName(name + "AcquiringThread");
                            Lock lock = lockService.getLock("serverLock");
                            if (lock.tryLock(4, TimeUnit.SECONDS))
                            {
                                becomeMaster.set(true);
                                stopListening = true;
                                /* Now that I'm server I must find out my own ip address on which to listen */
                                Enumeration<NetworkInterface> networkInterfaces;
                                try
                                {
                                    networkInterfaces = NetworkInterface.getNetworkInterfaces();
                                    for (NetworkInterface netint : Collections.list(networkInterfaces))
                                    {
                                        Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
                                        for (InetAddress inetAddress : Collections.list(inetAddresses))
                                        {
                                            if (isIPAddress(inetAddress.getHostAddress())
                                                    && !inetAddress.getHostAddress().equals("127.0.0.1"))
                                            {
                                                serverAddress = inetAddress.getHostAddress();
                                            }
                                        }
                                    }
                                    /* Notify the rest of the program that becomeMaster
                                     * and serverAddress are now initialized */
                                    synchronized (finishedLock)
                                    {
                                        finishedLock.notify();
                                    }
                                }
                                catch (Exception e)
                                {
                                    Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, e);
                                    System.exit(0);
                                }
                                log.info(Thread.currentThread().getName() + ": I acquired lock! will become master! my ip is " + serverAddress);
                            }
                            else
                            {
                                becomeMaster.set(false);
                                stopListening = false;
                                if (listeningThread == null || !listeningThread.isAlive())
                                {
                                    if (!stopListening) // always true here: stopListening was just set to false
                                    {
                                        startListeningThread();
                                    }
                                }
                            }
                        }
                        catch (Exception e)
                        {
                            e.printStackTrace();
                        }
                    }
                    try
                    {
                        sleep(5000L);
                    }
                    catch (InterruptedException ex)
                    {
                        Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        };
        acquiringThread.setDaemon(true);
        acquiringThread.start();
    }

    public void startListeningThread()
    {
        listeningThread = new Thread()
        {
            @Override
            public void run()
            {
                try
                {
                    while (true)
                    {
                        Thread.currentThread().setName(name + "ListeningThread");
                        communicationChannel.setReceiver(new ReceiverAdapter()
                        {
                            @Override
                            public void receive(Message msg)
                            {
                                if (msg.getObject() != null)
                                {
                                    String leaderServerAddress = (msg.getObject().toString().substring(9));
                                    if (isIPAddress(leaderServerAddress))
                                    {
                                        serverAddress = leaderServerAddress;
                                        log.info(name + " Master server has ip" + serverAddress);
                                        /* Notify the rest of the program that becomeMaster
                                         * and serverAddress are now initialized */
                                        synchronized (finishedLock)
                                        {
                                            finishedLock.notify();
                                        }
                                    }
                                    else
                                    {
                                        log.info(name + ": discarded message " + msg.getObject().toString());
                                    }
                                }
                            }
                        });
                        sleep(10000L);
                        if (stopListening)
                        {
                            return;
                        }
                    }
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
            }
        };
        listeningThread.setDaemon(true);
        listeningThread.start();
    }

    private void startStatusPrinterThread()
    {
        statusThread = new Thread()
        {
            @Override
            public void run()
            {
                Thread.currentThread().setName(name + "StatusPrinterThread");
                startAcquiringThread();
                while (true)
                {
                    try
                    {
                        if (becomeMaster.get())
                        {
                            log.info(name + " startStatusPrinterThread(): I am happily a Master!");
                        }
                        else
                        {
                            if (!acquiringThread.isAlive())
                            {
                                startAcquiringThread();
                            }
                        }
                        sleep(5000L);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        };
        statusThread.setDaemon(true);
        statusThread.start();
    }

    private static boolean isIPAddress(String str)
    {
        Pattern ipPattern = Pattern.compile("^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
                + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
                + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
                + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
        return ipPattern.matcher(str).matches();
    }
}
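The clients parse the master's announcement with substring(9), which throws StringIndexOutOfBoundsException for any payload shorter than nine characters, and isIPAddress() recompiles its regex on every call. A minimal sketch of the same "serverip <ip>" message format with an explicit prefix check, plus an IPv4-only version of the interface scan, could look like this. The class and method names are hypothetical; the regex is the one from the question. Note that findLocalIPv4() returns the first IPv4 address of an interface that is up and not loopback, whereas the original loop keeps the last match and only excludes "127.0.0.1".

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.regex.Pattern;

final class ServerAnnouncement {
    // Prefix the master puts in front of its address ("serverip " + serverAddress).
    static final String PREFIX = "serverip ";

    // Same dotted-quad expression as isIPAddress(), compiled once rather than on every call.
    private static final String OCTET = "([01]?\\d\\d?|2[0-4]\\d|25[0-5])";
    private static final Pattern IPV4 =
            Pattern.compile(OCTET + "\\." + OCTET + "\\." + OCTET + "\\." + OCTET);

    static boolean isIPAddress(String str) {
        return str != null && IPV4.matcher(str).matches();
    }

    static String encode(String serverAddress) {
        return PREFIX + serverAddress;
    }

    /** Returns the announced IPv4 address, or null for anything that is not an announcement. */
    static String parse(Object payload) {
        if (payload == null) {
            return null;
        }
        String text = payload.toString();
        // Check the prefix instead of blindly calling substring(9), which throws on short payloads.
        if (!text.startsWith(PREFIX)) {
            return null;
        }
        String ip = text.substring(PREFIX.length());
        return isIPAddress(ip) ? ip : null;
    }

    /** First IPv4 address of an interface that is up and not loopback, or null if there is none. */
    static String findLocalIPv4() throws SocketException {
        Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
        if (interfaces == null) {
            return null;
        }
        for (NetworkInterface nif : Collections.list(interfaces)) {
            if (!nif.isUp() || nif.isLoopback()) {
                continue;
            }
            for (InetAddress addr : Collections.list(nif.getInetAddresses())) {
                // Skip IPv6 (including fe80:: link-local) addresses entirely.
                if (addr instanceof Inet4Address) {
                    return addr.getHostAddress();
                }
            }
        }
        return null;
    }

    public static void main(String[] args) throws SocketException {
        System.out.println("local IPv4: " + findLocalIPv4());
        System.out.println(parse(encode("192.168.1.10")));
    }
}
```

In receive(), the listener could then call ServerAnnouncement.parse(msg.getObject()) and ignore null results, and the master could send ServerAnnouncement.encode(serverAddress).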

My current udp.xml is:

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
    <UDP
        mcast_port="${jgroups.udp.mcast_port:45588}"
         tos="8"
         ucast_recv_buf_size="20M"
         ucast_send_buf_size="640K"
         mcast_recv_buf_size="25M"
         mcast_send_buf_size="640K"
         loopback="true"
         level="WARN"
         log_discard_msgs="false"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         ip_ttl="${jgroups.udp.ip_ttl:8}"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         timer_type="new"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"

         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="8"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="true"
         thread_pool.queue_max_size="10000"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="Run"/>

    <PING timeout="2000"
            num_initial_members="3"/>
    <MERGE2 max_interval="30000"
            min_interval="10000"/>
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK exponential_backoff="300"
                   xmit_stagger_timeout="200"
                   use_mcast_xmit="false"
                   discard_delivered_msgs="true"/>
    <UNICAST />
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="3000"
                view_bundling="true"/>
    <UFC max_credits="2M"
         min_threshold="0.4"/>
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <pbcast.STATE_TRANSFER />
    <CENTRAL_LOCK />
    <!-- pbcast.FLUSH  /-->
</config> 

The above works when I run N instances of the program on the same machine: N-1 members become clients and one becomes master. When I run it on two different machines connected to the same LAN, however, each member calls JChannel.connect() with the same cluster name but ends up in its own single-member view, so no common cluster is formed. As a result, each machine has its own master, and when a master sends messages to the clients, the other machine drops them because the sender is not a member of its view and its physical address is unknown.

So I get warnings like:

7683 [Incoming-1,communication-channel,pc-home-41714] WARN org.jgroups.protocols.pbcast.NAKACK  - [JGRP00011] pc-home-41714: dropped message 293 from non-member cf8b4ea6-8cc8-cb21-538f-b03f3fa7413d (view=[pc-home-41714|0] [pc-home-41714])

1207996 [TransferQueueBundler,communication-channel,pc-home-5280] WARN org.jgroups.protocols.UDP  - pc-home-5280: no physical address for cf8b4ea6-8cc8-cb21-538f-b03f3fa7413d, dropping message
1209526 [TransferQueueBundler,lock-channel,pc-home-59082] WARN org.jgroups.protocols.UDP  - pc-home-59082: no physical address for efbe6408-0e21-d119-e2b8-f1d5762d9b45, dropping message

If I change loopback="true" to loopback="false" in udp.xml, both instances do join the same cluster, but then they fail with errors like:

55539 [Node0StatusPrinterThread] INFO plarz.net.planningig.autodiscovery.AutoDiscovery  - Node0 startStatusPrinterThread(): I am happily a Master!
59077 [TransferQueueBundler,lock-channel,pc-test-6919] ERROR org.jgroups.protocols.UDP  - pc-test-6919: exception sending bundled msgs: java.lang.Exception: dest=/fe80:0:0:0:226:18ff:fece:6ccc%2:43109 (130 bytes):, cause: java.io.IOException: Network is unreachable
59505 [TransferQueueBundler,communication-channel,pc-test-35303] ERROR org.jgroups.protocols.UDP  - pc-test-35303: exception sending bundled msgs: java.lang.Exception: dest=/fe80:0:0:0:226:18ff:fece:6ccc%2:55053 (139 bytes):, cause: java.io.IOException: Network is unreachable
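The failing destinations in these errors are IPv6 link-local addresses (fe80:0:0:0:226:18ff:fece:6ccc%2), which suggests the members advertise link-local IPv6 addresses to each other. A commonly recommended mitigation for JGroups on hosts with both IPv4 and IPv6, not verified against this setup, is to force the JVM onto IPv4 and optionally pin the bind address. The most reliable form is -Djava.net.preferIPv4Stack=true on the java command line, since the JDK documentation notes that some networking properties, this one included, are read only once at startup; the sketch below sets the same property programmatically and may only take effect if it runs before anything touches java.net. The system property name jgroups.bind_addr and the example address are assumptions.

```java
final class NetworkSetup {
    /**
     * Call as the very first statement of main(), before any JChannel or socket is created.
     * bindAddress is optional; pass null to let JGroups choose the bind address.
     */
    static void preferIPv4(String bindAddress) {
        // Standard JDK networking property: use IPv4 sockets only.
        System.setProperty("java.net.preferIPv4Stack", "true");
        if (bindAddress != null) {
            // ASSUMPTION: JGroups 3.x reads this system property for the transport's bind_addr.
            System.setProperty("jgroups.bind_addr", bindAddress);
        }
    }

    public static void main(String[] args) {
        // Hypothetical: pass this machine's LAN address as the first argument, if desired.
        preferIPv4(args.length > 0 ? args[0] : null);
        System.out.println("preferIPv4Stack=" + System.getProperty("java.net.preferIPv4Stack"));
    }
}
```

For example, NetworkSetup.preferIPv4(null) could be the first statement of AutoDiscovery.main, or each machine could be started with java -Djava.net.preferIPv4Stack=true instead.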


Answers (1)

Med Ali Difallah

Error:

[Server:ha-server-3] 13:59:13,122 WARNING [org.jgroups.protocols.UDP] (OOB-15,null) null: no physical address for 766de5c9-8ac2-6d30-89ef-78d39aa5f7eb, dropping message

In my case, it was caused by having multiple JBoss clusters on the same network, each using the same names. For example, ha-server-1 and ha-server-2 existed in two different clusters on different machines:

Cluster-1 (10.10.10.10)
  +- ha-server-1
  +- ha-server-2

Cluster-2 (10.10.10.20)
  +- ha-server-1
  +- ha-server-2

I resolved the problem by renaming the ha-server instances. Note that both were independent clusters. I assume it was caused by a JGroups multicast issue; further explanation from an expert would be welcome.

Reference: http://icfun.blogspot.com/2013/10/no-physical-address-for-766de5c9-8ac2.html
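The question's code has a similar exposure to the name collision described in this answer: every instance connects to the fixed cluster names lock-channel and communication-channel, so any other deployment on the same LAN using the same udp.xml (same mcast_port) and the same names would join the same groups. JGroups discards messages carrying a different cluster name (the log_discard_msgs attribute in udp.xml controls whether that is logged), so distinct names keep deployments apart. A minimal sketch, assuming separation by cluster name is what you want, with a hypothetical deployment id:

```java
import java.util.Objects;

final class ClusterNames {
    /**
     * Prefixes a channel's cluster name with a deployment id so that independent
     * deployments sharing a LAN and multicast port do not join each other's groups.
     * deploymentId is hypothetical, e.g. read from the application's own configuration.
     */
    static String scoped(String deploymentId, String channel) {
        Objects.requireNonNull(deploymentId, "deploymentId");
        Objects.requireNonNull(channel, "channel");
        if (deploymentId.isEmpty()) {
            throw new IllegalArgumentException("deploymentId must not be empty");
        }
        return deploymentId + "-" + channel;
    }

    public static void main(String[] args) {
        // e.g. lockChannel.connect(scoped("planner-test", "lock-channel"));
        System.out.println(scoped("planner-test", "lock-channel"));
        System.out.println(scoped("planner-prod", "lock-channel"));
    }
}
```

Giving each deployment its own mcast_port (or multicast address) in udp.xml is an alternative that also keeps their traffic off each other's sockets.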
