/*
 * Decompiled with CFR 0.152.
 */
package org.mobicents.protocols.sctp;

import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.SctpChannel;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import javolution.util.FastList;
import javolution.xml.XMLFormat;
import javolution.xml.stream.XMLStreamException;
import org.apache.log4j.Logger;
import org.mobicents.protocols.api.Association;
import org.mobicents.protocols.api.AssociationListener;
import org.mobicents.protocols.api.AssociationType;
import org.mobicents.protocols.api.IpChannelType;
import org.mobicents.protocols.api.ManagementEventListener;
import org.mobicents.protocols.api.PayloadData;
import org.mobicents.protocols.sctp.AssociationHandler;
import org.mobicents.protocols.sctp.ChangeRequest;
import org.mobicents.protocols.sctp.ManagementImpl;
import org.mobicents.protocols.sctp.ServerImpl;
import org.mobicents.protocols.sctp.Worker;

public class AssociationImpl
implements Association {
    /** Logger used by this class. */
    protected static final Logger logger = Logger.getLogger(AssociationImpl.class.getName());

    // Attribute / element names read and written by ASSOCIATION_XML.
    private static final String NAME = "name";
    private static final String SERVER_NAME = "serverName";
    private static final String HOST_ADDRESS = "hostAddress";
    private static final String HOST_PORT = "hostPort";
    private static final String PEER_ADDRESS = "peerAddress";
    private static final String PEER_PORT = "peerPort";
    private static final String ASSOCIATION_TYPE = "assoctype";
    private static final String IPCHANNEL_TYPE = "ipChannelType";
    private static final String EXTRA_HOST_ADDRESS = "extraHostAddress";
    // The spelling below is the attribute name actually used by ASSOCIATION_XML;
    // changing it would change the name that is read and written.
    private static final String EXTRA_HOST_ADDRESS_SIZE = "extraHostAddresseSize";

    // Local endpoint; bound by doInitiateConnectionSctp()/doInitiateConnectionTcp().
    private String hostAddress;
    private int hostPort;
    // Remote endpoint; connected to by doInitiateConnectionSctp()/doInitiateConnectionTcp().
    private String peerAddress;
    private int peerPort;
    // Set by the two server-side constructors and by ASSOCIATION_XML.read().
    private String serverName;
    private String name;
    // Selects between the SCTP and the TCP channel throughout this class.
    private IpChannelType ipChannelType;
    // Additional local addresses bound on the SCTP channel; may be null.
    private String[] extraHostAddresses;
    // Only assigned by the anonymous-server constructor; null otherwise.
    private ServerImpl server;
    private AssociationType type;
    // Must be non-null before start() is called.
    private AssociationListener associationListener = null;
    // Passed to SctpChannel.receive() as the notification handler; also supplies
    // the outbound stream limit checked in write().
    protected final AssociationHandler associationHandler = new AssociationHandler();
    // Destination used when building the outgoing SCTP MessageInfo in write().
    // Not assigned anywhere in this class.
    protected volatile SocketAddress peerSocketAddress = null;
    // Set by start(), cleared by stop().
    private volatile boolean started = false;
    // Set by markAssociationUp(), cleared by markAssociationDown().
    protected volatile boolean up = false;
    // Indexed by stream number in read() to pick an executor; created by createworkerThreadTable().
    private int[] workerThreadTable = null;
    // Payloads queued by send() and drained by write(); cleared by close().
    private final ConcurrentLinkedQueue<PayloadData> txQueue = new ConcurrentLinkedQueue<PayloadData>();
    private ManagementImpl management;
    // Exactly one of the two channels is used, depending on ipChannelType.
    private SctpChannel socketChannelSctp;
    private SocketChannel socketChannelTcp;
    // Direct buffers allocated in initChannels().
    private ByteBuffer rxBuffer;
    private ByteBuffer txBuffer;
    // Metadata for the SCTP message currently held in txBuffer; set in write(), used by doSendSctp().
    private volatile MessageInfo msgInfo;
    // Incremented on IOException in read()/write(); reset by initiateConnection().
    private volatile int ioErrors = 0;
    /**
     * XML binding for {@link AssociationImpl}. Scalar settings are stored as
     * attributes; each extra host address is stored as a nested element, preceded
     * by an attribute holding the number of such elements.
     */
    protected static final XMLFormat<AssociationImpl> ASSOCIATION_XML = new XMLFormat<AssociationImpl>(AssociationImpl.class){

        /**
         * Populates {@code association} from {@code xml}. Missing string attributes
         * default to the empty string, missing ports to 0, and an unknown channel
         * type falls back to SCTP.
         */
        @Override
        public void read(XMLFormat.InputElement xml, AssociationImpl association) throws XMLStreamException {
            association.name = xml.getAttribute(AssociationImpl.NAME, "");
            association.type = AssociationType.getAssociationType(xml.getAttribute(AssociationImpl.ASSOCIATION_TYPE, ""));
            association.hostAddress = xml.getAttribute(AssociationImpl.HOST_ADDRESS, "");
            association.hostPort = xml.getAttribute(AssociationImpl.HOST_PORT, 0);
            association.peerAddress = xml.getAttribute(AssociationImpl.PEER_ADDRESS, "");
            association.peerPort = xml.getAttribute(AssociationImpl.PEER_PORT, 0);
            association.serverName = xml.getAttribute(AssociationImpl.SERVER_NAME, "");

            association.ipChannelType = IpChannelType.getInstance(xml.getAttribute(AssociationImpl.IPCHANNEL_TYPE, IpChannelType.SCTP.getCode()));
            if (association.ipChannelType == null) {
                association.ipChannelType = IpChannelType.SCTP;
            }

            int extraCount = xml.getAttribute(AssociationImpl.EXTRA_HOST_ADDRESS_SIZE, 0);
            String[] extras = new String[extraCount];
            // Publish the array before filling it, so a failure part-way through
            // leaves the same partially filled array on the association.
            association.extraHostAddresses = extras;
            int idx = 0;
            while (idx < extraCount) {
                extras[idx] = xml.get(AssociationImpl.EXTRA_HOST_ADDRESS, String.class);
                ++idx;
            }
        }

        /**
         * Serializes {@code association} into {@code xml}, mirroring {@code read}.
         * A null extra-address array is written as a count of 0 with no elements.
         */
        @Override
        public void write(AssociationImpl association, XMLFormat.OutputElement xml) throws XMLStreamException {
            xml.setAttribute(AssociationImpl.NAME, association.name);
            xml.setAttribute(AssociationImpl.ASSOCIATION_TYPE, association.type.getType());
            xml.setAttribute(AssociationImpl.HOST_ADDRESS, association.hostAddress);
            xml.setAttribute(AssociationImpl.HOST_PORT, association.hostPort);
            xml.setAttribute(AssociationImpl.PEER_ADDRESS, association.peerAddress);
            xml.setAttribute(AssociationImpl.PEER_PORT, association.peerPort);
            xml.setAttribute(AssociationImpl.SERVER_NAME, association.serverName);
            xml.setAttribute(AssociationImpl.IPCHANNEL_TYPE, association.ipChannelType.getCode());

            String[] extras = association.extraHostAddresses;
            int extraCount = extras == null ? 0 : extras.length;
            xml.setAttribute(AssociationImpl.EXTRA_HOST_ADDRESS_SIZE, extraCount);
            for (int idx = 0; idx < extraCount; ++idx) {
                xml.add(extras[idx], AssociationImpl.EXTRA_HOST_ADDRESS, String.class);
            }
        }
    };

    /**
     * Creates an association with every configurable field left at its default.
     * The other constructors in this class delegate here before assigning their
     * own fields.
     */
    public AssociationImpl() {
        // nothing to initialise beyond the field initialisers
    }

    /**
     * Allocates the direct receive and transmit buffers, sized by the
     * management's buffer size, and leaves both empty (position 0, limit 0) so
     * that {@code hasRemaining()} is false until data is put and flipped.
     * Requires {@code management} to be set.
     */
    protected void initChannels() {
        this.rxBuffer = ByteBuffer.allocateDirect(this.management.getBufferSize());
        this.txBuffer = ByteBuffer.allocateDirect(this.management.getBufferSize());

        // clear -> rewind -> flip ends with limit == position == 0.
        this.txBuffer.clear().rewind().flip();
        this.rxBuffer.clear().rewind().flip();
    }

    /**
     * Creates an association of type {@code CLIENT}.
     *
     * @param hostAddress        local address to bind
     * @param hostPort           local port to bind
     * @param peerAddress        remote address to connect to
     * @param peerPort           remote port to connect to
     * @param assocName          name of this association
     * @param ipChannelType      transport to use
     * @param extraHostAddresses additional local addresses, may be null; the
     *                           array is stored as-is, not copied
     * @throws IOException declared for callers; nothing in this body throws it
     */
    public AssociationImpl(String hostAddress, int hostPort, String peerAddress, int peerPort, String assocName, IpChannelType ipChannelType, String[] extraHostAddresses) throws IOException {
        this();
        this.type = AssociationType.CLIENT;
        this.name = assocName;
        this.ipChannelType = ipChannelType;
        // local side
        this.hostAddress = hostAddress;
        this.hostPort = hostPort;
        this.extraHostAddresses = extraHostAddresses;
        // remote side
        this.peerAddress = peerAddress;
        this.peerPort = peerPort;
    }

    /**
     * Creates an association of type {@code SERVER}.
     *
     * @param peerAddress   address of the remote peer
     * @param peerPort      port of the remote peer
     * @param serverName    name of the server this association refers to
     * @param assocName     name of this association
     * @param ipChannelType transport to use
     */
    public AssociationImpl(String peerAddress, int peerPort, String serverName, String assocName, IpChannelType ipChannelType) {
        this();
        this.type = AssociationType.SERVER;
        this.name = assocName;
        this.serverName = serverName;
        this.ipChannelType = ipChannelType;
        this.peerAddress = peerAddress;
        this.peerPort = peerPort;
    }

    /**
     * Creates an association of type {@code ANONYMOUS_SERVER}. No name is
     * assigned. The given server is retained so that markAssociationUp() and
     * markAssociationDown() can add this association to, and remove it from,
     * the server's anonymous-association list.
     *
     * @param peerAddress   address of the remote peer
     * @param peerPort      port of the remote peer
     * @param serverName    name of the owning server
     * @param ipChannelType transport to use
     * @param server        owning server instance
     */
    protected AssociationImpl(String peerAddress, int peerPort, String serverName, IpChannelType ipChannelType, ServerImpl server) {
        this();
        this.type = AssociationType.ANONYMOUS_SERVER;
        this.server = server;
        this.serverName = serverName;
        this.ipChannelType = ipChannelType;
        this.peerAddress = peerAddress;
        this.peerPort = peerPort;
    }

    /**
     * Starts this association: marks it started, queues a connect request when
     * it is a {@code CLIENT}, and notifies every management event listener.
     * Listener failures are logged and do not abort the start.
     *
     * @throws NullPointerException if no association listener has been set
     * @throws Exception            declared for callers
     */
    protected void start() throws Exception {
        if (this.associationListener == null) {
            throw new NullPointerException(String.format("AssociationListener is null for Associatoion=%s", this.name));
        }
        // Set the flag BEFORE queueing the connect request. initiateConnection()
        // returns silently while started is false, so a request handled before
        // the flag became visible would be dropped and never rescheduled.
        this.started = true;
        if (this.type == AssociationType.CLIENT) {
            try {
                this.scheduleConnect();
            }
            catch (RuntimeException e) {
                // Keep the previous contract: a failed start leaves the association stopped.
                this.started = false;
                throw e;
            }
        }
        if (logger.isInfoEnabled() && this.type != AssociationType.ANONYMOUS_SERVER) {
            logger.info(String.format("Started Association=%s", this));
        }
        for (ManagementEventListener lstr : this.management.getManagementEventListeners()) {
            try {
                lstr.onAssociationStarted(this);
            }
            catch (Throwable ee) {
                logger.error("Exception while invoking onAssociationStarted", ee);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops this association: clears the started flag, notifies every management
     * event listener (failures are logged and ignored) and, if the underlying
     * channel is open, queues a change request for it and wakes the selector.
     *
     * @throws Exception declared for callers
     */
    protected void stop() throws Exception {
        this.started = false;

        for (ManagementEventListener listener : this.management.getManagementEventListeners()) {
            try {
                listener.onAssociationStopped(this);
            }
            catch (Throwable failure) {
                logger.error("Exception while invoking onAssociationStopped", failure);
            }
        }

        AbstractSelectableChannel channel = this.getSocketChannel();
        if (channel == null || !channel.isOpen()) {
            return;
        }

        FastList<ChangeRequest> pendingChanges = this.management.getPendingChanges();
        synchronized (pendingChanges) {
            // NOTE(review): request type 4 is presumably the "close" request - confirm against ChangeRequest.
            pendingChanges.add(new ChangeRequest(channel, this, 4, -1));
        }
        this.management.getSocketSelector().wakeup();
    }

    /**
     * Accepts this anonymous association: installs the given listener and
     * starts the association.
     *
     * @param associationListener listener that will receive payloads and events
     * @throws UnsupportedOperationException if this is not an anonymous-server
     *         association; the current listener is left untouched in that case
     * @throws Exception propagated from {@link #start()}
     */
    @Override
    public void acceptAnonymousAssociation(AssociationListener associationListener) throws Exception {
        // Validate first so that a rejected call cannot replace the listener
        // of a regular (non-anonymous) association.
        if (this.getAssociationType() != AssociationType.ANONYMOUS_SERVER) {
            throw new UnsupportedOperationException("Association.acceptAnonymousAssociation() can be applied only for anonymous associations");
        }
        this.associationListener = associationListener;
        this.start();
    }

    /**
     * Rejects this anonymous association. This implementation performs no action.
     */
    @Override
    public void rejectAnonymousAssociation() {
        // intentionally empty
    }

    /**
     * Stops this association, which must be an anonymous-server association.
     *
     * @throws UnsupportedOperationException if this is not an anonymous-server association
     * @throws Exception propagated from {@link #stop()}
     */
    @Override
    public void stopAnonymousAssociation() throws Exception {
        boolean anonymous = this.getAssociationType() == AssociationType.ANONYMOUS_SERVER;
        if (!anonymous) {
            throw new UnsupportedOperationException("Association.stopAnonymousAssociation() can be applied only for anonymous associations");
        }
        this.stop();
    }

    /** @return the transport type configured for this association */
    @Override
    public IpChannelType getIpChannelType() {
        return ipChannelType;
    }

    /** @return the listener currently installed, or null if none has been set */
    @Override
    public AssociationListener getAssociationListener() {
        return associationListener;
    }

    /**
     * Installs the listener for this association.
     *
     * @param associationListener the new listener
     */
    @Override
    public void setAssociationListener(AssociationListener associationListener) {
        this.associationListener = associationListener;
    }

    /** @return the name of this association */
    @Override
    public String getName() {
        return name;
    }

    /** @return the association type (client, server or anonymous server) */
    @Override
    public AssociationType getAssociationType() {
        return type;
    }

    /** @return true once start() has run and until stop() is called */
    @Override
    public boolean isStarted() {
        return started;
    }

    /** @return true only when the association is both started and up */
    @Override
    public boolean isConnected() {
        return started && up;
    }

    /** @return true between markAssociationUp() and markAssociationDown() */
    @Override
    public boolean isUp() {
        return up;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks this association as up. If an owning server is set, the association
     * is first added to that server's anonymous-association list under the
     * list's lock. Every management event listener is then notified; listener
     * failures are logged and ignored.
     */
    protected void markAssociationUp() {
        ServerImpl owner = this.server;
        if (owner != null) {
            synchronized (owner.anonymAssociations) {
                owner.anonymAssociations.add(this);
            }
        }

        this.up = true;

        for (ManagementEventListener listener : this.management.getManagementEventListeners()) {
            try {
                listener.onAssociationUp(this);
            }
            catch (Throwable failure) {
                logger.error("Exception while invoking onAssociationUp", failure);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks this association as down and notifies every management event
     * listener (failures are logged and ignored). If an owning server is set,
     * the association is then removed from that server's anonymous-association
     * list under the list's lock.
     */
    protected void markAssociationDown() {
        this.up = false;

        for (ManagementEventListener listener : this.management.getManagementEventListeners()) {
            try {
                listener.onAssociationDown(this);
            }
            catch (Throwable failure) {
                logger.error("Exception while invoking onAssociationDown", failure);
            }
        }

        ServerImpl owner = this.server;
        if (owner == null) {
            return;
        }
        synchronized (owner.anonymAssociations) {
            owner.anonymAssociations.remove(this);
        }
    }

    /** @return the configured local address */
    @Override
    public String getHostAddress() {
        return hostAddress;
    }

    /** @return the configured local port */
    @Override
    public int getHostPort() {
        return hostPort;
    }

    /** @return the configured peer address */
    @Override
    public String getPeerAddress() {
        return peerAddress;
    }

    /** @return the configured peer port */
    @Override
    public int getPeerPort() {
        return peerPort;
    }

    /** @return the configured server name, or null if none was set */
    @Override
    public String getServerName() {
        return serverName;
    }

    /** @return the internal array of extra local addresses (not a copy), may be null */
    @Override
    public String[] getExtraHostAddresses() {
        return extraHostAddresses;
    }

    /**
     * Attaches the owning management instance and immediately (re)allocates the
     * I/O buffers, whose size is taken from that management.
     *
     * @param management the management this association belongs to
     */
    protected void setManagement(ManagementImpl management) {
        this.management = management;
        // Buffers depend on management.getBufferSize(), so they are set up here.
        initChannels();
    }

    /**
     * @return the channel matching the configured transport: the SCTP channel
     *         for SCTP, otherwise the TCP channel; may be null if not yet opened
     */
    private AbstractSelectableChannel getSocketChannel() {
        AbstractSelectableChannel channel;
        if (this.ipChannelType != IpChannelType.SCTP) {
            channel = this.socketChannelTcp;
        } else {
            channel = this.socketChannelSctp;
        }
        return channel;
    }

    /**
     * Stores the given channel in the slot matching the configured transport.
     * The channel is cast to the corresponding concrete type, so it must match
     * the transport or a ClassCastException is thrown.
     *
     * @param socketChannel the channel to use for this association
     */
    protected void setSocketChannel(AbstractSelectableChannel socketChannel) {
        if (this.ipChannelType != IpChannelType.SCTP) {
            this.socketChannelTcp = (SocketChannel)socketChannel;
            return;
        }
        this.socketChannelSctp = (SctpChannel)socketChannel;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues a payload for transmission. After verifying the channel is usable,
     * a change request is added and the payload is appended to the transmit
     * queue, both under the pending-changes lock; the selector is then woken.
     *
     * @param payloadData the payload to send
     * @throws Exception if the association is not started or the underlying
     *         channel is not open/connected
     */
    @Override
    public void send(PayloadData payloadData) throws Exception {
        this.checkSocketIsOpen();

        final FastList<ChangeRequest> pendingChanges = this.management.getPendingChanges();
        synchronized (pendingChanges) {
            // NOTE(review): 2 / 4 are presumably "change interest ops" and OP_WRITE - confirm against ChangeRequest.
            ChangeRequest request = new ChangeRequest(this.getSocketChannel(), this, 2, 4);
            pendingChanges.add(request);
            this.txQueue.add(payloadData);
        }

        this.management.getSocketSelector().wakeup();
    }

    /**
     * Verifies that this association can currently send. For SCTP this means
     * started, channel present and open, and an SCTP association established;
     * for TCP it means started, channel present, open and connected.
     *
     * @throws Exception if any of those conditions does not hold
     */
    private void checkSocketIsOpen() throws Exception {
        if (this.ipChannelType != IpChannelType.SCTP) {
            boolean tcpUsable = this.started
                    && this.socketChannelTcp != null
                    && this.socketChannelTcp.isOpen()
                    && this.socketChannelTcp.isConnected();
            if (!tcpUsable) {
                throw new Exception(String.format("Underlying tcp channel doesn't open for Association=%s", this.name));
            }
            return;
        }

        boolean sctpUsable = this.started
                && this.socketChannelSctp != null
                && this.socketChannelSctp.isOpen()
                && this.socketChannelSctp.association() != null;
        if (!sctpUsable) {
            throw new Exception(String.format("Underlying sctp channel doesn't open or doesn't have association for Association=%s", this.name));
        }
    }

    /**
     * Reads one payload from the underlying channel and hands it to the
     * association listener, either inline (single-thread mode) or through the
     * executor selected by the payload's stream number. An IOException bumps
     * the I/O error counter; once it exceeds the configured maximum the
     * association is closed and a reconnect is scheduled.
     */
    protected void read() {
        try {
            final PayloadData payload;
            if (this.ipChannelType == IpChannelType.SCTP) {
                payload = this.doReadSctp();
            } else {
                payload = this.doReadTcp();
            }
            if (payload == null) {
                // Nothing to deliver (notification only, or the peer closed).
                return;
            }

            if (logger.isDebugEnabled()) {
                logger.debug(String.format("Rx : Ass=%s %s", this.name, payload));
            }

            if (this.management.isSingleThread()) {
                try {
                    this.associationListener.onPayload(this, payload);
                }
                catch (Exception e) {
                    logger.error(String.format("Error while calling Listener for Association=%s.Payload=%s", this.name, payload), e);
                }
                return;
            }

            Worker task = new Worker(this, this.associationListener, payload);
            ExecutorService executor = this.management.getExecutorService(this.workerThreadTable[payload.getStreamNumber()]);
            try {
                executor.execute(task);
            }
            catch (RejectedExecutionException e) {
                logger.error(String.format("Rejected %s as Executors is shutdown", payload), e);
            }
            catch (NullPointerException e) {
                logger.error(String.format("NullPointerException while submitting %s", payload), e);
            }
            catch (Exception e) {
                logger.error(String.format("Exception while submitting %s", payload), e);
            }
        }
        catch (IOException e) {
            ++this.ioErrors;
            logger.error(String.format("IOException while trying to read from underlying socket for Association=%s IOError count=%d", this.name, this.ioErrors), e);
            if (this.ioErrors > this.management.getMaxIOErrors()) {
                this.close();
                this.scheduleConnect();
            }
        }
    }

    /**
     * Receives one SCTP message into the receive buffer.
     *
     * @return the received payload; null when receive() returned no message
     *         info, or when the byte count was -1 (in which case the association
     *         is closed and a reconnect is scheduled)
     * @throws IOException if the channel read fails
     */
    private PayloadData doReadSctp() throws IOException {
        this.rxBuffer.clear();
        MessageInfo info = this.socketChannelSctp.receive(this.rxBuffer, this, this.associationHandler);

        if (info == null) {
            if (logger.isDebugEnabled()) {
                logger.debug(String.format(" messageInfo is null for Association=%s", this.name));
            }
            return null;
        }

        int byteCount = info.bytes();
        if (byteCount == -1) {
            logger.error(String.format("Rx -1 while trying to read from underlying socket for Association=%s ", this.name));
            this.close();
            this.scheduleConnect();
            return null;
        }

        // Copy the received bytes out so the buffer can be reused.
        byte[] bytes = new byte[byteCount];
        this.rxBuffer.flip();
        this.rxBuffer.get(bytes);
        this.rxBuffer.clear();

        return new PayloadData(byteCount, bytes, info.isComplete(), info.isUnordered(), info.payloadProtocolID(), info.streamNumber());
    }

    /**
     * Reads available bytes from the TCP channel into the receive buffer.
     *
     * @return a payload marked complete and ordered, with protocol id 0 and
     *         stream number 0; null when the read returned -1 (in which case the
     *         association is closed and a reconnect is scheduled)
     * @throws IOException if the channel read fails
     */
    private PayloadData doReadTcp() throws IOException {
        this.rxBuffer.clear();
        int byteCount = this.socketChannelTcp.read(this.rxBuffer);

        if (byteCount == -1) {
            logger.warn(String.format("Rx -1 while trying to read from underlying socket for Association=%s ", this.name));
            this.close();
            this.scheduleConnect();
            return null;
        }

        // Copy the received bytes out so the buffer can be reused.
        byte[] bytes = new byte[byteCount];
        this.rxBuffer.flip();
        this.rxBuffer.get(bytes);
        this.rxBuffer.clear();

        return new PayloadData(byteCount, bytes, true, false, 0, 0);
    }

    /**
     * Drains the transmit queue to the underlying channel.
     * <p>
     * Any bytes left in the transmit buffer from a previous partial send are
     * flushed first. Queued payloads are then sent one by one until the queue is
     * empty or a send leaves bytes behind, in which case the method returns and
     * the remainder is flushed on the next call. When the queue is empty the
     * key's interest set is switched back to read-only.
     * <p>
     * For SCTP, a payload whose stream number is outside
     * {@code [0, maxOutboundStreams)} is not sent; the association listener is
     * told through {@code inValidStreamId}. An IOException bumps the I/O error
     * counter; once it exceeds the configured maximum the association is closed
     * and a reconnect is scheduled.
     *
     * @param key selection key of the channel being written
     */
    protected void write(SelectionKey key) {
        try {
            // Finish a previous partial send before touching the queue.
            if (this.txBuffer.hasRemaining()) {
                this.doSend();
            }
            if (!this.txQueue.isEmpty() && !this.txBuffer.hasRemaining()) {
                while (!this.txQueue.isEmpty()) {
                    this.txBuffer.clear();
                    PayloadData payloadData = this.txQueue.poll();
                    if (logger.isDebugEnabled()) {
                        logger.debug(String.format("Tx : Ass=%s %s", this.name, payloadData));
                    }
                    if (this.ipChannelType == IpChannelType.SCTP) {
                        int seqControl = payloadData.getStreamNumber();
                        // Validate before copying, so a rejected payload is never put into the buffer.
                        if (seqControl < 0 || seqControl >= this.associationHandler.getMaxOutboundStreams()) {
                            try {
                                this.associationListener.inValidStreamId(payloadData);
                            }
                            catch (Exception e) {
                                // A failing listener must not stop the queue, but it must not be hidden either.
                                logger.error(String.format("Exception while calling inValidStreamId on AssociationListener for Association=%s. Payload=%s", this.name, payloadData), e);
                            }
                            // Leave the buffer empty (limit == 0) so hasRemaining() stays false.
                            this.txBuffer.flip();
                            continue;
                        }
                        this.msgInfo = MessageInfo.createOutgoing(this.peerSocketAddress, seqControl);
                        this.msgInfo.payloadProtocolID(payloadData.getPayloadProtocolId());
                        this.msgInfo.complete(payloadData.isComplete());
                        this.msgInfo.unordered(payloadData.isUnordered());
                    }
                    this.txBuffer.put(payloadData.getData());
                    this.txBuffer.flip();
                    this.doSend();
                    if (this.txBuffer.hasRemaining()) {
                        // Partial send: keep write interest and resume on the next call.
                        return;
                    }
                }
            }
            if (this.txQueue.isEmpty()) {
                key.interestOps(SelectionKey.OP_READ);
            }
        }
        catch (IOException e) {
            ++this.ioErrors;
            logger.error(String.format("IOException while trying to write to underlying socket for Association=%s IOError count=%d", this.name, this.ioErrors), e);
            if (this.ioErrors > this.management.getMaxIOErrors()) {
                this.close();
                this.scheduleConnect();
            }
        }
    }

    /**
     * Sends the current contents of the transmit buffer over the configured transport.
     *
     * @return the value returned by the transport-specific send
     * @throws IOException if the channel write fails
     */
    private int doSend() throws IOException {
        return this.ipChannelType == IpChannelType.SCTP ? this.doSendSctp() : this.doSendTcp();
    }

    /**
     * Sends the transmit buffer on the SCTP channel using the current message info.
     *
     * @return the value returned by {@code SctpChannel.send}
     * @throws IOException if the send fails
     */
    private int doSendSctp() throws IOException {
        int sent = this.socketChannelSctp.send(this.txBuffer, this.msgInfo);
        return sent;
    }

    /**
     * Writes the transmit buffer to the TCP channel.
     *
     * @return the value returned by {@code SocketChannel.write}
     * @throws IOException if the write fails
     */
    private int doSendTcp() throws IOException {
        int written = this.socketChannelTcp.write(this.txBuffer);
        return written;
    }

    /**
     * Closes the underlying channel (if any), marks the association down,
     * notifies the association listener of the communication shutdown and
     * discards every payload still waiting in the transmit queue. Failures while
     * closing or notifying are logged and do not prevent the remaining steps.
     */
    protected void close() {
        AbstractSelectableChannel channel = this.getSocketChannel();
        if (channel != null) {
            try {
                channel.close();
            }
            catch (Exception e) {
                logger.error(String.format("Exception while closing the SctpScoket for Association=%s", this.name), e);
            }
        }

        try {
            this.markAssociationDown();
            this.associationListener.onCommunicationShutdown(this);
        }
        catch (Exception e) {
            logger.error(String.format("Exception while calling onCommunicationShutdown on AssociationListener for Association=%s", this.name), e);
        }

        int pending = this.txQueue.size();
        if (pending > 0) {
            logger.warn(String.format("Clearig txQueue for Association=%s. %d messages still pending will be cleared", this.name, pending));
        }
        this.txQueue.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues a connect request timestamped at the current time plus the
     * management's connect delay. Does nothing unless this association is a
     * {@code CLIENT}.
     */
    protected void scheduleConnect() {
        if (this.getAssociationType() != AssociationType.CLIENT) {
            return;
        }
        final FastList<ChangeRequest> pendingChanges = this.management.getPendingChanges();
        synchronized (pendingChanges) {
            // NOTE(review): request type 3 is presumably the "connect" request - confirm against ChangeRequest.
            long dueAt = System.currentTimeMillis() + (long)this.management.getConnectDelay();
            pendingChanges.add(new ChangeRequest(this, 3, dueAt));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Opens a fresh channel and starts connecting to the peer. Any existing
     * channel is closed first. If opening/binding/connecting fails, the error
     * is logged and another connect attempt is scheduled. On success the I/O
     * error counter is reset, a change request for the new channel is queued
     * and the selector is woken. Does nothing when the association is not started.
     *
     * @throws IOException declared for callers
     */
    protected void initiateConnection() throws IOException {
        if (!this.started) {
            return;
        }

        AbstractSelectableChannel previous = this.getSocketChannel();
        if (previous != null) {
            try {
                previous.close();
            }
            catch (Exception e) {
                logger.error(String.format("Exception while trying to close existing sctp socket and initiate new socket for Association=%s", this.name), e);
            }
        }

        try {
            if (this.ipChannelType != IpChannelType.SCTP) {
                this.doInitiateConnectionTcp();
            } else {
                this.doInitiateConnectionSctp();
            }
        }
        catch (Exception e) {
            logger.error("Error while initiating a connection", e);
            this.scheduleConnect();
            return;
        }

        this.ioErrors = 0;

        final FastList<ChangeRequest> pendingChanges = this.management.getPendingChanges();
        synchronized (pendingChanges) {
            // NOTE(review): 1 / 8 are presumably "register" and OP_CONNECT - confirm against ChangeRequest.
            pendingChanges.add(new ChangeRequest(this.getSocketChannel(), this, 1, 8));
        }
        this.management.getSocketSelector().wakeup();
    }

    /**
     * Opens a non-blocking SCTP channel, binds it to the configured host
     * address/port plus any extra host addresses, and starts connecting to the
     * peer requesting 32 outbound and 32 inbound streams.
     *
     * @throws IOException if opening, binding or connecting fails
     */
    private void doInitiateConnectionSctp() throws IOException {
        // Store the channel in the field right away so that a later attempt can
        // close it even if one of the following steps throws.
        this.socketChannelSctp = SctpChannel.open();
        this.socketChannelSctp.configureBlocking(false);

        InetSocketAddress local = new InetSocketAddress(this.hostAddress, this.hostPort);
        this.socketChannelSctp.bind(local);

        String[] extras = this.extraHostAddresses;
        if (extras != null) {
            for (int i = 0; i < extras.length; ++i) {
                this.socketChannelSctp.bindAddress(InetAddress.getByName(extras[i]));
            }
        }

        InetSocketAddress remote = new InetSocketAddress(this.peerAddress, this.peerPort);
        this.socketChannelSctp.connect(remote, 32, 32);
    }

    /**
     * Opens a non-blocking TCP channel, binds it to the configured host
     * address/port and starts connecting to the peer.
     *
     * @throws IOException if opening, binding or connecting fails
     */
    private void doInitiateConnectionTcp() throws IOException {
        // Store the channel in the field right away so that a later attempt can
        // close it even if one of the following steps throws.
        this.socketChannelTcp = SocketChannel.open();
        this.socketChannelTcp.configureBlocking(false);

        InetSocketAddress local = new InetSocketAddress(this.hostAddress, this.hostPort);
        this.socketChannelTcp.bind(local);

        InetSocketAddress remote = new InetSocketAddress(this.peerAddress, this.peerPort);
        this.socketChannelTcp.connect(remote);
    }

    /**
     * Creates the table that read() indexes by stream number to pick an
     * executor, and asks the management to fill it in.
     *
     * @param maximumBooundStream number of entries to allocate
     */
    protected void createworkerThreadTable(int maximumBooundStream) {
        int[] table = new int[maximumBooundStream];
        this.workerThreadTable = table;
        this.management.populateWorkerThread(table);
    }

    /**
     * @return a one-line description listing name, types, endpoints, server
     *         name and the extra host addresses (each followed by ", ")
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("Association [name=").append(this.name);
        text.append(", associationType=").append((Object)this.type);
        text.append(", ipChannelType=").append((Object)this.ipChannelType);
        text.append(", hostAddress=").append(this.hostAddress);
        text.append(", hostPort=").append(this.hostPort);
        text.append(", peerAddress=").append(this.peerAddress);
        text.append(", peerPort=").append(this.peerPort);
        text.append(", serverName=").append(this.serverName);
        text.append(", extraHostAddress=[");
        if (this.extraHostAddresses != null) {
            for (String extra : this.extraHostAddresses) {
                text.append(extra).append(", ");
            }
        }
        text.append("]]");
        return text.toString();
    }

    /**
     * Accessor that assigns the private extra-host-address array of {@code x0};
     * called from ASSOCIATION_XML.read().
     *
     * @param x0 association to update
     * @param x1 new extra host addresses
     * @return {@code x1}
     */
    static /* synthetic */ String[] access$802(AssociationImpl x0, String[] x1) {
        return x0.extraHostAddresses = x1;
    }
}

