/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.stream.socket;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteOrder;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.nio.GridBufferedParser;
import org.apache.ignite.internal.util.nio.GridDelimitedParser;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioParser;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.stream.StreamAdapter;
import org.apache.ignite.stream.socket.SocketMessageConverter;
import org.jetbrains.annotations.Nullable;

/**
 * Stream adapter that starts a {@link GridNioServer}, accepts socket connections and hands every
 * received message to {@code addMessage(...)} after running it through a
 * {@link SocketMessageConverter}.
 * <p>
 * Message framing is selected by the delimiter: when no delimiter is configured a
 * {@link GridBufferedParser} is used, otherwise a {@link GridDelimitedParser} built from the
 * delimiter bytes. When no converter is configured, incoming bytes are unmarshalled with a
 * {@link JdkMarshaller}.
 *
 * @param <T> Type of the converted message passed to {@code addMessage(...)}.
 * @param <K> Cache key type.
 * @param <V> Cache value type.
 */
public class SocketStreamer<T, K, V>
extends StreamAdapter<T, K, V> {
    /** Default number of selector threads: the number of available processors. */
    private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();

    /** Logger; assigned in {@link #start()}, so it is {@code null} until then. */
    private IgniteLogger log;

    /** Address to bind to; {@code null} means {@link InetAddress#getLocalHost()} is used. */
    private InetAddress addr;

    /** Port to bind to. */
    private int port;

    /** Number of selector threads handed to the NIO server. */
    private int threads = DFLT_THREADS;

    /** Direct mode flag, forwarded to the parser and to the codec filter. */
    private boolean directMode;

    /** Message delimiter; {@code null} or empty selects the buffered parser. */
    private byte[] delim;

    /** Converter from raw bytes to messages; a default one is created in {@link #start()} if unset. */
    private SocketMessageConverter<T> converter;

    /** NIO server; created in {@link #start()}. */
    private GridNioServer<byte[]> srv;

    /**
     * Sets the address the server binds to.
     *
     * @param addr Bind address, or {@code null} to bind to the local host address.
     */
    public void setAddr(InetAddress addr) {
        this.addr = addr;
    }

    /**
     * Sets the port the server binds to.
     *
     * @param port Port number.
     */
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * Sets the number of selector threads. Must be positive; this is checked in {@link #start()}.
     *
     * @param threads Number of selector threads.
     */
    public void setThreads(int threads) {
        this.threads = threads;
    }

    /**
     * Sets the direct mode flag passed to the parser and the codec filter.
     *
     * @param directMode Direct mode flag.
     */
    public void setDirectMode(boolean directMode) {
        this.directMode = directMode;
    }

    /**
     * Sets the message delimiter. The array is stored as is (not copied).
     *
     * @param delim Delimiter bytes; {@code null} or empty selects {@link GridBufferedParser}.
     */
    public void setDelimiter(byte[] delim) {
        this.delim = delim;
    }

    /**
     * Sets the converter applied to every received message.
     *
     * @param converter Converter, or {@code null} to use the default marshaller-based one.
     */
    public void setConverter(SocketMessageConverter<T> converter) {
        this.converter = converter;
    }

    /**
     * Validates the configuration, builds the NIO server and starts it.
     *
     * @throws IgniteException If the local host address cannot be resolved or the server
     *      cannot be built.
     */
    public void start() {
        A.ensure(this.getSingleTupleExtractor() != null || this.getMultipleTupleExtractor() != null, "tupleExtractor (single or multiple)");
        A.notNull(this.getStreamer(), "streamer");
        A.notNull(this.getIgnite(), "ignite");
        A.ensure(this.threads > 0, "threads > 0");

        this.log = this.getIgnite().log();

        GridNioServerListenerAdapter<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>(){

            @Override
            public void onConnected(GridNioSession ses) {
                assert (ses.accepted());
                if (SocketStreamer.this.log.isDebugEnabled()) {
                    SocketStreamer.this.log.debug("Accepted connection: " + ses.remoteAddress());
                }
            }

            @Override
            public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
                if (e != null) {
                    SocketStreamer.this.log.error("Connection failed with exception", e);
                }
            }

            @Override
            public void onMessage(GridNioSession ses, byte[] msg) {
                SocketStreamer.this.addMessage(SocketStreamer.this.converter.convert(msg));
            }
        };

        ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;

        // No delimiter configured -> buffered parser; otherwise split the stream on the delimiter.
        GridNioParser parser = F.isEmpty(this.delim)
            ? new GridBufferedParser(this.directMode, byteOrder)
            : new GridDelimitedParser(this.delim, this.directMode);

        // The listener reads the field on every message, so the default must be set before start.
        if (this.converter == null) {
            this.converter = new DefaultConverter<>(this.getIgnite().name());
        }

        GridNioCodecFilter codec = new GridNioCodecFilter(parser, this.log, this.directMode);
        GridNioFilter[] filters = new GridNioFilter[]{codec};

        // Resolved once so the address that is logged is the address that was actually bound.
        InetAddress bindAddr;

        try {
            bindAddr = this.addr == null ? InetAddress.getLocalHost() : this.addr;

            this.srv = new GridNioServer.Builder<byte[]>()
                .address(bindAddr)
                .serverName("sock-streamer")
                .port(this.port)
                .listener(lsnr)
                .logger(this.log)
                .selectorCount(this.threads)
                .byteOrder(byteOrder)
                .filters(filters)
                .build();
        }
        catch (UnknownHostException | IgniteCheckedException e) {
            throw new IgniteException(e);
        }

        this.srv.start();

        if (this.log.isDebugEnabled()) {
            this.log.debug("Socket streaming server started on " + bindAddr + ':' + this.port);
        }
    }

    /**
     * Stops the NIO server if one was created. Safe to call on a streamer that was never
     * started.
     */
    public void stop() {
        if (this.srv != null) {
            this.srv.stop();
        }

        // log is only assigned in start(); guard so stop() before start() does not throw.
        if (this.log != null && this.log.isDebugEnabled()) {
            this.log.debug("Socket streaming server stopped");
        }
    }

    /**
     * Default converter: unmarshals the received bytes with a {@link JdkMarshaller}.
     * <p>
     * NOTE(review): this deserializes data read from the socket; the only restriction visible
     * here is the class name filter taken from the kernal's marshaller context. Confirm that
     * filter is adequate for untrusted peers.
     *
     * @param <M> Type of the unmarshalled message.
     */
    private class DefaultConverter<M>
    implements SocketMessageConverter<M> {
        /** Marshaller used to unmarshal incoming messages. */
        private final Marshaller marsh;

        /**
         * Creates the marshaller and binds it to the given node name.
         *
         * @param igniteInstanceName Ignite instance name set on the marshaller.
         */
        private DefaultConverter(String igniteInstanceName) {
            this.marsh = new JdkMarshaller(((IgniteKernal)SocketStreamer.this.ignite).context().marshallerContext().classNameFilter());
            MarshallerUtils.setNodeName(this.marsh, igniteInstanceName);
        }

        /**
         * Unmarshals the message bytes.
         *
         * @param msg Raw message bytes.
         * @return Unmarshalled object.
         * @throws IgniteException If unmarshalling fails.
         */
        @Override
        public M convert(byte[] msg) {
            try {
                return U.unmarshal(this.marsh, msg, null);
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        }
    }
}

