Java中的反应流是什么?


如果您关注Java社区,您可能听说过Java中的反应流。似乎在所有主要的技术会议上,你都能看到关于反应编程的演示。去年讨论的都是函数式编程,今年讨论的是反应式编程。

那么,Java社区的注意力持续时间就那么短暂吗?我们Java开发人员是否已经忘记了函数式编程而转向了反应式编程呢?

不完全是。实际上,函数式编程范式非常好地补充了反应式编程范式。

您不需要使用函数式编程范例来遵循反应式编程。您可以使用Java开发人员传统上使用的好的旧的命令式编程范例。也许至少。如果你这么做的话,你会给自己制造很多麻烦的。(仅仅因为你能做某事,并不意味着你应该做那件事!)

函数式编程对反应式编程很重要,但我不会在这篇文章中深入讨论函数式编程。

在这篇文章中,我想看看Java中的整体反应性景观。

反应性编程与反应性流

有了这些新的流行语,人们很容易搞不清它们的意思。

反应式编程是一种编程范式,但我不会称之为新的。其实已经有一段时间了。

就像面向对象编程,函数式编程或过程式编程一样,反应式编程只是另一种编程范式。

另一方面,反应流是一种规范。对于Java程序员来说,反应流是一种API。Reactive Streams为Java中的Reactive编程提供了一个通用API。

Reactive Streams API是来自Kaazing,Netflix,Pivotal,Red Hat,Twitter,Typesafe和许多其他公司的工程师协作的产物。

反应流很像JPA或JDBC。两者都是API规范。这两者都需要使用API规范的实现。

例如,在JDBC规范中,您拥有Java DataSource接口。Oracle JDBC实现将为您提供DataSource接口的实现。正如Microsoft的SQL Server JDBC实现也将提供DataSource接口的实现一样。

现在,您的高级程序可以接受DataSource对象,并且应该能够处理数据源,而不需要担心它是由Oracle还是Microsoft提供的。

就像JPA或JDBC一样,反应流为我们提供了一个API接口,我们可以对其进行编码,而无需担心底层实现。

反应程序设计

关于什么是反应式编程,有很多观点。还有很多关于反应式编程的宣传!

学习反应式编程范例的最佳起点是阅读Reactive ManifestoReactive Manifesto是构建现代云规模架构的处方。

Reactive Manifesto是构建现代云规模架构的处方。

反应性宣言

反应式宣言描述了反应式系统的四个关键属性:

反应灵敏

system在可能的情况下及时作出反应。响应性是可用性和效用的基石,但不止于此,响应性意味着问题可能被快速检测并有效处理。响应系统专注于提供快速和一致的响应时间,建立可靠的上限,从而提供一致的服务质量。这种一致的行为反过来简化了错误处理,建立了最终用户的信心,并鼓励了进一步的交互。

有弹性

系统在面对failure。这不仅适用于高度可用的,任务关键的系统--任何没有弹性的系统在故障后都将毫无反应。通过以下方式实现弹性replication,遏制,isolationdelegation。故障包含在每个component,将组件彼此隔离,从而确保系统的各部分能够在不损害整个系统的情况下发生故障和恢复。每个组件的恢复被委托给另一个(外部)组件,并且在必要的地方通过复制来确保高可用性。组件的客户端没有处理其故障的负担。

弹性的

该系统在不同的工作负载下保持响应能力。电抗系统可以通过增加或降低resources分配给这些输入。这意味着没有争用点或中心瓶颈的设计,导致能够分割或复制组件并在它们之间分配输入。反应性系统通过提供相关的实时性能度量来支持预测性和反应性的缩放算法。他们实现了elasticity以具有成本效益的方式在商品硬件和软件平台上。

消息驱动

反应系统依赖于asynchronous message-passing在组件之间建立边界,以确保松散耦合,隔离和location transparency。此边界还提供了将failures作为信息。通过对系统中的消息队列进行整形和监视,并应用显式消息传递,可以实现负载管理,弹性和流控制back-pressure必要时。位置透明消息传递作为一种通信手段,使得在集群或单个主机内使用相同的结构和语义进行故障管理成为可能。Non-blocking通信只允许接收者使用resources从而减少系统开销。

前三个属性(响应性,弹性,弹性)与您的架构选择更相关。很容易理解为什么微服务,Docker和Kubernetes等技术是反应系统的重要方面。在单个服务器上运行LAMP堆栈显然不符合Reactive Manifesto的目标。

traits of reactive systems消息驱动和反应编程

作为Java开发人员,我们最感兴趣的是最后一个属性,即消息驱动属性。

消息驱动的体系结构当然没有什么革命性。如果您需要消息驱动系统的入门知识,我建议您阅读Enterprise Integration Patterns--一本真正标志性的计算机科学书籍。这本书中的概念为Spring Integration和Apache Camel奠定了基础。

