服务器发送的带有RxJava和SSE发射器的事件


Spring framework 4.2 GA即将发布,让我们看看它提供的一些新特性。引起我注意的是一个简单的新班级SseEmitter-抽象的结束sever-sent events易于在Spring MVC控制器中使用。SSE是一种技术,它允许您在一个单向的超文本传输协议连接中将数据从服务器流式传输到浏览器。这听起来像是什么的子集websockets可以。然而,由于它是一个简单得多的协议,当全双工是不必要的,例如实时推动股票价格变化或显示长期运行过程的进展时,它可以被使用。这将是我们的例子。


假设我们有一个具有以下应用编程接口的虚拟硬币挖掘器:

public interface CoinMiner {
 
    BigDecimal mine() {
        //...
    }
}


每次我们叫我的()时,我们必须等几秒钟,然后我们得到大约1个硬币作为回报(平均)。如果我们想挖掘多个硬币,我们必须多次调用这个方法:

@RestController
public class MiningController {
 
    //...
 
    @RequestMapping("/mine/{count}")
    void mine(@PathVariable int count) {
        IntStream
                .range(0, count)
                .forEach(x -> coinMiner.mine());
    }
 
}

客户代码必须提供ExecutorService明确地(只是设计选择):

@RequestMapping("/mine/{count}")
void mine(@PathVariable int count) {
    final List<CompletableFuture<BigDecimal>> futures = IntStream
            .range(0, count)
            .mapToObj(x -> coinMiner.mineAsync(executorService))
            .collect(toList());
    futures.forEach(CompletableFuture::join);
}

先打电话非常重要mineAsync多次,然后(作为第二阶段)等待所有期货完成join。很容易写道:

IntStream
        .range(0, count)
        .mapToObj(x -> coinMiner.mineAsync(executorService))
        .forEach(CompletableFuture::join);

然而,由于Java 8中流的惰性,这些任务将按顺序执行!如果你还没有发现流的惰性,请从头到尾读一遍:我们要求加入一些未来,所以流会上升,并调用我的异步()一次(惰性!),将它传递给join()。当这个连接()结束时,它会再次上升,请求另一个未来。通过使用collect(),我们强制所有mineAsync()执行,开始所有异步计算。后来,我们等待他们每一个人。


介绍SseEmitter

现在是时候做出更多的反应了(我说过了)。控制器可以返回SseEmitter的一个实例。一旦我们从一个处理程序方法返回,容器线程就被释放,并且可以服务更多即将到来的请求。但是连接没有关闭,客户端一直在等待!我们应该做的是保留一个SseEmitter实例的引用,然后从不同的线程调用它的send()和complete方法。例如,我们可以启动一个长时间运行的进程,并保持任意线程的发送进度。一旦这个过程完成,我们就完成了发送器,最后关闭了连接(至少在逻辑上,记住保持活动)。在下面的例子中,我们有一堆完整的期货,当每一个完成时,我们只需发送一个给客户端(notifyProgress())。当所有未来都完成后,我们完成流,然后运行(发送器::完成),关闭连接:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {
    final SseEmitter sseEmitter = new SseEmitter();
    final List<CompletableFuture<BigDecimal>> futures = mineAsync(count);
    futures.forEach(future ->
            future.thenRun(() -> notifyProgress(sseEmitter)));
 
    final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);
    CompletableFuture
            .allOf(futuresArr)
            .thenRun(sseEmitter::complete);
 
    return sseEmitter;
}
 
