/*
 * Decompiled with CFR 0.152.
 */
package org.bundlebee.registry.net;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.Set;
import org.bundlebee.registry.impl.RegistryImpl;
import org.bundlebee.registry.net.MultiCastMessage;
import org.bundlebee.registry.net.MultiCastMessageListener;
import org.bundlebee.registry.net.MultiCastMessageSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiCastNode
implements MultiCastMessageListener,
MultiCastMessageSource {
    private static Logger LOG = LoggerFactory.getLogger(MultiCastNode.class);
    private static final int BUFFER_SIZE = 1024;
    protected static final int MAX_PORT_RANGE = 100;
    private static final String MESSAGE_CHAR_ENCODING = "UTF-8";
    private static final int DEFAULT_GROUP_PORT = 5555;
    protected static final int DEFAULT_LOCAL_PORT = 5556;
    private static final String ORG_BUNDLEBEE_REGISTRY_GROUPPORT = "org.bundlebee.registry.groupport";
    private static final String ORG_BUNDLEBEE_REGISTRY_GROUPADDRESS = "org.bundlebee.registry.groupaddress";
    protected static final String ORG_BUNDLEBEE_REGISTRY_LOCALADDRESS = "org.bundlebee.registry.localaddress";
    private static final String ORG_BUNDLEBEE_REGISTRY_LOCALPORT = "org.bundlebee.registry.localport";
    private static final int GROUP_PORT = MultiCastNode.getIntSystemProperty(5555, "org.bundlebee.registry.groupport");
    protected static final int LOCAL_PORT = MultiCastNode.getIntSystemProperty(5556, "org.bundlebee.registry.localport");
    private static final byte[] DEFAULT_GROUP_HOST = new byte[]{-28, 5, 6, 7};
    private static final byte[] GROUP_ADDRESS = MultiCastNode.getFourByteSystemProperty(DEFAULT_GROUP_HOST, "org.bundlebee.registry.groupaddress");
    private static final int DEFAULT_MAX_LS_REQUESTS = 3;
    private DatagramSocket privateSocket;
    private MulticastSocket groupSocket;
    private InetAddress groupAddress;
    private long nodeId;
    private boolean running;
    private Set<MultiCastMessageListener> multiCastMessageListeners = new HashSet<MultiCastMessageListener>();
    private int lsRequestSent;
    private int maxLsRequests = 3;
    private String localAddress = System.getProperty("org.bundlebee.registry.localaddress", InetAddress.getLocalHost().getHostAddress());

    private static byte[] getFourByteSystemProperty(byte[] defaultValue, String key) {
        String address = System.getProperty(key);
        if (address == null) {
            return defaultValue;
        }
        try {
            String[] byteStrings = address.split("\\.");
            if (byteStrings.length != 4) {
                return defaultValue;
            }
            byte[] bytes = new byte[4];
            for (int i = 0; i < bytes.length; ++i) {
                bytes[i] = (byte)Integer.parseInt(byteStrings[i]);
            }
            return bytes;
        }
        catch (NumberFormatException e) {
            LOG.error(RegistryImpl.BUNDLE_MARKER, e.toString(), (Throwable)e);
            return defaultValue;
        }
    }

    private static int getIntSystemProperty(int defaultValue, String key) {
        try {
            return Integer.parseInt(System.getProperty(key, "" + defaultValue));
        }
        catch (NumberFormatException e) {
            LOG.error(RegistryImpl.BUNDLE_MARKER, e.toString(), (Throwable)e);
            return defaultValue;
        }
    }

    public MultiCastNode(long nodeId) throws IOException {
        try {
            this.nodeId = nodeId;
            this.groupAddress = InetAddress.getByAddress(GROUP_ADDRESS);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Initializing MultiCastNode for " + nodeId + " and group address " + this.groupAddress + ".");
            }
            this.setRunning(true);
            this.makePrivateSocket();
            this.startPrivateListener();
            this.joinGroup();
            this.startGroupListener();
            MultiCastMessage hiMessage = new MultiCastMessage(nodeId, MultiCastMessage.Type.HI, this.getSender());
            this.sendToGroup(hiMessage);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Done initializing MultiCastNode for " + this.nodeId);
            }
        }
        catch (IOException e) {
            this.setRunning(false);
            throw e;
        }
    }

    public void addMultiCastMessageListener(MultiCastMessageListener multiCastMessageListener) {
        this.multiCastMessageListeners.add(multiCastMessageListener);
    }

    public void processMessage(MultiCastMessage multiCastMessage) {
        if (MultiCastMessage.Direction.OUT.equals((Object)multiCastMessage.getDirection())) {
            this.sendToGroup(multiCastMessage);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Not sending message, because its direction is not " + (Object)((Object)MultiCastMessage.Direction.OUT) + ", but " + (Object)((Object)multiCastMessage.getDirection()));
        }
    }

    private void fireMessage(MultiCastMessage multiCastMessage) {
        for (MultiCastMessageListener multiCastMessageListener : this.multiCastMessageListeners) {
            multiCastMessageListener.processMessage(multiCastMessage);
        }
        if (this.multiCastMessageListeners.isEmpty() && LOG.isDebugEnabled()) {
            LOG.debug(RegistryImpl.BUNDLE_MARKER, "No MultiCastMessageListeners registered.");
        }
    }

    private void makePrivateSocket() throws SocketException {
        this.privateSocket = MultiCastNode.getAvailableSocket(LOCAL_PORT);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Bound private socket to port " + this.privateSocket.getLocalPort());
        }
    }

    int getPort() {
        return this.privateSocket.getLocalPort();
    }

    private void receivePrivateMessages() {
        while (this.isRunning()) {
            try {
                byte[] buf = new byte[1024];
                DatagramPacket receivePacket = new DatagramPacket(buf, buf.length);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Waiting for private message, listening on port " + this.privateSocket.getLocalPort());
                }
                this.privateSocket.receive(receivePacket);
                String data = this.toString(receivePacket);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got a private message: " + data);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received " + data + " from " + receivePacket.getAddress() + ":" + receivePacket.getPort() + ".");
                }
                this.processPrivateMessage(data);
            }
            catch (IOException e) {
                LOG.error(RegistryImpl.BUNDLE_MARKER, e.toString(), (Throwable)e);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Stopped listening to private messages.");
        }
    }

    private void processPrivateMessage(String message) throws IOException {
        try {
            MultiCastMessage multiCastMessage = new MultiCastMessage(message);
            if (multiCastMessage.getType() == MultiCastMessage.Type.LS) {
                InetAddress address = multiCastMessage.getSenderAddress();
                int port = multiCastMessage.getSenderPort();
                for (MultiCastMessage mcm : ((RegistryImpl)RegistryImpl.getInstance()).getList()) {
                    this.sendPrivateMessage(mcm, address, port);
                }
            } else if (multiCastMessage.getType() == MultiCastMessage.Type.HI) {
                InetAddress address = multiCastMessage.getSenderAddress();
                int port = multiCastMessage.getSenderPort();
                if (multiCastMessage.getNodeId().longValue() == RegistryImpl.getInstance().getNodeId()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("HI message is from this node, doing nothing.");
                    }
                } else if (this.lsRequestSent < this.maxLsRequests) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sending ls request to " + address);
                    }
                    this.sendPrivateMessage(new MultiCastMessage(this.nodeId, MultiCastMessage.Type.LS, this.getSender()), address, port);
                    ++this.lsRequestSent;
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("Max ls requests already sent, doing nothing.");
                }
            } else if (multiCastMessage.isValid()) {
                multiCastMessage.setDirection(MultiCastMessage.Direction.IN);
                this.fireMessage(multiCastMessage);
            }
        }
        catch (RuntimeException e) {
            LOG.error(RegistryImpl.BUNDLE_MARKER, "Failed to process private message: " + e.toString(), (Throwable)e);
        }
    }

    private void sendPrivateMessage(MultiCastMessage message, InetAddress address, int port) throws IOException {
        this.privateSocket.send(this.toDatagramPacket(message.toString(), address, port));
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sent " + message + " to " + address + ":" + port);
        }
    }

    private void joinGroup() throws IOException {
        this.groupSocket = new MulticastSocket(GROUP_PORT);
        this.groupSocket.joinGroup(this.groupAddress);
        if (LOG.isDebugEnabled()) {
            LOG.debug("MultiCastNode for " + this.nodeId + " joined group at " + this.groupAddress + ":" + GROUP_PORT);
        }
    }

    public void sendToGroup(MultiCastMessage multiCastMessage) {
        if (multiCastMessage.isValid()) {
            if (multiCastMessage.getType() == MultiCastMessage.Type.HI) {
                this.lsRequestSent = 0;
                if (multiCastMessage.getValues().getProperty("sender") == null) {
                    multiCastMessage.getValues().setProperty("sender", this.getSender());
                }
            }
            this.sendToGroup(multiCastMessage.toString());
        } else if (LOG.isDebugEnabled()) {
            LOG.debug(RegistryImpl.BUNDLE_MARKER, "not sending message, because it is not valid: " + multiCastMessage);
        }
    }

    public void sendToGroup(String message) {
        try {
            if (LOG.isInfoEnabled()) {
                LOG.info(RegistryImpl.BUNDLE_MARKER, "Sending packet to group: " + message);
            }
            DatagramPacket sendPacket = this.toDatagramPacket(message, this.groupAddress, GROUP_PORT);
            this.groupSocket.send(sendPacket);
            if (LOG.isInfoEnabled()) {
                LOG.info(RegistryImpl.BUNDLE_MARKER, "Packet sent to group: " + message);
            }
        }
        catch (IOException ioe) {
            LOG.error(RegistryImpl.BUNDLE_MARKER, "Failed to send packet: " + ioe.toString(), (Throwable)ioe);
        }
    }

    public void receiveGroupMessages() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Starting group receive loop for " + this.nodeId);
        }
        while (this.isRunning()) {
            try {
                byte[] data = new byte[1024];
                DatagramPacket packet = new DatagramPacket(data, data.length);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Waiting for group message on " + this.groupSocket);
                }
                this.groupSocket.receive(packet);
                String msg = this.toString(packet);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received group msg " + msg + " from " + packet.getAddress() + ":" + packet.getPort() + ".");
                }
                this.processGroupMessage(msg);
            }
            catch (SocketTimeoutException e) {
                if (!LOG.isInfoEnabled()) continue;
                LOG.info(RegistryImpl.BUNDLE_MARKER, "GroupSocket receive timed out.");
            }
            catch (IOException e) {
                LOG.error(RegistryImpl.BUNDLE_MARKER, e.toString(), (Throwable)e);
            }
            catch (Throwable t) {
                LOG.error(RegistryImpl.BUNDLE_MARKER, t.toString(), t);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Group receive loop for " + this.nodeId + " was stopped.");
        }
    }

    private void processGroupMessage(String message) throws IOException {
        try {
            MultiCastMessage multiCastMessage = new MultiCastMessage(message);
            if (multiCastMessage.getType() == MultiCastMessage.Type.HI && multiCastMessage.getNodeId().longValue() != RegistryImpl.getInstance().getNodeId()) {
                InetAddress address = multiCastMessage.getSenderAddress();
                int port = multiCastMessage.getSenderPort();
                this.sendPrivateMessage(new MultiCastMessage(this.nodeId, MultiCastMessage.Type.HI, this.getSender()), address, port);
            }
            if (LOG.isInfoEnabled()) {
                LOG.info(RegistryImpl.BUNDLE_MARKER, "(NODE) processing node message: " + message);
            }
            if (multiCastMessage.isValid()) {
                multiCastMessage.setDirection(MultiCastMessage.Direction.IN);
                this.fireMessage(multiCastMessage);
            }
        }
        catch (RuntimeException e) {
            LOG.error(RegistryImpl.BUNDLE_MARKER, "Failed to process group message: " + e.toString(), (Throwable)e);
        }
    }

    private String toString(DatagramPacket datagramPacket) throws UnsupportedEncodingException {
        return new String(datagramPacket.getData(), 0, datagramPacket.getLength(), MESSAGE_CHAR_ENCODING);
    }

    private DatagramPacket toDatagramPacket(String message, InetAddress address, int port) throws UnsupportedEncodingException {
        byte[] bytes = message.getBytes(MESSAGE_CHAR_ENCODING);
        return new DatagramPacket(bytes, bytes.length, address, port);
    }

    private String getSender() {
        return this.localAddress + ":" + this.privateSocket.getLocalPort();
    }

    private void startPrivateListener() {
        new Thread(new Runnable(){

            public void run() {
                MultiCastNode.this.receivePrivateMessages();
            }
        }, "PrivateListener").start();
    }

    private void startGroupListener() {
        new Thread(new Runnable(){

            public void run() {
                MultiCastNode.this.receiveGroupMessages();
            }
        }, "MultiCastNode[" + this.nodeId + "] GroupReceive").start();
    }

    public void stop() {
        if (LOG.isInfoEnabled()) {
            LOG.info(RegistryImpl.BUNDLE_MARKER, "Shutting down node " + this.nodeId);
        }
        this.setRunning(false);
    }

    public synchronized boolean isRunning() {
        return this.running;
    }

    public synchronized void setRunning(boolean running) {
        this.running = running;
    }

    private static DatagramSocket getAvailableSocket(int startport) throws SocketException {
        SocketException lastex = null;
        for (int port = startport; port < LOCAL_PORT + 100; ++port) {
            try {
                DatagramSocket ds = new DatagramSocket(port);
                LOG.info("LOCAL_PORT=" + port);
                return ds;
            }
            catch (SocketException e) {
                lastex = e;
                LOG.info("LOCAL_PORT " + port + " already in use - trying " + (port + 1));
                continue;
            }
        }
        throw lastex;
    }
}