Reactive Manifesto的几个方面确实让我们Java开发人员感兴趣:消息失败,反向压力和非阻塞。这些都是Java中反应性编程的微妙但重要的方面。

作为消息的失败

通常在反应式编程中,您将处理一个消息流。不希望的是抛出异常并结束消息流的处理。

首选的方法是优雅地处理故障。

可能您需要执行一个web服务,但它已关闭。也许有备份服务你可以用?或者10秒后重试?

我不是要解决所有的边缘案件。关键是您不想因为运行时异常而导致严重的失败。理想情况下,您希望注意到失败,并设置某种类型的重试或恢复逻辑。

通常,故障是通过回调处理的。JavaScript开发人员很习惯使用回调。

但是回调使用起来可能会很难看。JavaScript开发人员将此称为回调地狱。

在反应蒸汽,例外是一等公民。异常不会被粗暴地抛出。错误处理被直接内置到Reactive Streams API规范中。

背压

你听说过“从消防水带里喝水”这句话吗?

drinking from the firehose - importance of back pressure in reactive programing.

反压是反应式编程中一个非常重要的概念。它给下游客户提供了一种方式,可以说:“请再来点。”

想象一下,如果您正在对数据库进行查询,结果集返回1000万行。传统上,数据库会以客户机接受它们的速度将所有1000万行吐出来。

当客户端不能再接受时,它就会阻塞。数据库焦急地等待着。挡住了。链中的线程耐心地等待着被疏通。

在一个被动的世界里,我们希望我们的客户有权说,给我第一个1000。然后我们可以给他们1,000,继续我们的业务-直到客户回来要求另一套记录。

这与客户端没有发言权的传统系统形成鲜明对比。节流是通过阻塞线程来完成的,而不是以编程方式完成的。

非阻塞

对于我们Java开发人员来说,反应体系结构的最后一个,也可能是最重要的一个方面是非阻塞性的。

在Reactive出现很久之前,无阻塞似乎并不是什么大事。

作为Java开发人员,我们被教导通过使用线程来利用强大的现代硬件。越来越多的内核意味着我们可以使用越来越多的线程。因此,如果我们需要等待数据库或web服务返回,则不同的线程可以利用CPU。这对我们来说似乎是有道理的。当我们的阻塞线程等待某种类型的I/O时,另一个线程可以使用CPU。

所以,封杀没什么大不了的。对吧?

嗯,没那么多。系统中的每个线程都将消耗资源。每阻塞一个线程,就会消耗资源。虽然CPU在服务不同的线程方面非常高效,但仍然存在成本问题。

我们Java开发人员可能是一群傲慢的人。

他们总是看不起JavaScript。一种下流的小语言,喜欢写剧本的孩子。JavaScript共用“Java”一词这一事实总是让我们Java程序员感到有点儿肮脏。

如果您是一名Java开发人员,那么当您不得不指出Java和JavaScript是两种不同的语言时,您有多少次感到恼火呢?

然后Node.js出现了。

和Node.js在吞吐量方面建立了疯狂的基准。

然后Java社区注意到了这一点。

是的,剧本里的孩子们已经长大了,正在侵占我们的地盘。

这并不是说运行在Google的V8 JavaScript引擎中的JavaScript对于编程来说是天赐良机。Java过去在性能方面有缺陷,但它非常高效,即使与现代本地语言相比也是如此。

secret sauce的性能是无阻塞的。

js使用一个带有有限线程数的事件循环。虽然阻塞在Java世界中通常被视为没什么大不了的,但在Node.js世界中,它将是性能的死亡之吻。

这些图形可以帮助你将差异形象化。

在Node.js中,有一个非阻塞事件循环。请求以非阻塞方式处理。线程在等待其他进程时不会卡住。

node.js single thread event loop processing

将Node.js模型与Java中使用的典型多线程服务器进行对比。并发性是通过使用多个线程来实现的。这是由于多核处理器的增长而被普遍接受的。

multi threaded server with blocking

我个人认为,这两种方法之间的区别就像一条超级高速公路和许多有灯光的城市街道之间的区别一样。

通过一个单线程事件循环,您的进程可以在高速公路上快速行驶。在多线程服务器中,您的进程被困在城市街道上走走停停的车流中。

两者都能移动大量的交通。但是,我宁愿以高速行驶!

your code on reactive streams

当您转移到非阻塞范例时,您的代码在CPU上停留的时间会更长。线程的切换较少。您不仅减少了管理许多线程的开销,而且还减少了线程之间的上下文切换。

您将看到系统容量中有更多的余量供您的程序使用。

非阻塞是一个不是性能的圣杯。你不会看到事情跑得更快的。

是的,管理阻塞是有成本的。但综合考虑,还是比较有效率的。

