Netty
服务端启动完成,这时候客户端连接就可以接入进来了,下面我们就来分析下客户端连接接入的流程。
之前分析过NioEventLoop
线程启动方法是startThread()
,由于这个方法里面的逻辑比较复杂,并没有展开,这一节就是从这个方法开始分析。
(资料图片)
private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); } } }}
这个方法主要主要完成2件事:
利用cas
将NioEventLoop
的状态由ST_NOT_STARTED
修改成ST_STARTED
,即表示NioEventLoop
线程启动;执行doStartThread()
方法;doStartThread()
方法看着比较复杂,核心逻辑如下,向线程池执行器executor
提交一个任务,而这个线程池执行器类型是ThreadPerTaskExecutor
,即每次执行任务都会创建一个新线程,而且这个任务是无限循环的:事件轮询selector.select()
、事件处理processSelectedKeys()
和任务队列处理runAllTasks()
,这样NioEventLoop
就和具体的Thread
线程进行了关联:
private void doStartThread() { assert thread == null; //executor线程执行器,类型是:ThreadPerTaskExecutor,即每次执行任务都会创建一个新线程 executor.execute(new Runnable() { @Override public void run() { //将executor线程执行器创建的线程:FastThreadLocalThread保存到EventLoop的全局变量中,相当于thread和EventLoop的绑定 thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { //然后调用EventLoop中的run方法进行启动 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } } });}
该方法大致完成2件事:
thread = Thread.currentThread();
:将executor
线程池分配的线程保存起来,这样就完成了NioEventLoop
和Thread
线程的关联;SingleThreadEventExecutor.this.run()
:具体实现在NioEventLoop.run()
方法,所以,startThread()
核心就是分配一个线程运行NioEventLoop.run()
方法。protected void run() { for (;;) { try { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE:// 默认实现下,不存在这个情况 continue; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: //selector.select轮询io事件 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } } catch (IOException e) { rebuildSelector0(); handleLoopException(e); continue; } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { // 处理 Channel 感兴趣的就绪 IO 事件 processSelectedKeys(); } finally { // 运行所有普通任务和定时任务,不限制时间 runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { // 处理IO事件 processSelectedKeys(); } finally { // 运行所有普通任务和定时任务,限制时间 final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // EventLoop 优雅关闭 try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } }}
该方法主要完成三件事:
select(wakenUp.getAndSet(false))
:主要执行selector.select()
方法进行事件轮询processSelectedKeys()
:如果轮询到事件,会在这里进行处理runAllTasks()
:处理任务队列和定时任务队列中的任务下面我们就分别来分析下这三个方法。
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0;//计数器置0 long currentTimeNanos = System.nanoTime(); /** * selectDeadLineNanos是select()方法运行的截止时间 * * currentTimeNanos:可以看成当前时间 * delayNanos(currentTimeNanos):获取间隔时间,这里分为两种情况: * 1、netty里面定时任务队列scheduledTaskQueue是按照延迟时间从小到大进行排序,如果定时任务队列中有任务, * 则只需要获取到第一个任务的启动时间 - 当前时间 = select()方法可以运行的时间间隔,即:select()方法要在第一个定时任务执行之前退出,这样才能去执行定时任务 * 2、如果定时任务队列没有任务,则delayNanos(currentTimeNanos)返回1秒对应的时间间隔 * */ long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { //计算超时时间 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; /** * timeoutMillis <= 0表示当前已经超时了,不能继续向下执行select()方法了,需要立即退出select方法,在退出前还有个判断:selectCnt == 0 * selectCnt == 0表示第一次进入循环,则执行下Selector.selectNow()检出准备好的网络IO事件,该方法不会阻塞, */ if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } /** * 如果没有超时,但是通过hasTasks()判断到taskQueue任务队列中有需要执行的任务,这时也需要退出select()方法 * 1、利用cas将wakeUp值由false变成true,wakeUp=true表示线程处于唤醒状态,可以执行任务,进入select()方法前会把wakeUp设置成false * 表示线程处于select()方法阻塞中,不能处理任务队列中的任务,这时只要处理Selector.select() * 2、退出前执行一次:selector.selectNow() */ if (hasTasks() && wakenUp.compareAndSet(false, true)) { //有任务,进行一次非阻塞式的select selector.selectNow(); selectCnt = 1; break; } //调用select方法,阻塞时间为上面算出的最近一个将要超时的定时任务时间 /** * 未超时,任务队列中也没有需要执行的任务,这时就可以放心的执行Selector.select()方法了,这里带上之前计算出的超时时间 * 如果之前计算时存在定时任务,则保证在第一个定时任务启动前唤醒即可,没有定时任务则默认超时1秒 */ int selectedKeys = selector.select(timeoutMillis); //轮询次数+1 selectCnt ++; /** * 发生如下几种情况,select()方法都需要退出: * 1、selectedKeys != 0:表示轮询到IO事件 * 2、oldWakenUp:这个是入参,值为false,是在select()方法中控制是否需要退出,默认是没有使用到的,没有意义 * 3、wakenUp.get():进入select()方法之前,wakeUp被设置成false,如果这里为true,表示已有外部线程对线程进行唤醒操作, * 一般就是addTask()添加新任务时会触发唤醒,然后及时去执行taskQueue中的任务 * 4、hasTasks() || hasScheduledTasks():判断任务队列和定时任务队列是否有任务需要执行 */ if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } //线程中断响应:如果线程被中断,计数器置1,break退出for循环,则退出select()检测 if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } /** * 正常情况下:time >= currentTimeNanos + TimeUnit.MILLISECONDS.toNanos(timeoutMillis) * 但是,jdk nio中存在一个bug,selector.select(timeoutMillis)在没有IO事件触发时并不会等待超时而是立即返回,造成空轮询 * * 下面就是Netty解决空轮询问题 * 1、if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) * 表示selector.select(timeoutMillis)经过超时后才被唤醒,属于正常情况,把selectCnt重置成1 * 2、如果不是,表示可能发生空轮询selectCnt不会被重置成1,for循环一次selectCnt就会被累加1次; * 3、等到 selectCnt > 门限值,默认是512,可以通过io.netty.selectorAutoRebuildThreshold参数设置, * 则判断真正发生了nio空循环bug,则重建Selector替换掉当前这个出问题的Selector */ long time = System.nanoTime(); //判断执行了一次阻塞式select后,当前时间和开始时间是否大于超时时间。(大于是很正常的,小于的话,说明没有执行发生了空轮询) if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { selector = selectRebuildSelector(selectCnt); selectCnt = 1; break; } currentTimeNanos = time; } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } }}
select()
方法代码看着很复杂,其核心思想理解再来分析就比较简单的。select()
主要是用来执行selector.select()
对IO
事件进行轮询,作为server
,这里就是轮询OP_ACCEPT
事件,看是否有客户端接入进来。但是NioEventLoop
是单线程处理模式,不可能让线程一直处理selector.select()
,还有轮询到的事件以及任务队列中任务等等都需要使用这个线程进行处理,所以,上面一大堆代码都是用来判断什么时候退出select()
方法的,总结下退出逻辑主要分为如下几种情况:
selector.select()
方法之前,计算出一个超时时间,超时时间默认是1秒
,如果定时任务队列有任务,则取出第一个任务(按顺序存放),保证在该定时任务执行之前退出select()
方法即可;如果超时就退出,退出前判断是否是第一次进入for
循环,如果是在退出之前调用一次无阻塞的selector.selectNow()
轮询下判断任务队列taskQueue
中是否有任务,如果有则将wakenUp
利用cas
设置成true
,执行下无阻塞的selector.selectNow()
轮询后退出select()
方法如果上面情况都不存在,开始执行阻塞selector.select(timeoutMillis)
轮询,并将之前计算的超时时间带上;selector.select(timeoutMillis)
执行完成后,继续判断是否需要退出select()
方法,发生如下任一情况则要退出:轮询到IO
事件,则需要退出select()
方法去处理事件外部线程对对线程执行过唤醒操作,比如addTask()
等操作需要唤醒线程执行队列任务,才能及时去执行taskQueue
中的任务:进入select()
方法之前,wakeUp
被设置成false
,如果这里为true
,表示已有外部线程对线程进行唤醒操作任务队列taskQueue
或定时任务队列scheduledTaskQueue
中有需要处理的任务,这时需要退出select()
方法,转去执行任务private void processSelectedKeys() { //selectedKeys != null表示已对Selector进行优化过,替换掉Selector内部的selectedKeys,正常情况下进入这个流程 if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
processSelectedKeys()
主要是对selector.select()
方法轮询到的事件进行处理,作为server
,如果轮询到OP_ACCEPT
,就表示有客户端接入进来了,那我们就跟踪下这个方法,看接入进来的客户端处理流程。
Netty
是对Selector
进行了优化,将selectedKeys
由Set
实现替换成了数组实现,提升性能,所以,这里一般走的是processSelectedKeysOptimized()
这个流程:
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null; //k.attachment()获取到的就是NioServerSocketChannel final Object a = k.attachment(); if (a instanceof AbstractNioChannel) {//一般是走这个分支流程 processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask task = (NioTask) a; processSelectedKey(k, task); } if (needsToSelectAgain) { selectedKeys.reset(i + 1); selectAgain(); i = -1; } }}
这里关键一点是Object a = k.attachment();
,之前分析过向selector
注册时把NioServerSocketChannel
作为attachment
添加进去,所以,这里取出来的就是NioServerSocketChannel
对象。processSelectedKey()
方法通过if判断事件类型进行处理,server
端这里肯定是OP_ACCEPT
:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}
具体的处理逻辑交由Unsafe
对象进行处理:
public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //doReadMessages()读出来一个客户端连接的Channel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}
这个类主要完成2件事:
doReadMessages(readBuf)
:调用serverSocketChannel.accept()
接收到客户端连接socketChannel
,并封装成Netty中类型:NioSocketChannel
,然后放入到readBuf集合中;pipeline.fireChannelRead(readBuf.get(i));
:将读入的客户端连接作为参数,即NioSocketChannel
对象,通过pipeline触发channelRead
事件进行handler
间传播,注意这里的pipeline
是NioServerSocketChannel
中的,即server
端的。最终会进入到ServerBootstrapAcceptor#channelRead
方法中进行处理。public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry, Object> e: childAttrs) { child.attr((AttributeKey
这个方法主要完成3件事:
child.pipeline().addLast(childHandler)
:向NioSocketChannel
中添加ServerBootstrap.childHandler(new TestServerInitializer())
,后面通过触发handlerAdded()
时回调initChannel()
实现向pipeline添加handler
;设置option
和attr
信息;childGroup.register(child)
:将客户端连接NioSocketChannel
注册到NioEventLoop
实例上,基本和之前分析NioServerSocketChannel
注册逻辑一致,这个过程中会触发三个事件:handlerAdded
、channelRegistered
和channelActive
,之前NioServerSocketChannel
注册时只能触发前两个,绑定端口后才能触发第三个事件,客户端连接不存在端口绑定问题,所以这里会直接触发channelActive
。和NioServerSocketChannel一样,真正向selector
注册感兴趣事件就是在channelActive
触发这里:public void channelActive(ChannelHandlerContext ctx) { //触发channelActive事件传播 ctx.fireChannelActive(); //向selector注册真正关注的事件 readIfIsAutoRead();}
channelActive
和之前分析NioServerSocketChannel
的处理逻辑一致,就不再分析。
分析到这里,基本搞清楚了客户端接入的处理流程,现在再次总结下:
NioServerSocketChannel
绑定的NioEventLoop
不停轮询OP_ACCEPT
,触发后通过调用java api
获取到ServerSocket
,然后包装成NioSocketChannel
;然后触发channelRead
事件传播,然后会进入server pipeline
中非常重要的一个handler
:ServerBootstrapAcceptor
,连接处理器专门处理客户端连接;在ServerBootstrapAcceptor#channelRead()
方法中,完成NioSocketChannel
的设置:option
、attr
、handler
添加等;最重要的是将channel
注册到NioEventLoop
上,注册过程中会触发三种事件:handlerAdded
、channelRegistered
和channelActive
,和之前分析server channel
注册过程一样,最终在channelActive
这里向selector
注册真正感兴趣IO事件
,整个流程全部完成。 关键词:
上一篇 : 成语塞翁失马焉知非福是什么意思_塞翁失马焉知非福是什么意思-世界滚动
下一篇 : ap指_ap指标_全球最资讯
最新推荐
1、安民巷古民居位于福州市安民巷。文章到此就分享结束,希望对大家有
对于selfiecity是什么软件这个问题感兴趣的朋友应该很多,这个也是目前
夫妻离婚一方出轨财产的分割可以由双方协商处理,协商的时候还可以在自
谢帆是WE老板的好友,他也是第一个爆料EDG签了Uzi的人,并且也爆料过WE
在比赛中,李诗沣表现出了极强的竞技实力和严谨的比赛态度,让人叹为观
2023年法网首轮,德约科维奇三盘直落战胜了首次出战大满贯正赛的美国选
从奥运会期间接济被媳妇马蓉戴了绿帽子的王宝强开始,郭斌就被外界认为
CSGO最划算交易网站是哪个CSGO饰品交易网站最划算大全还是值得大家去体验一把,如果你担心亏钱,可以先...
手机这种移动通讯设备闲置已经相当的普及了,大多数的手机用户,对手机
产假工资如何支付正常进行支付。会由生育保险账户进行支出。法律依据:《社会保险法》第五十四条 用人...
夫妻没有结婚证财产怎么分没有领取结婚证的夫妻不是合法的夫妻,不是法律上的夫妻,自然也不用走法律上...
有抚养权能带孩子在身边吗可以的,抚养权就是指直接抚养照顾的权利。法律依据:《婚姻法》第三十六条 ...
自愿离婚需要带什么资料离婚需要出具双方的户口簿、身份证、结婚证、双方当事人共同签署的离婚协议书等...
抚养费该谁负担离婚后,一方抚养的子女,另一方应负担必要的生活费和教育费的一部或全部,就是说,抚养...
女方有理由不出抚养费吗没有。因为根据婚姻法,一方抚养的子女,另一方应负担必要的生活费和教育费的一...
离婚案件房产的分割分哪两种可以选择房产或货币补偿。法律依据:《物权法》第一百条 共有人可以协商确...
夫妻共同财产的特别规定是什么法律分析:夫妻对共同财产,有平等的处理权。离婚时,夫妻的共同财产由双...
1、逆水寒追命支线神秘的箱子任务攻略介绍:破庙场景,正中间马车中三
沪深两市成交额突破8000亿元
赛前,国足球员的号码确定,武磊将继续身穿7号球衣,艾克森9号,10号归
直播吧6月15日讯RAC1记者RogerSaperas报道了关于巴萨引进基米希和维克
凭借这一冠,她不但成为2007年的海宁以来首位在罗兰·加洛斯实现卫冕的
北京时间6月15日消息,2023年U19男篮世界杯将于6月24日-7月2日在匈牙利
文|朱志强、宋照康、孙泽志图|翟一龙、周杰、冉思远、李进平大漠砺精兵
1、无人直升机的构造和旋翼飞行器差不多,其工作原理和旋翼式直升机也
显然,“梅罗时代”的诞生离不开当年西甲的繁荣,皇马与巴萨两大豪门的
6月15日消息,利物浦3500万英镑搞定麦卡利斯特,这给今夏转会打了一个
并肩作战是热爱的【无限】可能,在桀然闪耀的星途之行中,操控【无线】
搜狐体育消息,北京时间6月15日,据媒体人丁旭消息,国足23名球员的球
为传承弘扬“奉献、友爱、互助”的志愿精神,全力为考生以及陪考的家长
6月12日,五月天演唱会官方售票平台纷玩岛APP发布公告称,纷玩岛APP工
大连航运职业技术学院船舶电子电气技术(专科类)专业介绍各位同学大家好
正义网清远6月14日电(通讯员唐艳)为检验近期开展的大数据应用工具实操
正义网清远6月14日电(通讯员高海兰)近年来,广东省英德市检察院在开展未
6月5日有6只新债招标。
1、百度文库直接下载·······················...
(点击图片查看视频)日前,山西临汾尧都区的38万亩小麦进入夏收高峰期
处理违章扣分应当要到车辆辖区车管所或交警队处理,具体需要携带的资料
汽车违章罚款扣分网上交罚款的的方式是用车主身份证在交管12123官方应
来为大家解答以上问题,拼好货商城app下载,拼好货商城很多人还不知道
6月14日是第20个世界献血者日,从天津市血液中心获悉,天津市已连续12
6月13日,市教育局发布《关于做好2023年普通中小学招生入学工作的
华夏鼎利债券6月14日每10份分红0 65元。本次分红为2023年度的第1次分红
为加快农作物病虫害绿色防控关键技术的推广应用,促进农药减量增效,保
除了放弃姆巴佩之外,皇家马德里还拒绝了来自切尔西的“敲竹杠”,老佛
直播吧6月15日讯西班牙媒体Relevo的记者MatteoMoretto报道,AC米兰和一
作为国内知名的膏药代加工企业之一,仙佑集团已经拥有了多年的历史,一直致力于膏药代加工、膏药加工、...
1、ct是宝石重量单位,克拉。2、1CT=200mg,多用于称量宝石,克拉(Ct)
很多家庭为了安全起见,会安装大玻璃,不过对于楼层较高的业主来说,最
苏铁植物起源于苏铁植物还保存得下来所以它被称为植物界的活化石最近,
全球看点:《广西“最美撑伞女孩”找到了》后续:她用奖金购买慰问品回报乡邻
每日精选:用法治力量推动湿地保护高质量发展 2023年广西环保世纪行宣传活动启动
重实效 强实干 抓落实丨广州市国资国企入桂开展产业合作 全球讯息
前5月广西外贸增速快于全国48.5个百分点 进出口2875.1亿元,同比增长53.2%|全球短讯
最新快讯!拒续约后,姆巴佩爆猛料,梅西平反,巴黎演砸,皇马卷入,太黑暗
当前视点!记者:AC米兰和一些西班牙球队均有意免签卢卡-罗梅罗
晚上8点,国际足联重磅官宣!24天后实行2大变革,K联赛先做榜样 热议
中新网丨张家川:派出所打造矛盾纠纷解决新机制专解群众烦心事 每日消息
百度地图App中怎么查看卫星图和热力图 百度地图怎么看卫星地图实景 快看
离休和退休的区别有哪些?离退休人员死亡一次性抚恤金发放办法看这里
世界讯息:拒离队!巴萨乱了!亏5600万,哈维沉默,下最后通牒,拿他没办法
聚焦:温网奖金再创新高,德约科维奇若冠军将拿2000万,冲击第24冠!
2023年西安市中考政策发布 2所初中学业水平考试被新纳入城六区管理-焦点精选
重点聚焦!稳经济 促发展 强信心|简化行政审批 提升服务效率
当前要闻:测血糖用的稀有GBA卡带,被收录进了游戏历史基金会
罗体:罗马近日将官宣恩迪卡 米兰巴黎试图截胡但穆帅说服了球员 环球最资讯
今头条!意甲媒体透露:楚克乌泽已同意加盟米兰,双方就合同问题正在沟通
天天实时:马琳下课!孔令辉回归?李隼发言透露态度,国乒球迷争议不断
美联储暂停加息 美股收盘涨跌不一 科技股多数上涨英伟达大涨超4%|今日观点
最新战报!印尼公开赛第二比赛日,中国队7胜3负,陈雨菲继续抗日
联系我们:55 16 53 8@qq.com
关于我们| 联系方式| 版权声明| 供稿服务| 友情链接
塞北网 版权所有,未经书面授权禁止使用
Copyright©2008-2020 By www.saibeinews.com All Rights Reserved