netty框架是如何执行connect连接的

文章链接:http://www.liuschen.com

连接

以客户端为例,netty是通过以下代码发起服务请求的,其中b是Bootstrap的实例对象

future = b.connect(new InetSocketAddress(host, port)).sync();

而上面的connect方法是Bootstrap对象中的

public ChannelFuture connect(SocketAddress remoteAddress) {
    if(remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    } else {
        this.validate();
        return this.doConnect(remoteAddress, this.localAddress());
    }
}

以上的doConnect对象是在同一个类中的方法,其中的ChannelFuture和PromiseFuture分别存储的是注册和连接的结果,和外部的对象有所区别,因为PromiseFuture是继承ChannelFuture的接口,其中多了一些设置的方法,这个方法是先初始化并且注册,然后如果成功的话,就开始进行连接操作,先做连 接代码的追踪

private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    final ChannelFuture regFuture = this.initAndRegister();
    final Channel channel = regFuture.channel();
    if(regFuture.cause() != null) {
        return regFuture;
    } else {
        final ChannelPromise promise = channel.newPromise();
        if(regFuture.isDone()) {
            doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    Bootstrap.doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                }
            });
        }

        return promise;
    }
}

以上的doConnect依然在同一个类中

private static void doConnect0(final ChannelFuture regFuture, final Channel channel, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new OneTimeTask() {
        public void run() {
            if(regFuture.isSuccess()) {
                if(localAddress == null) {
                    channel.connect(remoteAddress, promise);
                } else {
                    channel.connect(remoteAddress, localAddress, promise);
                }

                promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }

        }
    });
}

然后上面的connect方法是在channel接口中定义的方法,channel类继承关系如图

其中NioSocketChannel的继承关系:NioSocketChannel extends AbstractNioByteChannel implements SocketChannel 而AbstractNioByteChannel又继承于AbstractNioChannel,在AbstractNioChannel中在找到了connect的实现方法

public final void connect(final SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if(promise.setUncancellable() && this.ensureOpen(promise)) {
            try {
                if(AbstractNioChannel.this.connectPromise != null) {
                    throw new IllegalStateException("connection attempt already made");
                }

                boolean t = AbstractNioChannel.this.isActive();
                if(AbstractNioChannel.this.doConnect(remoteAddress, localAddress)) {
                    this.fulfillConnectPromise(promise, t);
                } else {
                    AbstractNioChannel.this.connectPromise = promise;
                    AbstractNioChannel.this.requestedRemoteAddress = remoteAddress;
                    int connectTimeoutMillis = AbstractNioChannel.this.config().getConnectTimeoutMillis();
                    if(connectTimeoutMillis > 0) {
                        AbstractNioChannel.this.connectTimeoutFuture = AbstractNioChannel.this.eventLoop().schedule(new OneTimeTask() {
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if(connectPromise != null && connectPromise.tryFailure(cause)) {
                                    AbstractNioUnsafe.this.close(AbstractNioUnsafe.this.voidPromise());
                                }

                            }
                        }, (long)connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }

                    promise.addListener(new ChannelFutureListener() {
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if(future.isCancelled()) {
                                if(AbstractNioChannel.this.connectTimeoutFuture != null) {
                                    AbstractNioChannel.this.connectTimeoutFuture.cancel(false);
                                }

                                AbstractNioChannel.this.connectPromise = null;
                                AbstractNioUnsafe.this.close(AbstractNioUnsafe.this.voidPromise());
                            }

                        }
                    });
                }
            } catch (Throwable var6) {
                promise.tryFailure(this.annotateConnectException(var6, remoteAddress));
                this.closeIfClosed();
            }

        }
    }

以上的都Connect是由具体功能的实现类NioSocketChannel中实现具体方法的定义

protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if(localAddress != null) {
        this.javaChannel().socket().bind(localAddress);
    }

    boolean success = false;

    boolean var5;
    try {
        boolean connected = this.javaChannel().connect(remoteAddress);
        if(!connected) {
            this.selectionKey().interestOps(8);
        }

        success = true;
        var5 = connected;
    } finally {
        if(!success) {
            this.doClose();
        }

    }

    return var5;
}

最终的承担者依旧是调用的Java中的nio来进行连接的this.javaChannel().connect(remoteAddress)

注册

然后继续之前的注册操作的代码分析,先是跳转到AbstractBootstrap中,在这里面实际是同过ChannelFuture regFuture = this.group().register(channel)来生成一个该方法返回的ChannelFuture

final ChannelFuture initAndRegister() {
    Channel channel = this.channelFactory().newChannel();

    try {
        this.init(channel);
    } catch (Throwable var3) {
        channel.unsafe().closeForcibly();
        return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }

    ChannelFuture regFuture = this.group().register(channel);
    if(regFuture.cause() != null) {
        if(channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    return regFuture;
}

而这个是接口EventLoopGroup中定义的方法,故名思义,EventLoopGroup就是一个集,或是池,其中继承关系如下所示:

  • EventLoopGroup
    • MultithreadEventLoopGroup
      • LocalEventLoopGroup
      • EpollEventLoopGroup
      • NioEventLoopGroup
    • ThreadPerChannelEventLoopGroup
      • OioEventLoopGroup
    • EventLoop
      • SingleThreadEventLoop
        • EpollEventLoop
        • ThreadPerChannelEventLoop
        • LocalEventLoop
        • NioEventLoop
      • EmbeddedEventLoop

并且其中的group是我们在外部通过实例化private EventLoopGroup group = new NioEventLoopGroup();创建的,

在NioEventLoopGroup的父类MultithreadEventLoopGroup中找到了对应的实现方法,next返回的是一个单独的执行单元EventLoop;

public ChannelFuture register(Channel channel) {
    return this.next().register(channel);
}

public EventLoop next() {
    return (EventLoop)super.next();
}

由上面的继承关系可以看出,要找到 this.next().register(channel)的具体实现需要在SingleThreadEventLoop和NioEventLoop中寻找,在SingleThreadEventLoop中找到了:

public ChannelFuture register(Channel channel) {
    return this.register(channel, new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(Channel channel, ChannelPromise promise) {
    if(channel == null) {
        throw new NullPointerException("channel");
    } else if(promise == null) {
        throw new NullPointerException("promise");
    } else {
        channel.unsafe().register(this, promise);
        return promise;
    }
}

TO BE CONTINUE

Loading Disqus comments...
Table of Contents