事实上,在一个适度使用的系统中,我不确定这种差异会有多大。

但是,随着系统负载的增加,您可以期望看到的是,您将拥有额外的容量来服务更多的请求。您将获得更大的并发性。

多少钱?

问得好。用例是非常具体的。与所有基准一样,您的里程数也会有所不同。

Learn more about my Spring Framework 5 course here!反应流API

让我们看一下用于Java的Reactive Streams API。Reactive Streams API仅由四个接口组成。

发布者

Apublisher是一个提供程序,提供可能无限数量的已排序元素,并根据从其订阅者收到的需求发布这些元素。

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}


订户

的实例之后,将接收一次对Subscriber.onSubscribe(订阅)的调用Subscriber到Publisher.Subscribe(订阅服务器)。

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}


订阅

ASubscription表示订阅发布服务器的订阅服务器的一对一生命周期。

public interface Subscription {
    public void request(long n);
    public void cancel();
}


处理机

AProcessor表示一个处理阶段,它既是订阅者又是发布者,并遵守两者的约定。

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}


Java的反应流实现

Java的反应性景观正在演变和成熟。David Karnok有一篇很棒的博客文章Advanced Reactive Java在书中,他把各种反应性的项目分解成几代。下面我会记下每一个版本的版本(可能会随着新版本的发布而随时改变)。

RXJava

RxJava是ReactiveX项目。在撰写本文时,ReactiveX项目已经实现了Java,JavaScript,。NET(C#),Scala,Clojure,C++,Ruby,Python,PHP,Swift和其他一些工具。

ReactiveX提供了一个reactive twist在。。。上GoF Observer模式,这是一个很好的方法。ReactiveX称他们的方法为“观察者模式做对了”。

ReactiveX结合了观察者模式,迭代器模式和函数式编程的最佳思想。

RxJava早于Reactive Streams规范。虽然RXJava2.0+确实实现了Reactive Streams API规范,但您会注意到术语上的细微差别。

DavidKarnok是RxJava的一个重要提交人,他认为RxJava是第三代反应性库。

反应堆

Reactor是来自Pivotal的响应流兼容实现。从Reactor3.0开始,Java8或更高版本是一个需求。

中的反应性功能Spring Framework 5是建立在3.0反应堆上的。

反应堆是一个4th-generation reactive library。(David Karnok也是project Reactor的成员)

阿卡河

Akka Streams还完全实现了Reactive Streams规范。Akka使用Actors来处理流数据。虽然Akka Streams符合Reactive Streams API规范,但Akka Streams API与Reactive Streams接口完全解耦。

Akka Streams被认为是第三代反应库。

Ratpack

Ratpack是一组用于构建现代高性能HTTP应用程序的Java库。Ratpack使用Java8,Netty和Reactive原则。Ratpack提供了Reactive Stream API的基本实现,但并不是设计成一个功能齐全的Reactive工具包。

Optionally,您可以使用RxJava或与ratpack一起使用Reactor。

vert.x

Vert.x是一个Eclipse基础项目。它是一个用于JVM的多语言事件驱动应用程序框架。Vert.x中的反应性支持类似于RatPack。Vert.x允许您使用RxJava或其本机实现的Reactive Streams API。

反应流和JVM版本

Java 1.8的反应流

使用Java1.8,您将发现对Reactive Streams规范的健壮支持。

在Java1.8中,反应流不是Java API的一部分。但是,它可以作为一个单独的JAR来使用。

反应流Maven依赖关系

<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.0</version>
</dependency>


虽然您可以直接包含这个依赖项,但是您使用的任何反应流的实现都应该自动地将其作为依赖项包含。

Java 1.9的反应流

当您转到Java1.9时,情况会发生一些变化。反应流已成为official Java 9 API

您将注意到,反应流接口移动到Flow类。但除此之外,该API与Java1.8中的Reactive Streams1.0相同。

结论

在撰写本文时,Java9即将问世。在Java9中,反应流正式成为Java API的一部分。

在本文的研究中,可以清楚地看到各种反应性库一直在进化和成熟(例如David Karnok世代分类)。

在Reactive Streams之前,各种Reactive库无法实现互操作性。他们不能互相交谈。RxJava的早期版本与project Reactor的早期版本不兼容。

但是在Java9发布前夕,主要的reactive库都采用了reactive Streams规范。不同的库现在是可互操作的。

互操作性是一张即将倒下的重要多米诺骨牌。例如,MongoDB实现了一个Reactive Streams driver现在,在我们的应用程序中,我们可以使用Reactor或RxJava来使用MongoDB中的数据。

我们在适应反应流方面还处于早期阶段。但是在接下来的一年左右,我们可以期待越来越多的开源项目提供反应流兼容性。

我预计在不久的将来,我们会看到更多的反应流。

现在是Java开发人员的乐趣时刻!