03反向压力:如何避免异步系统中的OOM问题?
文章目录
在第 02 课时,我们使用了 Netty 并配合 Java 8 中的 CompletableFuture 类,构建了一个完全异步执行的数据采集服务器。经过这种改造,CPU 和 IO 的使用效率被充分发挥出来,显著提高了服务器在高并发场景下的性能。
但是,关于异步的问题我们还并没有彻底解决。上面的改造还存在一个致命的缺陷,也就是今天我们要讨论的,在异步系统中流量控制和反向压力的问题。
异步系统中的 OOM 问题
回想下 02 课时中,基于 Netty 和 CompletableFuture 类的数据采集服务器,关键是下面这部分代码(请参见完整代码):
|
|
从上面的代码可以看出,我们在进行请求处理时,采用了 CompletableFuture 类提供的异步执行框架。在整个执行过程中,请求的处理逻辑都是提交给每个步骤各自的执行器,来进行处理,比如 decoderExecutor、ectExecutor 和 senderExecutor。
仔细分析下这些执行器你就会发现,在上面异步执行的过程中,没有任何阻塞的地方。只不过每个步骤都将它要处理的任务,存放在了执行器的任务队列中。每个执行器,如果它处理得足够快,那么任务队列里的任务都会被及时处理。这种情况下不存在什么问题。
但是,一旦有某个步骤处理的速度比较慢,比如在图 1 中,process 的速度比不上 decode 的速度,那么,消息就会在 process 的输入队列中积压。而由于执行器的任务队列,默认是非阻塞且不限容量的。这样,任务队列里积压的任务,就会越来越多。终有一刻,JVM 的内存会被耗尽,然后抛出 OOM 异常,程序就退出了。
所以,为了避免 OOM 的问题,我们必须对上游输出给下游的速度做流量控制。那怎么进行流量控制呢?
一种方式,是严格控制上游的发送速度。比如,控制上游每秒钟只能发送 1000 条消息。这种方法是可行的,但是非常低效。如果实际下游每秒钟能够处理 2000 条消息,那么,上游每秒钟发送 1000 条消息,就会使得下游一半的性能没有发挥出来。如果下游因为某种原因,性能降级为每秒钟只能处理 500 条消息,那么在一段时间后,同样会发生 OOM 问题。
所以,我们该如何进行流量控制呢?这里有一种更优雅的方法,也就是反向压力。
反向压力原理
在反向压力的方案中,上游能够根据下游的处理能力,动态地调整输出速度。当下游处理不过来时,上游就减慢发送速度,当下游处理能力提高时,上游就加快发送速度。
反向压力的思想,已经成为流计算领域的共识,并且形成了反向压力相关的标准,也就是Reactive Streams。
上面的图 2 描述了 Reactive Streams 的工作原理。当下游的消息订阅者,从上游的消息发布者接收消息前,会先通知消息发布者自己能够接收多少消息。然后消息发布者就按照这个数量,向下游的消息订阅者发送消息。这样,整个消息传递的过程都是量力而行的,就不存在上下游之间因为处理速度不匹配,而造成的 OOM 问题了。
目前,一些主流的异步框架都开始支持 Reactive Streams 标准,比如 RxJava、Reactor、Akka Streams、Vert.x 等。这足以说明, OOM 和反向压力问题在异步系统中是多么重要!
实现反向压力
现在,我们回到 Netty 数据采集服务器。那究竟该怎样为这个服务器加上反向压力的功能呢?
前面我们分析了异步执行的过程,之所以会出现 OOM 问题,主要还是因为,接收线程在接收到新的请求后,触发了一系列任务。这些任务都会被存放在任务队列中,并且这些任务队列,都是非阻塞且不限容量的。
因此,要实现反向压力的功能,只需要从两个方面来进行控制。
其一是,执行器的任务队列,它的容量必须是有限的。
其二是,当执行器的任务队列已经满了时,就阻止上游继续提交新的任务,直到任务队列,重新有新的空间可用为止。
按照上面这种思路,我们就可以很容易地实现反向压力。下面的图 3 就展示了,使用容量有限的阻塞队列,实现反向压力的过程。
当 process 比 decode 慢时,运行一段时间后,位于 process 前的任务队列就会被填满。当 decode 继续往里面提交任务时,就会被阻塞,直到 process 从这个任务队列中取走任务为止。
以上说的都是实现原理。那具体用代码该怎样实现呢?下面就是这样一个具备反向压力能力的 ExecutorService 的具体实现。
|
|
在上面的代码中,BackPressureExecutor 类在初始化时,新建一个或多个 ThreadPoolExecutor 对象,作为执行任务的线程池。这里面的关键点有两个。
第一个是,在创建 ThreadPoolExecutor 对象时,采用 ArrayBlockingQueue。这是一个容量有限的阻塞队列。因此,当任务队列已经满了时,就会停止继续往队列里添加新的任务,从而避免内存无限大,造成 OOM 问题。
第二个是,将 ThreadPoolExecutor 拒绝任务时,采用的策略设置为 AbortPolicy。这就意味着,在任务队列已经满了的时候,如果再向任务队列提交任务,就会抛出 RejectedExecutionException 异常。之后,我们再通过一个 while 循环,在循环体内,捕获 RejectedExecutionException 异常,并不断尝试,重新提交任务,直到成功为止。
这样,经过上面的改造,当下游的步骤执行较慢时,它的任务队列就会占满。这个时候,如果上游继续往下游提交任务,它就会不停重试。这样,自然而然地降低了上游步骤的处理速度,从而起到了流量控制的作用。
接下来,我们就可以在数据接收服务器中,使用这个带有反向压力功能的 BackPressureExecutor 了(请参见完整代码)。
|
|
从上面的代码可以看出,我们只需把 decode、doExtractCleanTransform 和 send 等每一个步骤用到的执行器,都替换成 BackPressureExecutor 即可。这样,就实现了反向压力功能,其他部分的代码,不需要做任何改变!
最后,还需要说明下的是,在 BackPressureExecutor 的实现中,为什么需要封装多个执行器呢?这是因为,使用 M * N 个线程,有三种不同的方法:
第一种是,每个执行器使用 1 个线程,然后使用个 M * N 执行器;
第二种是,每个执行器使用 M * N 个线程,然后使用 1 个执行器;
第三种是,每个执行器使用 M 个线程,然后使用 N 个执行器。
在不同场景下,三种使用方式的性能表现会有所不同。根据我的经验,主要是因为,队列的生产者之间,存在着相互竞争,然后队列的消费者之间,也存在着相互竞争。所以,如果你要使用这个类的话,还是需要根据实际的使用场景,分配合适的队列数和线程数,避免对同一个队列的竞争,过于激烈。这样,有利于提升程序的性能。
小结
今天,我用反向压力的功能进行流量控制,解决了异步系统中的 OOM 问题。对于一个能够在生产环境上稳定运行的系统来说,任何使用了异步技术的地方,都需要尤其注意OOM 问题。
其实,解决异步系统 OOM 问题的方法,并不限于反向压力。比如,我们在使用线程池时,设置线程的数量,这也是一种保护措施。但是,我们今天着重强调的是反向压力的方法。这是因为,反向压力在流计算系统中,有着非常重要的地位。像目前的流计算框架,比如 Flink、Spark Streaming 等,都支持反向压力。可以说,如果没有反向压力的功能,任何一个流计算系统,都会时时刻刻有着 OOM 崩溃的风险。
在今天的讨论中,我们已经多次用到了上游、下游,甚至是 Reactive Streams 这种,直接与“流”相关的字眼。我们已经隐隐约约感受到,“流”与“异步”之间,有着千丝万缕的关系。在接下来的课程中,我们还会专门讨论到,它们之间的关联关系。
相信通过今天的课程,你在以后使用异步编程时,一定会注意到系统的 OOM 问题。你在以往的编程中,有没有遇到过 OOM 问题呢?有的话,可以在评论区留言,我看到后会和你一起分析解决!
本课时精华:
点击此链接查看本课程所有课时的源码
PB 级企业大数据项目实战 + 拉勾硬核内推,5 个月全面掌握大数据核心技能。点击链接,全面赋能!
-– ### 精选评论 ##### *尚: > 麻烦问一下老师,文中解释的只是decode之后的情况,那么decode之前的情况呢?请求如果一直在来,然后decode的时候产生了任务积压,那么就会一直在decode的时候while吗?这里的压力没法继续往前反馈了呀 ###### 讲师回复: > 如果decode之前还有个步骤是receive,也就是从kafka中拉消息的话,那decode的情况就和process完全一样了,无需再讨论。所以这里我就讲下receive的情况, receive是从kafka拉取数据,也就是pull模式,它是自己主动从receive读消息的,当它发现decode的任务队列已经满了时,就会阻塞起来,进而从kafka“拉”数据的速度也会自动减慢,从而不存在OOM的问题。另外,对于你说的“decode的时候while”,decode自己的队列也是个容量有限的阻塞队列,当decode自己阻塞在往process的任务队列写数据时,这个while循环就卡住或者说停顿了,这样decode自己的任务队列也会占满。这样receive往decode的任务队列提交任务时,receive自然也会阻塞住,从而降低receive从kafka中pull消息的速度。你看,这样不就将反向压力从process一直传到了receive?再多几个步骤也是相同的原理,只要任务队列是容量有限的阻塞队列,就会将反向压力往前传。 ##### **队长: > 老师,请问一下,这套操作是不是用rxjava的提供的flowable就可以完成背压操作民 ###### 讲师回复: > 是的哈,给你点赞,一下子就能领悟这个课时的重点。确实是这样的,但凡用到异步的系统,一定要记得考虑上下游处理速度不匹配时的问题。你可以不用反向压力,但一定要提前想清楚在程序实际运行时会不会出现数据积压的问题,出现问题时你有怎么样的策略进行处理。程序开发时把问题考虑周全即可 ##### **强: > 感觉大部分内容是线程池的知识。我理想中流式计算反向压力机制不应该仅仅通过一个线程池来实现。 ###### 讲师回复: > 关键不是线程池哈,而是“队列”。只有带队列的线程池实现,才是“流”式的。然后实现反向压力的方式确实不只有线程池队列满了之后拒绝新任务提交的方法。像Storm和Spark Streaming也是有各自的反向压力实现方式的,尤其是Spark Streaming利用了PID的思想来进行反向压力实现。在模块四分析四种开源流计算框架时,我对它们实现反向压力的方法都各自做了描述,你也可以看下。另外,在Reactive Stream标准中,本质上是通过pull的方式来进行流控的,这也算是实现反向压力的一种思路。总的来说的话,采用容量有限的阻塞队列来实现反向压力,我觉得这已经是一种简洁直观且好用的方案了。当然,不排除有更精妙更高效的方案,但终究还是要经过实践来检验。 ##### **强: > 为什么会降低上游的步骤的处理速度??? 上游处理时有自己的队列,下游处理时也有自己的队列,下游是如何通知上游自己的处理能力的?是不是上下游之间有一个队列连接,上面的代码示例没有体现呀? ###### 讲师回复: > 图3中的上游和下游之间,只有1个队列,没有专门用于下游向上游通知的队列。下游是这样对上游进行流控的,当下游处理较慢时,它的任务队列就会被占满,由于我们这里的队列用的是有容量限制的阻塞队列,所以当这个任务队列满了之后,上游再往这个队列提交任务时就会阻塞住,这样它相当于暂停工作了,所以会减慢速度。当下游之后将队列里的任务再取出一个之后,队列里又有空间了,这样上游就可以继续往队列里提交任务,从而恢复执行了。你可以看下 BackPressureExecutor 类的代码哈,其中的do while{}部分就是在做下游队列阻塞后不断重试提交的过程,在这个地方会自然降低上游处理速度的,不需要下游再额外通知。 ##### 薛: > 老师,像Kafka这种消息队列也有反压机制,反压是在流计算框架处理还是消息队列处理呢? ###### 讲师回复: > 你说的Kafka支持反向压力,我想应该是指那些实现了Reactive Streams标准的Kafka客户端类吧,比如akka-stream-kafka。不过在我目前的处理过程中,主要还是利用流计算框架的反向压力功能,比如Flink和Spark Streaming都支持反向压力。你可以将我在这个课时里讲反向压力时用到的队列,视为是这些流计算框架里自带的数据传输队列,不是外部的Kafka。我自己使用Kafka的时候,通常不会做反向压力,原因在于Kafka可以将数据保存在磁盘中,来避免OOM问题,因为磁盘毕竟比内存大很多。比如像Samza就是这么使用Kafka的,它并不支持反向压力。另外,使用Kafka时,一般需要做好Kafka磁盘监控和topic积压情况的监控,同时设置合理的数据保存时间,也就是log.retention.hours参数,这样可以尽量避免Kafka磁盘写满的问题。 ##### *域: > 反向压力其实是,在解决任务处理速度慢于生产者生产的速度的场景,其目的是使用有限的服务器资源处理有限的数据。">服务器层面出发吗?还是有更好的解决方案🤔 ###### 讲师回复: > 对的。因为当一个异步系统有多级处理步骤时,你是很难只从源头来控制系统的处理速度的。比如有A->B->C三个步骤,B比A处理慢,C又比B处理的慢的话,光控制A的流量是不能解决B会造成C崩溃的问题的。而反向压力的存在,则可以保证服务不会时时刻刻都有可能因为OOM问题意味崩溃。同时它可以尽可能地发挥出整个系统的最高性能,也方便我们定位服务器的资源使用情况。比如在上面的A->B->C过程中,如果定位到B比A慢,你就可以通过分配更多资源给B,或者改善算法,优化程序性能的方式,提升B的处理能力,这样可以尽可能地让A也发挥出最佳的性能。至于有没有更好的解决方案,这个其实是有点偏哲学,因为在像Kubernetes那种无状态的微服务体系中,它们也是推崇快速失败重启的方式。但是在异步或流计算系统中,我认为还是反向压力的方案最佳。毕竟,系统时不时因为OOM问题崩溃下,也是挺烦人的。 ##### Q: > 老师你好,单从这个例子来看,是否只是列举了一个例子说把「线程池拒绝」这个现象当做一个反馈的信号,从而实现了反向压力控制的左右。1、可是这while循环就使得出现了类似于CPU一直在干无用功的情况,后续大量的线程都出现被拒绝,重试,被拒绝,重试的情况等等,改成类似阻塞唤醒的机制避免无用的while循环操作是否会更好些?2、此外反馈的信息也可以更丰富一些吧,可以是具体的任务量,也有可能是各机器的内存,CPU等数据? ###### 讲师回复: > 哇,你的想法非常不错哈!确实,实现反向压力的方式不止线程池的队列满了之后拒绝新任务提交的方法。这里的while并不会一直耗费CPU资源,因为有个rejectSleepMills会控制在提交任务被拒绝后的睡眠时间。这个时间即使设置为1ms,对于CPU而言也已经是非常非常长的时间了,因为现在CPU的主频一般可以达到2G Hz以上。所以CPU并不会一直被这个while循环占用哈。然后你说的通过唤醒的方式,也是一种方法。主动唤醒的好处在于缩短了阻塞的等待时间,确实有其优势。但是这种方法也有缺点,就是实现起来更复杂,并且在分布式情况下如果主动唤醒的信号因为网络或其他原因丢失的话,处理起来就有些麻烦了,要么不停重发唤醒信号,要么就是造成服务卡死。另外,你说的“反馈的信息”也可以更丰富些,确实是不错的想法哈!这和Spark Streaming实现反向压力的思路有些类似,是一种基于PID的控制方法。对于这种方案的话,我觉得需要更多的需要参考程序在实际运行时的资源使用情况和经验,否则可能实现出来的反向压力机制虽然考虑很全面,但是实际提升效果并不明显。综合而言,我觉得直接通过队列满了就睡眠(让出CPU)等待一段时间,以减慢消息发送速度的方式,就已经是一种简洁且好用的方案了。 ##### *震: > 这个反压如果数据还是处理不及时,那么反压处理这边也会oom ##### *锋: > 老师你好,数据采集不应该是从kafka来的?为啥还维护阻塞队列?开启指定线程处理,单个线程处理完毕才会去读后面的数据? ###### 讲师回复: > 不是的哈。02和03课时里的数据采集服务器,数据是从手机或网页等客户端,通过HTTP请求发送过来的。然后本课时中的channelRead0函数,就是在处理这些HTTP请求发送过来的数据。channelRead0在处理过程中,会依次将每个请求数据,交由decode、doExtractCleanTransform、send这三个步骤处理。这三个步骤之间,是通过队列queue进行数据传递的。大多数情况下,这三个步骤的处理能力都是不一致的,如果不用阻塞队列的话,就会造成数据在三个步骤之间的队列中积压,时间久了就会OOM。每个步骤是分配了一个带队列的executor进行处理的,不是你说的“单个线程处理完毕才会去读后面的数据”哈。在最后的send这个步骤中,数据才被发送到Kafka中,然后供后续的数据处理模块进行处理,在本课时中并不是从Kafka中读取数据的哈。 ##### *建: > 如果process队列满了,decode线程被阻塞,那岂不是会造成decode线程耗尽,进入线程池的异常策略中。 ###### 讲师回复: > 非常正确哈!我们在配置线程池时,必须设置一个数量有限的线程数量,否则我们即使设置了“容量有限的阻塞队列”,也不能够实现“反向压力”。因为这个时候,队列里的所有任务都可以分配一个单独的线程的话,那“容量有限的阻塞队列”根本就不会满了。但这种情况下,你的系统一定会因为分配的线程过多,运行一段时间后就会OOM崩溃掉。所以,设置一个有限的线程数量,是我们实现“反向压力”功能的一部分。 ##### **用户3164: > 老师,这视频里面讲的后面会有代码操作执行的结果的演示吗?光是听的话,理解的还不是很深刻 ###### 讲师回复: > 我在GitHub上放的代码是可以直接执行的哈,你可以用git clone下来后,用IntelliJ IDEA导入工程,直接找到对应的主类运行即可。有些工程是有README的,你可以参考看下。后面我把README再补充完善些。 ##### **一: > 老师这门课太好了,但最好能出个视频课,带我们一步一步敲,这样我们的收获会更大 ###### 讲师回复: > 哈哈,谢谢你的肯定!我在GitHub上放的代码是可以直接执行的哈,用git clone下来后,用IntelliJ IDEA导入工程,直接找到对应的主类就可以运行了。后面我再将代码各个工程的 README 补充详细些。视频的话,我后面再看有没有时间和机会弄,现在暂时没有时间,因为每次录一个课时的音频我都最少要三四个小时了,视频应该要更长的时间 ̗̀(๑ᵔ⌔ᵔ๑),
文章作者
上次更新 10100-01-10