/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.blocks;

import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.DirectExecutor;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.FutureResult;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.blocks.ConnectionTable;
import org.jgroups.protocols.TCP_NIO;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Util;

public class ConnectionTableNIO
extends ConnectionTable
implements Runnable {
    // Listening channel; opened, switched to non-blocking mode and bound in createServerSocket().
    private ServerSocketChannel m_serverSocketChannel;
    // Selector the listening channel is registered with; the accept loop in run() blocks on it.
    private Selector m_acceptSelector;
    // Class-wide logger (the inherited instance logger "log" is also used in a few places).
    protected static final Log LOG = LogFactory.getLog((Class)ConnectionTableNIO.class);
    // Writer workers created in init(); each new connection is assigned one of them.
    private WriteHandler[] m_writeHandlers;
    // Index of the writer handed out most recently; advanced modulo the array length.
    private int m_nextWriteHandler = 0;
    // Guards m_nextWriteHandler.
    private final Object m_lockNextWriteHandler = new Object();
    // Reader workers created in init(); each new connection is added to one of them.
    private ReadHandler[] m_readHandlers;
    // Index of the reader handed out most recently; advanced modulo the array length.
    private int m_nextReadHandler = 0;
    // Guards m_nextReadHandler.
    private final Object m_lockNextReadHandler = new Object();
    // Executor used by runRequest(); init() picks a DirectExecutor or a PooledExecutor.
    private Executor m_requestProcessors;

    /**
     * Creates the table by delegating to the {@code ConnectionTable(int)} constructor.
     *
     * @param srv_port forwarded unchanged to the superclass (presumably the port to
     *                 listen on -- confirm against ConnectionTable)
     * @throws Exception whatever the superclass constructor throws
     */
    public ConnectionTableNIO(int srv_port) throws Exception {
        super(srv_port);
    }

    /**
     * Creates the table by delegating to the matching three-argument
     * {@code ConnectionTable} constructor; all arguments are forwarded unchanged.
     *
     * @param srv_port         presumably the port to listen on -- confirm against ConnectionTable
     * @param reaper_interval  presumably the connection-reaper period -- confirm against ConnectionTable
     * @param conn_expire_time presumably the idle time after which a connection expires -- confirm
     * @throws Exception whatever the superclass constructor throws
     */
    public ConnectionTableNIO(int srv_port, long reaper_interval, long conn_expire_time) throws Exception {
        super(srv_port, reaper_interval, conn_expire_time);
    }

    /**
     * Creates the table by delegating to the matching five-argument
     * {@code ConnectionTable} constructor; all arguments are forwarded unchanged.
     *
     * @param r             receiver handed to the superclass; note that init() casts the
     *                      inherited {@code receiver} field to {@code TCP_NIO}
     * @param bind_addr     forwarded to the superclass; createServerSocket() binds to it when non-null
     * @param external_addr forwarded to the superclass (meaning not visible here)
     * @param srv_port      forwarded to the superclass (presumably first port to try -- confirm)
     * @param max_port      forwarded to the superclass (presumably last port to try -- confirm)
     * @throws Exception whatever the superclass constructor throws
     */
    public ConnectionTableNIO(ConnectionTable.Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port) throws Exception {
        super(r, bind_addr, external_addr, srv_port, max_port);
    }

    /**
     * Creates the table by delegating to the matching seven-argument
     * {@code ConnectionTable} constructor; all arguments are forwarded unchanged.
     *
     * @param r                receiver handed to the superclass; note that init() casts the
     *                         inherited {@code receiver} field to {@code TCP_NIO}
     * @param bind_addr        forwarded to the superclass; createServerSocket() binds to it when non-null
     * @param external_addr    forwarded to the superclass (meaning not visible here)
     * @param srv_port         forwarded to the superclass (presumably first port to try -- confirm)
     * @param max_port         forwarded to the superclass (presumably last port to try -- confirm)
     * @param reaper_interval  presumably the connection-reaper period -- confirm against ConnectionTable
     * @param conn_expire_time presumably the idle time after which a connection expires -- confirm
     * @throws Exception whatever the superclass constructor throws
     */
    public ConnectionTableNIO(ConnectionTable.Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port, long reaper_interval, long conn_expire_time) throws Exception {
        super(r, bind_addr, external_addr, srv_port, max_port, reaper_interval, conn_expire_time);
    }

    /**
     * Returns the connection to {@code dest}, opening a new one when the table
     * has none.
     *
     * <p>The whole lookup-or-create sequence runs while holding the monitor of
     * {@code conns}. A new connection is opened in blocking mode, the local
     * address is sent to the peer, the channel is switched to non-blocking mode,
     * the configured buffer sizes are applied, and the connection is attached to
     * the next write handler and read handler before being added to the table.
     *
     * <p>If any setup step fails, the freshly opened channel is closed before the
     * exception propagates, so a failed attempt does not leak a socket.
     *
     * @param dest peer address; must be an {@code IpAddress}
     * @return the existing or newly created connection
     * @throws Exception if the channel cannot be opened or set up, or if the
     *                   thread is interrupted while registering with a read handler
     */
    ConnectionTable.Connection getConnection(Address dest) throws Exception {
        synchronized (this.conns) {
            Connection conn = (Connection)this.conns.get(dest);
            if (conn != null) {
                return conn;
            }

            IpAddress ipDest = (IpAddress)dest;
            InetSocketAddress destAddress = new InetSocketAddress(ipDest.getIpAddress(), ipDest.getPort());
            SocketChannel sock_ch = SocketChannel.open(destAddress);
            boolean setupComplete = false;
            try {
                conn = new Connection(sock_ch, dest);
                // The address handshake is done while the channel is still blocking.
                conn.sendLocalAddress(this.local_addr);
                sock_ch.configureBlocking(false);

                try {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)("About to change new connection send buff size from " + sock_ch.socket().getSendBufferSize() + " bytes"));
                    }
                    sock_ch.socket().setSendBufferSize(this.send_buf_size);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)("Changed new connection send buff size to " + sock_ch.socket().getSendBufferSize() + " bytes"));
                    }
                }
                catch (IllegalArgumentException ex) {
                    // A rejected size is not fatal; keep the socket's default.
                    if (this.log.isErrorEnabled()) {
                        this.log.error((Object)("exception setting send buffer size to " + this.send_buf_size + " bytes: " + ex));
                    }
                }

                try {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)("About to change new connection receive buff size from " + sock_ch.socket().getReceiveBufferSize() + " bytes"));
                    }
                    sock_ch.socket().setReceiveBufferSize(this.recv_buf_size);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)("Changed new connection receive buff size to " + sock_ch.socket().getReceiveBufferSize() + " bytes"));
                    }
                }
                catch (IllegalArgumentException ex) {
                    // Report the receive size that was rejected (previously logged the send size).
                    if (this.log.isErrorEnabled()) {
                        this.log.error((Object)("exception setting receive buffer size to " + this.recv_buf_size + " bytes: " + ex));
                    }
                }

                int idx;
                synchronized (this.m_lockNextWriteHandler) {
                    idx = this.m_nextWriteHandler = (this.m_nextWriteHandler + 1) % this.m_writeHandlers.length;
                }
                conn.setupWriteHandler(this.m_writeHandlers[idx]);

                try {
                    synchronized (this.m_lockNextReadHandler) {
                        idx = this.m_nextReadHandler = (this.m_nextReadHandler + 1) % this.m_readHandlers.length;
                    }
                    this.m_readHandlers[idx].add(conn);
                }
                catch (InterruptedException e) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn((Object)("Thread (" + Thread.currentThread().getName() + ") was interrupted, closing connection"), (Throwable)e);
                    }
                    conn.destroy();
                    throw e;
                }
                setupComplete = true;
            }
            finally {
                if (!setupComplete) {
                    try {
                        // Closing an already closed channel is a no-op.
                        sock_ch.close();
                    }
                    catch (IOException ignored) {
                        // Best effort; the original failure is the one that matters.
                    }
                }
            }

            this.addConnection(dest, conn);
            this.notifyConnectionOpened(dest);
            if (LOG.isInfoEnabled()) {
                LOG.info((Object)("created socket to " + dest));
            }
            return conn;
        }
    }

    /**
     * Builds the request executor and the reader/writer worker arrays from the
     * settings exposed by the receiver, which is cast to {@code TCP_NIO}.
     *
     * <p>A maximum processor thread count of zero or less selects a
     * {@code DirectExecutor}; otherwise a {@code PooledExecutor} over a bounded
     * queue is configured and pre-started.
     *
     * @throws Exception if creating the handlers fails
     */
    protected void init() throws Exception {
        TCP_NIO settings = (TCP_NIO)this.receiver;

        if (settings.getProcessorMaxThreads() > 0) {
            Channel workQueue = new BoundedBuffer(settings.getProcessorQueueSize());
            PooledExecutor pool = new PooledExecutor(workQueue, settings.getProcessorMaxThreads());
            // Worker threads are created in the JGroups global thread group.
            pool.setThreadFactory(new ThreadFactory(){

                public Thread newThread(Runnable runnable) {
                    return new Thread(Util.getGlobalThreadGroup(), runnable);
                }
            });
            pool.setMinimumPoolSize(settings.getProcessorMinThreads());
            pool.setKeepAliveTime((long)settings.getProcessorKeepAliveTime());
            pool.waitWhenBlocked();
            pool.createThreads(settings.getProcessorThreads());
            this.m_requestProcessors = pool;
        } else {
            this.m_requestProcessors = new DirectExecutor();
        }

        this.m_writeHandlers = WriteHandler.create(settings.getWriterThreads());
        this.m_readHandlers = ReadHandler.create(settings.getReaderThreads(), this);
    }

    /**
     * Shuts the table down: closes the listening channel, wakes the acceptor,
     * hands a {@code Shutdown} marker to every read and write handler, stops a
     * pooled request executor immediately, and finally calls {@code super.stop()}.
     *
     * <p>An interrupt while queueing a marker is logged and the remaining
     * handlers are still processed.
     */
    public void stop() {
        if (this.m_serverSocketChannel.isOpen()) {
            try {
                this.m_serverSocketChannel.close();
            }
            catch (Exception eat) {
                // Deliberately ignored: nothing useful can be done if closing fails here.
            }
        }
        // Unblock the accept loop so it can notice the closed channel.
        this.m_acceptSelector.wakeup();

        for (int reader = 0; reader < this.m_readHandlers.length; ++reader) {
            try {
                this.m_readHandlers[reader].add(new Shutdown());
            }
            catch (InterruptedException e) {
                LOG.error((Object)("Thread (" + Thread.currentThread().getName() + ") was interrupted, failed to shutdown selector"), (Throwable)e);
            }
        }

        for (int writer = 0; writer < this.m_writeHandlers.length; ++writer) {
            WriteHandler handler = this.m_writeHandlers[writer];
            try {
                handler.QUEUE.put((Object)new Shutdown());
                // The writer may be blocked in select(); wake it so it polls the queue.
                handler.SELECTOR.wakeup();
            }
            catch (InterruptedException e) {
                LOG.error((Object)("Thread (" + Thread.currentThread().getName() + ") was interrupted, failed to shutdown selector"), (Throwable)e);
            }
        }

        if (this.m_requestProcessors instanceof PooledExecutor) {
            ((PooledExecutor)this.m_requestProcessors).shutdownNow();
        }
        super.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    public void run() {
        conn = null;
        while (this.m_serverSocketChannel.isOpen()) {
            num = 0;
            try {
                num = this.m_acceptSelector.select();
            }
            catch (IOException e) {
                if (!ConnectionTableNIO.LOG.isWarnEnabled()) continue;
                ConnectionTableNIO.LOG.warn((Object)"Select operation on listening socket failed", (Throwable)e);
                continue;
            }
            if (num <= 0) continue;
            readyKeys = this.m_acceptSelector.selectedKeys();
            i = readyKeys.iterator();
            while (i.hasNext()) {
                block45: {
                    block44: {
                        key = i.next();
                        i.remove();
                        readyChannel = (ServerSocketChannel)key.channel();
                        client_sock_ch = null;
                        try {
                            client_sock_ch = readyChannel.accept();
                        }
                        catch (IOException e) {
                            if (!ConnectionTableNIO.LOG.isWarnEnabled()) continue;
                            ConnectionTableNIO.LOG.warn((Object)"Attempt to accept new connection from listening socket failed", (Throwable)e);
                            continue;
                        }
                        if (ConnectionTableNIO.LOG.isInfoEnabled()) {
                            ConnectionTableNIO.LOG.info((Object)("accepted connection, client_sock=" + client_sock_ch.socket()));
                        }
                        try {
                            if (ConnectionTableNIO.LOG.isTraceEnabled()) {
                                ConnectionTableNIO.LOG.trace((Object)("About to change new connection send buff size from " + client_sock_ch.socket().getSendBufferSize() + " bytes"));
                            }
                            client_sock_ch.socket().setSendBufferSize(this.send_buf_size);
                            if (ConnectionTableNIO.LOG.isTraceEnabled()) {
                                ConnectionTableNIO.LOG.trace((Object)("Changed new connection send buff size to " + client_sock_ch.socket().getSendBufferSize() + " bytes"));
                            }
                        }
                        catch (IllegalArgumentException ex) {
                            if (this.log.isErrorEnabled()) {
                                this.log.error((Object)("exception setting send buffer size to " + this.send_buf_size + " bytes: "), (Throwable)ex);
                            }
                        }
                        catch (SocketException e) {
                            if (!this.log.isErrorEnabled()) break block44;
                            this.log.error((Object)("exception setting send buffer size to " + this.send_buf_size + " bytes: "), (Throwable)e);
                        }
                    }
                    try {
                        if (ConnectionTableNIO.LOG.isTraceEnabled()) {
                            ConnectionTableNIO.LOG.trace((Object)("About to change new connection receive buff size from " + client_sock_ch.socket().getReceiveBufferSize() + " bytes"));
                        }
                        client_sock_ch.socket().setReceiveBufferSize(this.recv_buf_size);
                        if (ConnectionTableNIO.LOG.isTraceEnabled()) {
                            ConnectionTableNIO.LOG.trace((Object)("Changed new connection receive buff size to " + client_sock_ch.socket().getReceiveBufferSize() + " bytes"));
                        }
                    }
                    catch (IllegalArgumentException ex) {
                        if (this.log.isErrorEnabled()) {
                            this.log.error((Object)("exception setting receive buffer size to " + this.send_buf_size + " bytes: "), (Throwable)ex);
                        }
                    }
                    catch (SocketException e) {
                        if (!this.log.isErrorEnabled()) break block45;
                        this.log.error((Object)("exception setting receive buffer size to " + this.recv_buf_size + " bytes: "), (Throwable)e);
                    }
                }
                conn = new Connection(client_sock_ch, null);
                try {
                    conn.peer_addr = conn.readPeerAddress(client_sock_ch.socket());
                    e = this.conns;
                    synchronized (e) {
                        if (!this.conns.containsKey(conn.getPeerAddress())) ** GOTO lbl73
                        if (conn.getPeerAddress().equals(this.getLocalAddress())) {
                            if (ConnectionTableNIO.LOG.isTraceEnabled()) {
                                ConnectionTableNIO.LOG.trace((Object)(conn.getPeerAddress() + " is myself, not put it in table twice, but still read from it"));
                            }
                        } else {
                            if (ConnectionTableNIO.LOG.isWarnEnabled()) {
                                ConnectionTableNIO.LOG.warn((Object)(conn.getPeerAddress() + " is already there, will terminate connection"));
                            }
                            conn.destroy();
                            continue;
lbl73:
                            // 1 sources

                            this.addConnection(conn.getPeerAddress(), conn);
                        }
                    }
                    this.notifyConnectionOpened(conn.getPeerAddress());
                    client_sock_ch.configureBlocking(false);
                }
                catch (IOException e) {
                    if (ConnectionTableNIO.LOG.isWarnEnabled()) {
                        ConnectionTableNIO.LOG.warn((Object)"Attempt to configure non-blocking mode failed", (Throwable)e);
                    }
                    conn.destroy();
                    continue;
                }
                catch (Exception e) {
                    if (ConnectionTableNIO.LOG.isWarnEnabled()) {
                        ConnectionTableNIO.LOG.warn((Object)"Attempt to handshake with other peer failed", (Throwable)e);
                    }
                    conn.destroy();
                    continue;
                }
                try {
                    var9_20 = this.m_lockNextWriteHandler;
                    synchronized (var9_20) {
                        idx = this.m_nextWriteHandler = (this.m_nextWriteHandler + 1) % this.m_writeHandlers.length;
                    }
                    Connection.access$000(conn, this.m_writeHandlers[idx]);
                }
                catch (InterruptedException e) {
                    if (ConnectionTableNIO.LOG.isWarnEnabled()) {
                        ConnectionTableNIO.LOG.warn((Object)"Attempt to configure accepted connection was interrupted", (Throwable)e);
                    }
                    conn.destroy();
                    continue;
                }
                try {
                    var9_20 = this.m_lockNextReadHandler;
                    synchronized (var9_20) {
                        idx = this.m_nextReadHandler = (this.m_nextReadHandler + 1) % this.m_readHandlers.length;
                    }
                    ReadHandler.access$100(this.m_readHandlers[idx], conn);
                }
                catch (InterruptedException e) {
                    if (ConnectionTableNIO.LOG.isWarnEnabled()) {
                        ConnectionTableNIO.LOG.warn((Object)"Attempt to configure read handler for accepted connection failed", (Throwable)e);
                    }
                    conn.destroy();
                }
            }
        }
        if (ConnectionTableNIO.LOG.isTraceEnabled()) {
            ConnectionTableNIO.LOG.trace((Object)"acceptor thread terminated");
        }
    }

    /**
     * Opens the accept selector and a non-blocking listening channel, binds the
     * channel to the first usable port starting at {@code start_port}, records
     * that port in {@code srv_port} and registers the channel for accepts.
     *
     * <p>With a {@code bind_addr} configured the bind uses that address and a
     * backlog of 20; otherwise it binds to the wildcard address with the
     * default backlog.
     *
     * @param start_port first port to try
     * @param end_port   port at which a failed bind stops the search
     * @return the {@code ServerSocket} view of the bound channel
     * @throws BindException if binding still fails once {@code start_port}
     *                       has reached {@code end_port}
     * @throws Exception     on any other I/O failure
     */
    protected ServerSocket createServerSocket(int start_port, int end_port) throws Exception {
        this.m_acceptSelector = Selector.open();
        this.m_serverSocketChannel = ServerSocketChannel.open();
        this.m_serverSocketChannel.configureBlocking(false);

        boolean bound = false;
        while (!bound) {
            try {
                if (this.bind_addr != null) {
                    InetSocketAddress sockAddr = new InetSocketAddress(this.bind_addr, start_port);
                    this.m_serverSocketChannel.socket().bind(sockAddr, 20);
                } else {
                    InetSocketAddress sockAddr = new InetSocketAddress(start_port);
                    this.m_serverSocketChannel.socket().bind(sockAddr);
                }
                bound = true;
            }
            catch (SocketException bind_ex) {
                // BindException is a SocketException, so this one handler covers both.
                if (start_port == end_port) {
                    throw (BindException)new BindException("No available port to bind to").initCause(bind_ex);
                }
                ++start_port;
            }
            catch (IOException io_ex) {
                if (LOG.isErrorEnabled()) {
                    LOG.error((Object)("Attempt to bind serversocket failed, port=" + start_port + ", bind addr=" + this.bind_addr), (Throwable)io_ex);
                }
                throw io_ex;
            }
        }

        this.srv_port = start_port;
        this.m_serverSocketChannel.register(this.m_acceptSelector, SelectionKey.OP_ACCEPT);
        return this.m_serverSocketChannel.socket();
    }

    /**
     * Wraps the received buffer in an {@code ExecuteTask} and hands it to the
     * request executor.
     *
     * @param addr address passed to the task
     * @param buf  buffer passed to the task
     * @throws InterruptedException if the executor's {@code execute} is interrupted
     */
    protected void runRequest(Address addr, ByteBuffer buf) throws InterruptedException {
        Runnable task = new ExecuteTask(addr, buf);
        this.m_requestProcessors.execute(task);
    }

    /**
     * Immutable description of one pending write: the target channel, the
     * payload, the future to notify, and the per-channel handler that will
     * perform the write.
     */
    public static class WriteRequest {
        private final SocketChannel m_channel;
        private final ByteBuffer m_buffer;
        private final FutureResult m_callback;
        private final SelectorWriteHandler m_hdlr;

        WriteRequest(SocketChannel channel, ByteBuffer buffer, FutureResult callback, SelectorWriteHandler hdlr) {
            m_channel = channel;
            m_buffer = buffer;
            m_callback = callback;
            m_hdlr = hdlr;
        }

        /** @return the channel this request writes to */
        SocketChannel getChannel() {
            return m_channel;
        }

        /** @return the payload buffer */
        ByteBuffer getBuffer() {
            return m_buffer;
        }

        /** @return the future notified on completion or failure; may be null */
        FutureResult getCallback() {
            return m_callback;
        }

        /** @return the per-channel handler this request is queued on */
        SelectorWriteHandler getHandler() {
            return m_hdlr;
        }
    }

    /**
     * Per-channel write state: a FIFO of pending {@code WriteRequest}s plus the
     * selection key used to ask the selector for write readiness.
     *
     * <p>The key is registered lazily on the first {@code add}. Write interest is
     * switched on while requests are queued and off again once the queue drains.
     */
    public static class SelectorWriteHandler {
        private final LinkedList m_writeRequests = new LinkedList();
        private boolean m_headerSent = false;
        private SocketChannel m_channel;
        private SelectionKey m_key;
        private Selector m_selector;
        private int m_bytesWritten = 0;
        private boolean m_enabled = false;
        private ByteBuffer m_headerBuffer;

        SelectorWriteHandler(SocketChannel channel, Selector selector, ByteBuffer headerBuffer) {
            m_channel = channel;
            m_selector = selector;
            m_headerBuffer = headerBuffer;
        }

        /** Registers the channel with no interest ops and this handler as attachment. */
        private void register(Selector selector, SocketChannel channel) throws ClosedChannelException {
            m_key = channel.register(selector, 0, this);
        }

        /**
         * Turns write interest on.
         *
         * @return true only when this call switched the handler from disabled to
         *         enabled; false if it was already enabled, the channel is
         *         closed, or the key has been cancelled
         */
        private boolean enable() {
            if (m_key == null) {
                try {
                    register(m_selector, m_channel);
                }
                catch (ClosedChannelException e) {
                    return false;
                }
            }
            if (m_enabled) {
                return false;
            }
            try {
                m_key.interestOps(SelectionKey.OP_WRITE);
            }
            catch (CancelledKeyException e) {
                return false;
            }
            m_enabled = true;
            return true;
        }

        /** Turns write interest off; a cancelled key is tolerated. */
        private void disable() {
            if (!m_enabled) {
                return;
            }
            try {
                m_key.interestOps(0);
            }
            catch (CancelledKeyException cancelledKeyException) {
                // Key is already gone; only the flag needs resetting.
            }
            m_enabled = false;
        }

        private void cancel() {
            m_key.cancel();
        }

        /**
         * Queues a request and makes sure write interest is on.
         *
         * @return the result of {@code enable()}
         */
        boolean add(WriteRequest entry) {
            m_writeRequests.add(entry);
            return enable();
        }

        /** @return the request at the head of the queue; throws if the queue is empty */
        WriteRequest getCurrentRequest() {
            return (WriteRequest)m_writeRequests.getFirst();
        }

        SocketChannel getChannel() {
            return m_channel;
        }

        ByteBuffer getBuffer() {
            return getCurrentRequest().getBuffer();
        }

        FutureResult getCallback() {
            return getCurrentRequest().getCallback();
        }

        /** @return payload bytes written so far for the current request */
        int getBytesWritten() {
            return m_bytesWritten;
        }

        /** Fails the current request's future, if it has one. */
        void notifyError(Throwable error) {
            FutureResult callback = getCallback();
            if (callback != null) {
                callback.setException(error);
            }
        }

        /** Completes the current request's future, if it has one. */
        void notifyObject(Object result) {
            FutureResult callback = getCallback();
            if (callback != null) {
                callback.set(result);
            }
        }

        /**
         * Drops the current request and resets per-request state.
         *
         * @return true if more requests remain; false if the queue is now empty,
         *         in which case write interest has been switched off
         */
        boolean next() {
            m_headerSent = false;
            m_bytesWritten = 0;
            m_writeRequests.removeFirst();
            if (m_writeRequests.isEmpty()) {
                disable();
                return false;
            }
            return true;
        }

        /**
         * Writes as much of the current request as the channel accepts.
         *
         * <p>On the first call for a request a 4-byte length header is written
         * in full (looping until the header buffer is drained) before any payload.
         *
         * @return payload bytes still to be written; 0 means the request is complete
         * @throws IOException if the channel write fails
         */
        int write() throws IOException {
            if (!m_headerSent) {
                m_headerSent = true;
                m_headerBuffer.clear();
                m_headerBuffer.putInt(getBuffer().remaining());
                m_headerBuffer.flip();
                do {
                    getChannel().write(m_headerBuffer);
                } while (m_headerBuffer.hasRemaining());
            }
            ByteBuffer payload = getBuffer();
            m_bytesWritten += getChannel().write(payload);
            return payload.remaining();
        }
    }

    /**
     * Writer worker: a thread that takes {@code WriteRequest}s from a queue,
     * attaches each to its channel's {@code SelectorWriteHandler}, and performs
     * the writes when the selector reports the channel writable.
     *
     * <p>A {@code Shutdown} object placed on the queue stops the thread. The
     * selector is closed when the thread leaves {@code run()}, so it is not
     * leaked across stop/start cycles.
     */
    private static class WriteHandler
    implements Runnable {
        private final LinkedQueue QUEUE = new LinkedQueue();
        private final Selector SELECTOR = this.initSelector();
        // Number of channels that currently have write interest enabled.
        private int m_pendingChannels;
        // Length-header scratch buffer shared by every SelectorWriteHandler of this worker.
        private ByteBuffer m_headerBuffer = ByteBuffer.allocate(4);

        private WriteHandler() {
        }

        /**
         * Opens the selector for this worker.
         *
         * @throws IllegalStateException if the selector cannot be opened; the
         *                               IOException is kept as the cause
         */
        Selector initSelector() {
            try {
                return SelectorProvider.provider().openSelector();
            }
            catch (IOException e) {
                if (LOG.isErrorEnabled()) {
                    LOG.error((Object)e);
                }
                throw (IllegalStateException)new IllegalStateException(e.getMessage()).initCause(e);
            }
        }

        /** Creates {@code workerThreads} handlers, each running on its own daemon thread. */
        private static WriteHandler[] create(int workerThreads) {
            WriteHandler[] handlers = new WriteHandler[workerThreads];
            for (int looper = 0; looper < workerThreads; ++looper) {
                handlers[looper] = new WriteHandler();
                Thread thread = new Thread((Runnable)handlers[looper], "nioWriteHandlerThread");
                thread.setDaemon(true);
                thread.start();
            }
            return handlers;
        }

        /** Creates the per-channel write state bound to this worker's selector. */
        private SelectorWriteHandler add(SocketChannel channel) throws InterruptedException {
            return new SelectorWriteHandler(channel, this.SELECTOR, this.m_headerBuffer);
        }

        /** Queues a write; the worker thread picks it up in {@code run()}. */
        private void write(SocketChannel channel, ByteBuffer buffer, FutureResult notification, SelectorWriteHandler hdlr) throws InterruptedException {
            this.QUEUE.put((Object)new WriteRequest(channel, buffer, notification, hdlr));
        }

        private void close(SelectorWriteHandler entry) {
            entry.cancel();
        }

        /**
         * Fails every request queued on {@code entry} with {@code error} (when
         * non-null), draining the queue, then cancels the entry's key.
         */
        private void handleChannelError(Selector selector, SelectorWriteHandler entry, SelectionKey selKey, Throwable error) {
            do {
                if (error != null) {
                    entry.notifyError(error);
                }
            } while (entry.next());
            this.close(entry);
        }

        /**
         * Performs one write attempt on every selected key. A completed request
         * has its future set to the byte count; a channel whose queue drains, or
         * which fails with an IOException, no longer counts as pending.
         */
        private void processWrite(Selector selector) {
            Set<SelectionKey> keys = selector.selectedKeys();
            Object[] arr = keys.toArray();
            for (int looper = 0; looper < arr.length; ++looper) {
                SelectionKey key = (SelectionKey)arr[looper];
                SelectorWriteHandler entry = (SelectorWriteHandler)key.attachment();
                boolean needToDecrementPendingChannels = false;
                try {
                    if (entry.write() == 0) {
                        entry.notifyObject(Integer.valueOf(entry.getBytesWritten()));
                        if (!entry.next()) {
                            needToDecrementPendingChannels = true;
                        }
                    }
                }
                catch (IOException e) {
                    needToDecrementPendingChannels = true;
                    this.handleChannelError(selector, entry, key, e);
                }
                finally {
                    if (needToDecrementPendingChannels) {
                        --this.m_pendingChannels;
                    }
                }
            }
            keys.clear();
        }

        /**
         * Worker loop. Each pass first drains the queue without blocking (doing
         * a non-blocking select after every request); then, if no channel has
         * pending data, it blocks on the queue, otherwise it blocks in
         * {@code select()} and writes to whatever became ready.
         */
        public void run() {
            try {
                while (this.SELECTOR.isOpen()) {
                    try {
                        WriteRequest queueEntry;
                        Object o;
                        while (null != (o = this.QUEUE.poll(0L))) {
                            if (o instanceof Shutdown) {
                                return;
                            }
                            queueEntry = (WriteRequest)o;
                            if (queueEntry.getHandler().add(queueEntry)) {
                                ++this.m_pendingChannels;
                            }
                            try {
                                if (this.SELECTOR.selectNow() > 0) {
                                    this.processWrite(this.SELECTOR);
                                }
                            }
                            catch (IOException e) {
                                if (LOG.isErrorEnabled()) {
                                    LOG.error((Object)"SelectNow operation on write selector failed, didn't expect this to occur, please report this", (Throwable)e);
                                }
                                return;
                            }
                        }
                        if (this.m_pendingChannels == 0) {
                            // Nothing to write anywhere: block until a request arrives.
                            o = this.QUEUE.take();
                            if (o instanceof Shutdown) {
                                return;
                            }
                            queueEntry = (WriteRequest)o;
                            if (queueEntry.getHandler().add(queueEntry)) {
                                ++this.m_pendingChannels;
                            }
                            continue;
                        }
                        try {
                            if (this.SELECTOR.select() > 0) {
                                this.processWrite(this.SELECTOR);
                            }
                        }
                        catch (IOException e) {
                            if (LOG.isErrorEnabled()) {
                                LOG.error((Object)"Failure while writing to socket", (Throwable)e);
                            }
                        }
                    }
                    catch (InterruptedException e) {
                        // NOTE(review): as decompiled, an interrupt ends the loop only when
                        // error logging is enabled and is otherwise ignored. Kept as-is;
                        // confirm the intended behaviour against the upstream source.
                        if (LOG.isErrorEnabled()) {
                            LOG.error((Object)("Thread (" + Thread.currentThread().getName() + ") was interrupted"), (Throwable)e);
                            break;
                        }
                    }
                    catch (Throwable e) {
                        if (LOG.isErrorEnabled()) {
                            LOG.error((Object)("Thread (" + Thread.currentThread().getName() + ") caught Throwable"), e);
                        }
                    }
                }
            }
            finally {
                try {
                    // Release the selector once this worker will never select again.
                    this.SELECTOR.close();
                }
                catch (IOException ignored) {
                    // Best effort on the way out.
                }
            }
        }
    }

    /**
     * NIO-backed connection: wraps a {@code SocketChannel}, delegates sends to
     * the assigned {@code WriteHandler}, and carries the read state used while
     * reassembling incoming messages.
     */
    class Connection
    extends ConnectionTable.Connection {
        private SocketChannel sock_ch;
        private WriteHandler m_writeHandler;
        private SelectorWriteHandler m_selectorWriteHandler;
        private final ConnectionReadState m_readState;
        private static final int HEADER_SIZE = 4;
        final ByteBuffer headerBuffer;

        Connection(SocketChannel s, Address peer_addr) {
            super(s.socket(), peer_addr);
            this.headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
            this.sock_ch = s;
            this.m_readState = new ConnectionReadState(this);
        }

        private ConnectionReadState getReadState() {
            return this.m_readState;
        }

        /** Binds this connection to a writer and creates its per-channel write state. */
        private void setupWriteHandler(WriteHandler hdlr) throws InterruptedException {
            this.m_writeHandler = hdlr;
            this.m_selectorWriteHandler = hdlr.add(this.sock_ch);
        }

        void destroy() {
            this.closeSocket();
        }

        /**
         * Queues the bytes for writing and blocks until the writer reports the
         * outcome.
         *
         * <p>The failure is handled where {@code result.get()} throws. The
         * earlier code inspected {@code result.getException()} immediately after
         * queueing, which does not wait for the write, so its logging and
         * IOException unwrapping were effectively never reached.
         *
         * @throws IOException if the write failed with an IOException
         * @throws Exception   for any other failure, including interruption while waiting
         */
        void doSend(byte[] buffie, int offset, int length) throws Exception {
            FutureResult result = new FutureResult();
            this.m_writeHandler.write(this.sock_ch, ByteBuffer.wrap(buffie, offset, length), result, this.m_selectorWriteHandler);
            try {
                result.get();
            }
            catch (InvocationTargetException ex) {
                if (LOG.isErrorEnabled()) {
                    LOG.error((Object)"failed sending message", (Throwable)ex);
                }
                Throwable cause = ex.getCause();
                if (cause instanceof IOException) {
                    throw (IOException)cause;
                }
                throw ex;
            }
        }

        SocketChannel getSocketChannel() {
            return this.sock_ch;
        }

        /** Closes the channel if it is connected and drops the reference; close errors are ignored. */
        void closeSocket() {
            if (this.sock_ch != null) {
                try {
                    if (this.sock_ch.isConnected()) {
                        this.sock_ch.close();
                    }
                }
                catch (Exception exception) {
                    // Best effort: the connection is being discarded anyway.
                }
                this.sock_ch = null;
            }
        }

        /** Removes this connection from the table and notifies that it was closed. */
        void closed() {
            Address peerAddr = this.getPeerAddress();
            synchronized (ConnectionTableNIO.this.conns) {
                ConnectionTableNIO.this.conns.remove(peerAddr);
            }
            ConnectionTableNIO.this.notifyConnectionClosed(peerAddr);
        }
    }

    /**
     * Per-connection bookkeeping for a message that is being read in pieces: a 4-byte head
     * buffer holding the body length, and a body buffer allocated once the head is complete.
     */
    private class ConnectionReadState {
        private final Connection m_conn;
        private final ByteBuffer m_readHeadBuf;
        private ByteBuffer m_readBodyBuf;
        private boolean m_headFinished;

        /**
         * @param conn connection whose last-accessed time is refreshed as reads progress
         */
        public ConnectionReadState(Connection conn) {
            this.m_readHeadBuf = ByteBuffer.allocate(4);
            this.m_readBodyBuf = null;
            this.m_headFinished = false;
            this.m_conn = conn;
        }

        /** @return the body buffer, or {@code null} while no head has been completed */
        ByteBuffer getReadBodyBuffer() {
            return this.m_readBodyBuf;
        }

        /** @return the buffer that receives the length prefix */
        ByteBuffer getReadHeadBuffer() {
            return this.m_readHeadBuf;
        }

        /**
         * Resets the state for the next message: drops the body buffer, clears the head buffer
         * and marks the head as unread again, then refreshes the connection's access time.
         */
        void bodyFinished() {
            this.m_readBodyBuf = null;
            this.m_readHeadBuf.clear();
            this.m_headFinished = false;
            this.m_conn.updateLastAccessed();
        }

        /**
         * Marks the head as complete, decodes the body length from the head buffer and
         * allocates a body buffer of exactly that size.
         *
         * @return the decoded body length
         */
        int headFinished() {
            this.m_headFinished = true;
            this.m_readHeadBuf.flip();
            final int bodyLength = this.m_readHeadBuf.getInt();
            this.m_readBodyBuf = ByteBuffer.allocate(bodyLength);
            this.m_conn.updateLastAccessed();
            return bodyLength;
        }

        /** @return {@code true} once the head of the current message has been decoded */
        boolean isHeadFinished() {
            return this.m_headFinished;
        }
    }

    /**
     * Runnable that delivers one fully read message to the enclosing table's {@code receive}.
     */
    private class ExecuteTask
    implements Runnable {
        Address m_addr;
        ByteBuffer m_buf;

        /**
         * @param addr address of the peer the message came from
         * @param buf  array-backed buffer holding the message
         */
        public ExecuteTask(Address addr, ByteBuffer buf) {
            this.m_addr = addr;
            this.m_buf = buf;
        }

        /**
         * Hands the buffer's backing array, its array offset and its limit to {@code receive}.
         */
        public void run() {
            final Address sender = this.m_addr;
            final ByteBuffer payload = this.m_buf;
            final byte[] backing = payload.array();
            final int start = payload.arrayOffset();
            final int end = payload.limit();
            ConnectionTableNIO.this.receive(sender, backing, start, end);
        }
    }

    private static class ReadHandler
    implements Runnable {
        private final Selector SELECTOR = this.initHandler();
        private final LinkedQueue QUEUE = new LinkedQueue();
        private final ConnectionTableNIO connectTable;

        ReadHandler(ConnectionTableNIO ct) {
            this.connectTable = ct;
        }

        public Selector initHandler() {
            try {
                return Selector.open();
            }
            catch (IOException e) {
                if (LOG.isErrorEnabled()) {
                    LOG.error((Object)e);
                }
                throw new IllegalStateException(e.getMessage());
            }
        }

        private static ReadHandler[] create(int workerThreads, ConnectionTableNIO ct) {
            ReadHandler[] handlers = new ReadHandler[workerThreads];
            for (int looper = 0; looper < workerThreads; ++looper) {
                handlers[looper] = new ReadHandler(ct);
                Thread thread = new Thread((Runnable)handlers[looper], "nioReadHandlerThread");
                thread.setDaemon(true);
                thread.start();
            }
            return handlers;
        }

        private void add(Object conn) throws InterruptedException {
            this.QUEUE.put(conn);
            this.wakeup();
        }

        private void wakeup() {
            this.SELECTOR.wakeup();
        }

        public void run() {
            while (true) {
                int events = 0;
                try {
                    events = this.SELECTOR.select();
                }
                catch (IOException e) {
                    if (!LOG.isWarnEnabled()) continue;
                    LOG.warn((Object)"Select operation on socket failed", (Throwable)e);
                    continue;
                }
                catch (ClosedSelectorException e) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn((Object)"Select operation on socket failed", (Throwable)e);
                    }
                    return;
                }
                if (events > 0) {
                    Set<SelectionKey> readyKeys = this.SELECTOR.selectedKeys();
                    Iterator<SelectionKey> i = readyKeys.iterator();
                    while (i.hasNext()) {
                        SelectionKey key = i.next();
                        i.remove();
                        Connection conn = (Connection)key.attachment();
                        try {
                            if (conn.getSocketChannel().isOpen()) {
                                this.readOnce(conn);
                                continue;
                            }
                            conn.closed();
                        }
                        catch (IOException e) {
                            if (LOG.isWarnEnabled()) {
                                LOG.warn((Object)"Read operation on socket failed", (Throwable)e);
                            }
                            key.cancel();
                            conn.destroy();
                            conn.closed();
                        }
                    }
                }
                Object o = null;
                try {
                    o = this.QUEUE.poll(0L);
                }
                catch (InterruptedException e) {
                    if (!LOG.isInfoEnabled()) continue;
                    LOG.info((Object)("Thread (" + Thread.currentThread().getName() + ") was interrupted while polling queue"), (Throwable)e);
                    continue;
                }
                if (null == o) continue;
                if (o instanceof Shutdown) {
                    return;
                }
                Connection conn = (Connection)o;
                SocketChannel sc = conn.getSocketChannel();
                try {
                    sc.register(this.SELECTOR, 1, conn);
                    continue;
                }
                catch (ClosedChannelException e) {
                    if (LOG.isInfoEnabled()) {
                        LOG.info((Object)"Socket channel was closed while we were trying to register it to selector", (Throwable)e);
                    }
                    conn.destroy();
                    conn.closed();
                    continue;
                }
                break;
            }
        }

        private void readOnce(Connection conn) throws IOException {
            int size;
            ConnectionReadState readState = conn.getReadState();
            if (!readState.isHeadFinished() && 0 == (size = this.readHeader(conn))) {
                return;
            }
            if (this.readBody(conn) > 0) {
                return;
            }
            Address addr = conn.getPeerAddress();
            ByteBuffer buf = readState.getReadBodyBuffer();
            readState.bodyFinished();
            try {
                this.connectTable.runRequest(addr, buf);
            }
            catch (InterruptedException e) {
                LOG.error((Object)("Thread (" + Thread.currentThread().getName() + ") was interrupted while assigning executor to process read request"), (Throwable)e);
            }
        }

        private int read(Connection conn, ByteBuffer buf) throws IOException {
            SocketChannel sc = conn.getSocketChannel();
            int num = sc.read(buf);
            if (-1 == num) {
                throw new IOException("Couldn't read from socket as peer closed the socket");
            }
            return buf.remaining();
        }

        private int readHeader(Connection conn) throws IOException {
            ConnectionReadState readState = conn.getReadState();
            ByteBuffer headBuf = readState.getReadHeadBuffer();
            SocketChannel sc = conn.getSocketChannel();
            while (headBuf.remaining() > 0) {
                int num = sc.read(headBuf);
                if (-1 == num) {
                    throw new IOException("Peer closed socket");
                }
                if (0 != num) continue;
                return 0;
            }
            return readState.headFinished();
        }

        private int readBody(Connection conn) throws IOException {
            ByteBuffer bodyBuf = conn.getReadState().getReadBodyBuffer();
            SocketChannel sc = conn.getSocketChannel();
            while (bodyBuf.remaining() > 0) {
                int num = sc.read(bodyBuf);
                if (-1 == num) {
                    throw new IOException("Couldn't read from socket as peer closed the socket");
                }
                if (0 != num) continue;
                return bodyBuf.remaining();
            }
            bodyBuf.flip();
            return 0;
        }
    }

    /**
     * Marker type with no state. A read handler stops its loop when it dequeues an instance
     * of this class.
     */
    private static class Shutdown {
        /** Instantiable only from within the enclosing class. */
        private Shutdown() { }
    }
}

