/*
 * Decompiled with CFR 0.152.
 */
package org.apache.avro.ipc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.netty.NettyTransceiver;
import org.apache.avro.ipc.netty.NettyTransportCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyServer
implements Server {
    private static final Logger LOG = LoggerFactory.getLogger((String)NettyServer.class.getName());
    private final Responder responder;
    private final Channel serverChannel;
    private final EventLoopGroup bossGroup;
    private final EventLoopGroup workerGroup;
    private final EventLoopGroup callerGroup;
    private final CountDownLatch closed = new CountDownLatch(1);
    private final AtomicInteger activeCount = new AtomicInteger(0);

    public NettyServer(Responder responder, InetSocketAddress addr) throws InterruptedException {
        this(responder, addr, null);
    }

    public NettyServer(Responder responder, InetSocketAddress addr, Consumer<SocketChannel> initializer) throws InterruptedException {
        this(responder, addr, initializer, null, null, null, null);
    }

    public NettyServer(Responder responder, InetSocketAddress addr, Consumer<SocketChannel> initializer, Consumer<ServerBootstrap> bootStrapInitialzier) throws InterruptedException {
        this(responder, addr, initializer, bootStrapInitialzier, null, null, null);
    }

    public NettyServer(Responder responder, InetSocketAddress addr, final Consumer<SocketChannel> initializer, Consumer<ServerBootstrap> bootStrapInitialzier, EventLoopGroup bossGroup, EventLoopGroup workerGroup, EventLoopGroup callerGroup) throws InterruptedException {
        this.bossGroup = bossGroup == null ? new NioEventLoopGroup(1) : bossGroup;
        this.workerGroup = workerGroup == null ? new NioEventLoopGroup(10) : workerGroup;
        this.callerGroup = callerGroup == null ? new DefaultEventLoopGroup(16) : callerGroup;
        this.responder = responder;
        ServerBootstrap bootstrap = ((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().group(this.bossGroup, this.workerGroup).channel(NioServerSocketChannel.class)).childHandler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            public void initChannel(SocketChannel ch) throws Exception {
                if (initializer != null) {
                    initializer.accept(ch);
                }
                ch.pipeline().addLast("frameDecoder", (ChannelHandler)new NettyTransportCodec.NettyFrameDecoder()).addLast("frameEncoder", (ChannelHandler)new NettyTransportCodec.NettyFrameEncoder()).addLast("handler", (ChannelHandler)new NettyServerAvroHandler());
            }
        }).option(ChannelOption.SO_BACKLOG, (Object)1024)).childOption(ChannelOption.TCP_NODELAY, (Object)true).childOption(ChannelOption.SO_KEEPALIVE, (Object)true);
        if (bootStrapInitialzier != null) {
            bootStrapInitialzier.accept(bootstrap);
        }
        this.serverChannel = bootstrap.bind((SocketAddress)addr).sync().channel();
    }

    public void start() {
    }

    public void close() {
        this.workerGroup.shutdownGracefully().syncUninterruptibly();
        this.bossGroup.shutdownGracefully().syncUninterruptibly();
        try {
            this.serverChannel.closeFuture().sync();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.closed.countDown();
    }

    public int getPort() {
        return ((InetSocketAddress)this.serverChannel.localAddress()).getPort();
    }

    public void join() throws InterruptedException {
        this.closed.await();
    }

    public int getNumActiveConnections() {
        return this.activeCount.get();
    }

    class NettyServerAvroHandler
    extends SimpleChannelInboundHandler<NettyTransportCodec.NettyDataPack> {
        private NettyTransceiver connectionMetadata = new NettyTransceiver();

        NettyServerAvroHandler() {
        }

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            NettyServer.this.activeCount.incrementAndGet();
            super.channelActive(ctx);
        }

        protected void channelRead0(final ChannelHandlerContext ctx, final NettyTransportCodec.NettyDataPack dataPack) throws Exception {
            NettyServer.this.callerGroup.submit(new Runnable(){

                @Override
                public void run() {
                    List<ByteBuffer> req = dataPack.getDatas();
                    try {
                        List res = NettyServer.this.responder.respond(req, (Transceiver)NettyServerAvroHandler.this.connectionMetadata);
                        if (res != null) {
                            dataPack.setDatas(res);
                            ctx.channel().writeAndFlush((Object)dataPack);
                        }
                    }
                    catch (IOException e) {
                        LOG.warn("unexpected error");
                    }
                }
            });
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
            LOG.warn("Unexpected exception from downstream.", e);
            ctx.close().syncUninterruptibly();
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            LOG.info("Connection to {} disconnected.", (Object)ctx.channel().remoteAddress());
            NettyServer.this.activeCount.decrementAndGet();
            super.channelInactive(ctx);
        }
    }
}

