博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty新连接接入与NioSocketChannel分析
阅读量:6900 次
发布时间:2019-06-27

本文共 20208 字,大约阅读时间需要 67 分钟。

原文:

前面的一些章节,我们分析了Netty的三大组件 —— 、、 ,对Netty的工作原理有了深入的了解。在此基础上,我们来分析一下当Netty服务端启动后,Netty是如何处理新连接接入的。

本文内容主要分为以下四部分:

  • 新连接检测
  • NioSocketChannel创建
  • NioSocketChannel初始化与注册
  • NioSocketChannel注册READ兴趣集

新连接检测

前面,我们在讲 时,解读过下面这段代码:

public final class NioEventLoop extends SingleThreadEventLoop {        ...            private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        ...            try {            ...            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {                // 读取read事件                unsafe.read();            }        } catch (CancelledKeyException ignored) {            unsafe.close(unsafe.voidPromise());        }                ...    }        ...    }

我们还是以服务端 NioServerSocketChannel 为例,它绑定的unsafe实例为 NioMessageUnsafe 。上面的 unsafe.read() 接口,会向下调用到 NioMessageUnsafe.read() 接口,如下:

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {        ...           private final class NioMessageUnsafe extends AbstractNioUnsafe {                // 用于保存新建立的 NioSocketChannel 的集合        private final List readBuf = new ArrayList();                @Override        public void read() {            // 确保在当前线程与EventLoop中的一致            assert eventLoop().inEventLoop();            // 获取 NioServerSocketChannel config配置            final ChannelConfig config = config();            // 获取 NioServerSocketChannel 绑定的 pipeline            final ChannelPipeline pipeline = pipeline();            // 获取RecvByteBuf 分配器 Handle            // 当channel在接收数据时,allocHandle 会用于分配ByteBuf来保存数据            // 关于allocHandle后面再去做详细介绍            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();            // 重置已累积的所有计数器,并为下一个读取循环读取多少消息/字节数据提供建议            allocHandle.reset(config);                        boolean closed = false;            Throwable exception = null;            try {                try {                    do {                        // 调用后面的 doReadMessages 接口,读取到message则返回1                        int localRead = doReadMessages(readBuf);                        if (localRead == 0) {                            break;                        }                        if (localRead < 0) {                            closed = true;                            break;                        }                        // 对当前read循环所读取到的message数量计数+1                        allocHandle.incMessagesRead(localRead);                        // 判断是否继续读取message                    } while (allocHandle.continueReading());                } catch (Throwable t) {                    exception = t;                }                                int size = readBuf.size();                for (int i = 0; i < size; i ++) {                    readPending = false;                    // 调用pipeline传播ChannelRead事件                    pipeline.fireChannelRead(readBuf.get(i));                }                // 清空readBuf                readBuf.clear();                allocHandle.readComplete();                // 调用pipeline传播 ChannelReadComplete 事件                pipeline.fireChannelReadComplete();                if (exception != null) {                    closed = closeOnReadError(exception);                    pipeline.fireExceptionCaught(exception);                }                if (closed) {                    inputShutdown = true;                    if (isOpen()) {                        close(voidPromise());                    }                }            } finally {                // Check if there is a readPending which was not processed yet.                // This could be for two reasons:                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method                //                // See https://github.com/netty/netty/issues/2254                if (!readPending && !config.isAutoRead()) {                    removeReadOp();                }            }        }    }        ...    }

对于 doReadMessages(...) 的分析:

public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {        ...        // 读取消息    @Override    protected int doReadMessages(List buf) throws Exception {        // 获取 SocketChannel         SocketChannel ch = SocketUtils.accept(javaChannel());                try {            if (ch != null) {                // 使用SocketChannel创建NioSocketChannel,将其存入buf list中                // 关于NioSocketChannel的创建请看后面的分析                buf.add(new NioSocketChannel(this, ch));                return 1;            }        } catch (Throwable t) {            logger.warn("Failed to create a new channel from an accepted socket.", t);            try {                ch.close();            } catch (Throwable t2) {                logger.warn("Failed to close a socket.", t2);            }        }        return 0;    }        ...    }

对于 continueReading() 接口的分析,至于结果为什么返回false,后面会单独分析:

public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {        private volatile int maxMessagesPerRead;    private volatile boolean respectMaybeMoreData = true;        ...        public abstract class MaxMessageHandle implements ExtendedHandle {        private ChannelConfig config;        // 每次读取最大的消息数        private int maxMessagePerRead;        private int totalMessages;        private int totalBytesRead;        private int attemptedBytesRead;        private int lastBytesRead;        private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {            @Override            public boolean get() {                return attemptedBytesRead == lastBytesRead;            }        };                ...                // 判断是否继续读取message            @Override        public boolean continueReading() {            return continueReading(defaultMaybeMoreSupplier);        }                // 判断是否继续读取message        @Override        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {            // 默认情况下 config.isAutoRead() 为true            // respectMaybeMoreData 默认为 true            // maybeMoreDataSupplier.get() 为false            // totalMessages第一次循环则为1            // maxMessagePerRead为16            // 结果返回false            return config.isAutoRead() &&                (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&                totalMessages < maxMessagePerRead &&                totalBytesRead > 0;        }                ...        }        ...        }

NioSocketChannel创建

上面分析新连接接入,提到了 NioSocketChannel 的创建,我们这里来详细分析一下,NioSocketChannel的创建过程与此前我们分析 大体类似。

构造器

先来看看 NioSocketChannel 的构造函数:

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {    ...        public NioSocketChannel(Channel parent, SocketChannel socket) {        // 调用父类构造器        super(parent, socket);        // 创建NioSocketChannelConfig        config = new NioSocketChannelConfig(this, socket.socket());    }        ...    }

父类 AbstractNioByteChannel 构造器:

public abstract class AbstractNioByteChannel extends AbstractNioChannel {    ...        protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {        // 调用父类构造器,并设置兴趣集为SelectionKey.OP_READ,对read事件感兴趣        super(parent, ch, SelectionKey.OP_READ);    }        ...    }

父类 AbstractNioChannel 构造器:

public abstract class AbstractNioChannel extends AbstractChannel {        ...        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {        // 调用父类构造器        super(parent);        // 设置channel        this.ch = ch;        // 设置兴趣集        this.readInterestOp = readInterestOp;        try {            // 设置为非阻塞            ch.configureBlocking(false);        } catch (IOException e) {            ...        }    }}

父类 AbstractChannel 构造器:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {    ...        protected AbstractChannel(Channel parent) {        // 设置parent        this.parent = parent;        // 创建channelId        id = newId();        // 创建unsafe        unsafe = newUnsafe();        // 创建pipeline        pipeline = newChannelPipeline();    }        ...}

ChannelConfig创建

接着我们看看 NioSocketChannelConfig 的创建逻辑:

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {        ...           private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {        // 调用父类构造器        super(channel, javaSocket);        calculateMaxBytesPerGatheringWrite();    }        ...        }

父类 DefaultSocketChannelConfig 构造器:

public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {      ...         public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {       // 调用父类构造器,绑定socketchannel        super(channel);        if (javaSocket == null) {            throw new NullPointerException("javaSocket");        }        // 绑定java socket        this.javaSocket = javaSocket;                // Enable TCP_NODELAY by default if possible.        // netty一般运行在服务器上,不在Android上,canEnableTcpNoDelayByDefault返回true        if (PlatformDependent.canEnableTcpNoDelayByDefault()) {            try {                // 开启 TCP_NODELAY ,开启TCP的nagle算法                // 尽量不要等待,只要发送缓冲区中有数据,并且发送窗口是打开的,就尽量把数据发送到网络上去。                setTcpNoDelay(true);            } catch (Exception e) {                // Ignore.            }        }    }                                           ...                                         }

NioSocketChannel初始化与注册

上面小节分析了NioSocketChannel的创建逻辑,创建完成之后,我们来分析一下NioSocketChannel是如何注册到NioEventLoop上去的。

在前面小节分析新连接检测的有如下小段代码:

private final class NioMessageUnsafe extends AbstractNioUnsafe {        ...    int size = readBuf.size();    for (int i = 0; i < size; i ++) {        readPending = false;        // 调用pipeline传播ChannelRead事件        pipeline.fireChannelRead(readBuf.get(i));    }    ...}

调用pipeline传播ChannelRead事件,这里的Pipeline是服务端Channel,也就是NioServerSocketChannel所绑定的Pipeline,此时的Pipeline的内部结构是怎么样子的呢?

Pipeline-ServerBootstrapAcceptor

那这个 ServerBootstrapAcceptor 是从哪里来的呢?

在此前,我们分析 时,有过下面这段代码:

public class ServerBootstrap extends AbstractBootstrap
{ ... // NioServerSocketChannel初始化 void init(Channel channel) throws Exception { // 获取启动器 启动时配置的option参数,主要是TCP的一些属性 final Map
, Object> options = options0(); // 将获得到 options 配置到 ChannelConfig 中去 synchronized (options) { setChannelOptions(channel, options, logger); } // 获取 ServerBootstrap 启动时配置的 attr 参数 final Map
, Object> attrs = attrs0(); // 配置 Channel attr,主要是设置用户自定义的一些参数 synchronized (attrs) { for (Entry
, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey
key = (AttributeKey) e.getKey(); channel.attr(key).set(e.getValue()); } } // 获取channel中的 pipeline,这个pipeline使我们前面在channel创建过程中设置的 pipeline ChannelPipeline p = channel.pipeline(); // 将启动器中配置的 childGroup 保存到局部变量 currentChildGroup final EventLoopGroup currentChildGroup = childGroup; // 将启动器中配置的 childHandler 保存到局部变量 currentChildHandler final ChannelHandler currentChildHandler = childHandler; final Entry
, Object>[] currentChildOptions; final Entry
, Object>[] currentChildAttrs; // 保存用户设置的 childOptions 到局部变量 currentChildOptions synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } // 保存用户设置的 childAttrs 到局部变量 currentChildAttrs synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer
() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); // 获取启动器上配置的handler ChannelHandler handler = config.handler(); if (handler != null) { // 添加 handler 到 pipeline 中 pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { // 用child相关的参数创建出一个新连接接入器ServerBootstrapAcceptor // 通过 ServerBootstrapAcceptor 可以将一个新连接绑定到一个线程上去 // 每次有新的连接进来 ServerBootstrapAcceptor 都会用child相关的属性对它们进行配置,并注册到ChaildGroup上去 pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } ... }

ServerBootstrapAcceptor

NioServerSocketChannel初始化时,向NioServerSocketChannel所绑定的Pipeline添加了一个InboundHandler节点 —— ServerBootstrapAcceptor ,其代码如下:

public class ServerBootstrap extends AbstractBootstrap
{ ... private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { // 子EventLoopGroup,即为workGroup private final EventLoopGroup childGroup; // ServerBootstrap启动时配置的 childHandler private final ChannelHandler childHandler; // ServerBootstrap启动时配置的 childOptions private final Entry
, Object>[] childOptions; // ServerBootstrap启动时配置的 childAttrs private final Entry
, Object>[] childAttrs; private final Runnable enableAutoReadTask; // 构造函数 ServerBootstrapAcceptor( final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry
, Object>[] childOptions, Entry
, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; // Task which is scheduled to re-enable auto-read. // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may // not be able to load the class because of the file limit it already reached. // // See https://github.com/netty/netty/issues/1328 enableAutoReadTask = new Runnable() { @Override public void run() { channel.config().setAutoRead(true); } }; } // 处理Pipeline所传播的channelRead事件 // 也就是前面新连接检测时看到的那段代码 // pipeline.fireChannelRead(readBuf.get(i)); // ServerBootstrapAcceptor的channelRead接口将会被调用,用于处理channelRead事件 @Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { // 获取传播事件的对象数据,即为前面的readBuf.get(i) // readBuf.get(i)取出的对象为 NioSocketChannel final Channel child = (Channel) msg; // 向 NioSocketChannel 添加childHandler,也就是我们常看到的 // ServerBootstrap在启动时配置的代码: // ServerBootstrap.childHandler(new ChannelInitializer
() {...} ) // 最终的结果就是向NioSocketChannel的Pipeline添加用户自定义的ChannelHandler // 用于处理客户端的channel连接 child.pipeline().addLast(childHandler); // 配置 NioSocketChannel的TCP属性 setChannelOptions(child, childOptions, logger); // 配置 NioSocketChannel 一些用户自定义数据 for (Entry
, Object> e: childAttrs) { child.attr((AttributeKey
) e.getKey()).set(e.getValue()); } // 将NioSocketChannel注册到childGroup,也就是Netty的WorkerGroup当中去 try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } ... } ... }

关于 ChannelInitializer 的讲解,可以看此前 文章。

后面的register逻辑,就与我们前面讲解 大体类似了,这里简单介绍一下。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {    ...        // 注册NioSocketChannel    // eventLoop为childGroup        @Override    public final void register(EventLoop eventLoop, final ChannelPromise promise) {                ...        // 绑定eventLoop到NioSocketChannel上        AbstractChannel.this.eventLoop = eventLoop;        // 现在分析的逻辑是在服务端的线程上,eventLoop与主线程不同,返回false        if (eventLoop.inEventLoop()) {            register0(promise);        } else {            try {                eventLoop.execute(new Runnable() {                    @Override                    public void run() {                        // 这里来调用register0方法                        register0(promise);                    }                });            } catch (Throwable t) {                logger.warn(                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",                        AbstractChannel.this, t);                closeForcibly();                closeFuture.setClosed();                safeSetFailure(promise, t);            }        }    }        // 注册    private void register0(ChannelPromise promise) {        try {                        ...                            boolean firstRegistration = neverRegistered;            // 调用 doRegister()            doRegister();            neverRegistered = false;            registered = true;                        pipeline.invokeHandlerAddedIfNeeded();            safeSetSuccess(promise);            pipeline.fireChannelRegistered();                        // 服务端的NioServerSocketChannel已经与客户端的NioSocketChannel建立了连接            // 所以,NioSocketChannel是处于激活状态,isActive()返回ture            if (isActive()) {                // 对于新连接,是第一次注册                if (firstRegistration) {                    // 传播ChannelActive事件                    pipeline.fireChannelActive();                } else if (config().isAutoRead()) {                    beginRead();                }            }            ...                        } catch (Throwable t) {            // Close the channel directly to avoid FD leak.            closeForcibly();            closeFuture.setClosed();            safeSetFailure(promise, t);        }    }        ...            }

调用到NioSocketChannel中的doRegister()方法:

public abstract class AbstractNioChannel extends AbstractChannel {    ...            @Override    protected void doRegister() throws Exception {        boolean selected = false;        for (;;) {            try {                // 将selector注册到底层JDK channel上,并附加了NioSocketChannel对象                // 兴趣集设置为0,表示不关心任何事件                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);                return;            } catch (CancelledKeyException e) {                ...            }        }    }         ...    }

NioSocketChannel 注册OP_READ兴趣集

紧接着上面的分析,传播ChannelActive事件之后的逻辑,主要就是向客户端的NioSocketChannel注册一个Read兴趣集

if (isActive()) {    // 对于新连接,是第一次注册    if (firstRegistration) {        // 传播ChannelActive事件        pipeline.fireChannelActive();    } else if (config().isAutoRead()) {        beginRead();    }}

通过 ,最终会调用到doBeginRead()接口,如下:

public abstract class AbstractNioChannel extends AbstractChannel {        ...    protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {        ...                @Override        protected void doBeginRead() throws Exception {            // Channel.read() or ChannelHandlerContext.read() was called            // 保存selectionKey到局部变量            final SelectionKey selectionKey = this.selectionKey;            // 判断有效性            if (!selectionKey.isValid()) {                return;            }                    readPending = true;                        // 获取selectionKey的兴趣集            // 前面小结分析doRegister()接口提到,selectionKey的兴趣集设置为0            final int interestOps = selectionKey.interestOps();            // 这里的 readInterestOp 是前面讲NioSocketChannel创建时设置的值            // 为 SelectionKey.OP_READ,也就是1            if ((interestOps & readInterestOp) == 0) {                // 这样,selectionKey最终设置的兴趣集为SelectionKey.OP_READ                // 表示对读事件感兴趣                selectionKey.interestOps(interestOps | readInterestOp);            }        }                        ...                }            ...        }

小结

  • Netty是在哪里检测有新连接接入的?
  • 新连接是怎样注册到NioEventLoop线程上的?
  • NioSocketChannel是怎样初始化及注册的?

参考资料

转载地址:http://nuvdl.baihongyu.com/

你可能感兴趣的文章
Java程序员的日常——经验贴(纯干货)
查看>>
Spring配置文件头及xsd文件版本
查看>>
一个简单的Android富文本TextView实现
查看>>
iOS:个人浅谈工厂模式
查看>>
js-权威指南学习笔记14
查看>>
linux查看文件夹大小,备份文件夹zip压缩解压
查看>>
算法笔记_149:图论之桥的应用(Java)
查看>>
最小化托盘的实现方法
查看>>
2018第2周日
查看>>
Pusher 消息推送测试工具
查看>>
直方图与bin
查看>>
Bloom Filter算法
查看>>
UI控件——自定义按钮
查看>>
集成框架jar包的一些选择
查看>>
Codeforces Round #253 (Div. 2)
查看>>
IE 浏览器不能记录表单数据解决方案,纠结了我一个星期。
查看>>
jQuery EasyUI API 中文文档 - 菜单(Menu)
查看>>
未能获得数据库 'MODEL' 上的排它锁。请稍后再尝试该操作。CREATE DATABASE 失败。...
查看>>
浅谈WebService返回数据效率对比
查看>>
基于Web的代码编辑器ACE,发布1.0版本
查看>>