private void notifyProgress(SseEmitter sseEmitter) {
    try {
        sseEmitter.send(1);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
 
private List<CompletableFuture<BigDecimal>> mineAsync(@PathVariable int count) {
    return IntStream
            .range(0, count)
            .mapToObj(x -> coinMiner.mineAsync(executorService))
            .collect(toList());
}

调用此方法会产生以下响应(请注意Content-Type):

< HTTP/1.1 200 OK
< Content-Type: text/event-stream;charset=UTF-8
< Transfer-Encoding: chunked
< 
data:1
 
data:1
 
data:1
 
data:1
 
* Connection #0 to host localhost left intact

稍后我们将学习如何在客户端解释这样的响应。暂时让我们稍微整理一下设计。


用介绍RxJavaObservable进步

上面的代码可以工作,但是看起来相当混乱。我们实际上拥有的是一系列事件,每一个都代表着计算的进程。计算最终完成,因此流也应该发出结束信号。听起来完全像可观察的!我们从重构CoinMiner开始,以便返回可观察的<大十进制>:

Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {
    final ReplaySubject<BigDecimal> subject = ReplaySubject.create();
    final List<CompletableFuture<BigDecimal>> futures = IntStream
            .range(0, count)
            .mapToObj(x -> mineAsync(executorService))
            .collect(toList());
    futures
            .forEach(future ->
                    future.thenRun(() -> subject.onNext(BigDecimal.ONE)));
 
    final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);
    CompletableFuture
            .allOf(futuresArr)
            .thenRun(subject::onCompleted);
 
    return subject;
}


每次事件出现在Observable从返回mineMany()我们刚刚开采了那么多硬币。当所有的未来都完成了,我们也完成了这个流程。从实现的角度来看,这还不是更好,但是从控制器的角度来看,这是多么干净:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {
    final SseEmitter sseEmitter = new SseEmitter();
    coinMiner
            .mineMany(count, executorService)
            .subscribe(
                    value -> notifyProgress(sseEmitter),
                    sseEmitter::completeWithError,
                    sseEmitter::complete
            );
    return sseEmitter;
}


打完电话后coinMiner.mineMany()我们只是订阅活动。结果是ObservableSseEmitter方法匹配1:1。这里发生的事情非常简单明了:启动异步计算,每当后台计算发出任何进展的信号时,就将其转发给客户端。好,让我们回到实现。它看起来很乱,因为我们混在一起CompletableFutureObservable。我已经描述了如何convert CompletableFuture into an Observable with just one element。以下是总结,包括rx.Single自RxJava 1.0.13以来发现的抽象(此处未使用):

public class Futures {
 
    public static <T> Observable<T> toObservable(CompletableFuture<T> future) {
        return Observable.create(subscriber ->
                future.whenComplete((result, error) -> {
                    if (error != null) {
                        subscriber.onError(error);
                    } else {
                        subscriber.onNext(result);
                        subscriber.onCompleted();
                    }
                }));
    }
 
    public static <T> Single<T> toSingle(CompletableFuture<T> future) {
        return Single.create(subscriber ->
                future.whenComplete((result, error) -> {
                    if (error != null) {
                        subscriber.onError(error);
                    } else {
                        subscriber.onSuccess(result);
                    }
                }));
    }
 
}

有了这些实用程序操作符,我们可以改进实现并避免混合使用两个应用程序接口:

Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {
    final List<Observable<BigDecimal>> observables = IntStream
            .range(0, count)
            .mapToObj(x -> mineAsync(executorService))
            .collect(toList());
    return Observable.merge(observables);
}
 
Observable<BigDecimal> mineAsync(ExecutorService executorService) {
    final CompletableFuture<BigDecimal> future = 
         CompletableFuture.supplyAsync(this::mine, executorService);
    return Futures.toObservable(future);
}

RxJava有一个内置的操作符,用于将多个可观测值合并成一个,我们的每个底层可观测值只发出一个事件并不重要。


深入研究RxJava操作符

让我们利用RxJava的力量来稍微改进一下我们的流。


scan()

目前,每次我们挖掘一枚硬币,我们都会向客户发送(1)个事件。这意味着每个客户必须跟踪它已经收到了多少硬币,以便计算出总的计算金额。如果服务器总是发送总量而不是增量就好了。然而,我们不想改变实现。事实证明,使用可观察扫描()运算符非常简单:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {
    final SseEmitter sseEmitter = new SseEmitter();
    coinMiner
            .mineMany(count, executorService)
            .scan(BigDecimal::add)
            .subscribe(
                    value -> notifyProgress(sseEmitter, value),
                    sseEmitter::completeWithError,
                    sseEmitter::complete
            );
    return sseEmitter;
}
 
private void notifyProgress(SseEmitter sseEmitter, BigDecimal value) {
    try {
        sseEmitter.send(value);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

scan()操作员将前一个事件和当前事件组合在一起。通过应用BigDecimal::add我们简单地把所有的数字相加。例如1、1 + 1、(1 + 1) + 1,等等。scan()就像flatMap(),但保留中间值。
 

用...取样sample()

这可能是因为我们的后端服务产生了太多我们可以使用的进度更新。我们不想让不相关的更新充斥客户端,让带宽饱和。每秒最多发送两次更新听起来很合理。幸运的是,RxJava也有一个内置的操作符:

Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs
        .scan(BigDecimal::add)
        .sample(500, TimeUnit.MILLISECONDS)
        .subscribe(
            //...
        );


sample()将定期查看底层流并仅发出最新的项,丢弃中间的项。幸运的是,我们可以随时收集物品scan()所以我们不会丢失任何更新。

window()-恒定发射间隔

但是有一个问题。如果在所选的500毫秒内没有新的东西出现,sample()将不会发出同一个项目两次。这很好,但是请记住,我们正在通过TCP/IP连接推送这些更新。定期向客户端发送更新是一个好主意,即使在此期间什么也没发生——只是为了保持连接活跃,有点像ping。可能有很多方法可以达到这个要求,例如超时()操作符。我选择使用window()运算符每500毫秒对所有事件进行分组:


Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs
        .window(500, TimeUnit.MILLISECONDS)
        .flatMap(window -> window.reduce(BigDecimal.ZERO, BigDecimal::add))
        .scan(BigDecimal::add)
        .subscribe(
            //...
        );

这个很棘手。首先,我们在500毫秒窗口内对所有进度更新进行分组。然后,我们使用reduce计算在此时间段内挖掘的硬币总数(类似于scan())。如果在那个时期没有硬币被开采,我们只需返回零。最后,我们使用scan()来汇总每个窗口的小计。我们不再需要样本(),因为窗口()确保每500毫秒发出一个事件。


客户端

JavaScript中有很多使用SSE的例子,所以给你一个调用我们控制器的快速解决方案:

var source = new EventSource("/mine/10");
source.onmessage = function (event) {
    console.info(event);
};


我相信SseEmitter是对Spring MVC的一个重大改进,它将允许我们编写更健壮、更快的web应用程序,这些应用程序需要即时的单向更新。