Scala和Clojure中的惰性序列


懒惰序列(也称为溪流)是一种您可能从未听说过的有趣的函数数据结构。基本上,惰性序列是一个直到您实际使用它才能完全知道/计算的列表。想象一下,创建一个非常昂贵的列表,并且您不想计算太多-但仍然允许客户端根据自己的需要消费。与迭代器类似,但是迭代器是破坏性的-一旦您读取它们,它们就消失了。另一方面,懒惰的序列要记住已经计算过的元素。

请注意,这种抽象甚至允许我们构造和使用无限溪流!完全有可能创建一个懒惰的素数序列,或者Fibonacci series。它由客户决定他们想要消费多少个元素--只有这些元素才会被生成。将其与急切列表进行比较-必须在第一次使用和迭代器之前预先计算-忘记已计算的值。

但是请记住,惰性序列总是从头开始遍历,因此为了找到第N个元素,惰性序列必须计算前N-1个元素。

我尽量避免纯学术性的例子,因此不会有斐波那契级数的例子。你可以在每一篇关于这个主题的文章中找到它。相反,我们将实现一些有用的东西-Cron expression测试实用程序,返回下一次射击时间序列。我们已经implemented testing Cron expressions在此之前,使用递归和迭代器。简而言之,我们希望确保我们的Cron表达式是正确的,并在我们真正期望的时候触发。Quartz scheduler提供方便CronExpression.getNextValidTimeAfter(Date)方法,该方法返回给定日期之后的下一个激发时间。例如,如果我们想要计算下十次射击时间,我们需要调用此方法十次,但是!第一次调用的结果应该作为参数传递给第二次调用-毕竟,一旦我们知道作业将在何时触发,我们就想知道下一次调用(在第一次之后)的触发时间。接下来,为了找到第三个调用时间,我们必须将第二个调用时间作为参数传递。这一描述将我们引向了简单的递归算法:

1. def findTriggerTimesRecursive(expr: CronExpression, after: Date): List[Date] =
2.     expr getNextValidTimeAfter after match {
3.         case null => Nil
4.         case next => next :: findTriggerTimesRecursive(expr, next)
5.     }

getNextValidTimeAfter()可能会回来null指示Cron表达式永远不会再次触发(例如,它只在2013年运行,而我们已经到了年底)。但是,此解决方案存在多个问题:

  • 我们真的不知道客户端需要多少个未来日期,因此我们很可能会生成过多、不必要地消耗CPU周期1
  • 更糟糕的是,一些Cron表达式永远不会结束。"0 0 17 * * ? *"将在每天下午5点运行,每年都是如此,而且是无限的。我们绝对没有那么多时间和记忆
  • 我们的实现不是尾递归的。不过,修复起来很容易

如果我们有一个“类似列表”的数据结构,我们可以像处理任何其他序列一样传递和使用它,但不必急于评估它,会怎么样?下面是Scala中的一个实现Stream[Date]仅在需要时才计算下一次射击时间:

1. def findTriggerTimes(expr: CronExpression, after: Date): Stream[Date] =
2.     expr getNextValidTimeAfter after match {
3.         case null => Stream.Empty
4.         case next => next #:: findTriggerTimes(expr, next)
5.     }

仔细看,它们几乎一模一样!我们更换了List[Date]Stream[Date](两者都实施LinearSeq),NilStream.Empty::#::。最后的变化是至关重要的。#::方法(是的,这是一种方法.)接受tl: => Stream[A]-按名称。这意味着findTriggerTimes(expr, next)并不是真的叫这里!它实际上是一个闭合,我们把它传递给#::高阶函数。只有在需要时才会评估此闭包。让我们用下面的代码来演示一下:

01. val triggerTimesStream = findTriggerTimes("0 0 17 L-3W 6-9 ? *")
02.   
03. println(triggerTimesStream)
04. //Stream(Thu Jun 27 17:00:00 CEST 2013, ?)
05.   
06. val firstThree = triggerTimesStream take 3
07. println(firstThree.toList)
08. //List(Thu Jun 27 17:00:00 CEST 2013, Mon Jul 29 17:00:00 CEST 2013, Wed Aug 28 17:00:00 CEST 2013)
09.   
10. println(triggerTimesStream)
11. //Stream(Thu Jun 27 17:00:00 CEST 2013, Mon Jul 29 17:00:00 CEST 2013, Wed Aug 28 17:00:00 CEST 2013, ?)

仔细看。最初,打印流仅显示第一个元素。中的问号Stream.toString表示流的未知剩余部分。然后我们取前三个元素。有趣的是,我们必须将结果转化为List。正在调用take(3)单独返回几乎不会返回另一个流,从而进一步推迟评估尽可能长的时间。但是再次打印原始流也会显示所有这三个元素,但第四个元素尚不清楚。

让我们做一些更高级的事情吧。假设我们想知道Cron表达式将在什么时候启动第100次?从今天开始的一年内它会发射几次?

1. val hundredth = triggerTimesStream.drop(99).head
2.   
3. val calendar = new GregorianCalendar()
4. calendar.add(Calendar.YEAR, 1)
5. val yearFromNow = calendar.getTime
6.   
7. val countWithinYear = triggerTimesStream.takeWhile(_ before yearFromNow).size

计算第100次射击时间非常简单--只需丢弃前99个日期,取剩下的第一个日期。然而,这个词丢弃有点不幸-这些项被计算并缓存在triggerTimesStream因此,下次我们尝试访问前100个元素中的任何一个时,它们都会立即可用。有趣的事实:Stream[T]在Scala中是不可变的,并且是线程安全的,但是当您迭代它时,它会在内部不断变化。但这是一个实现细节。

你可能想知道我为什么要用takeWhile(...).size不是简单的filter(...).size甚至是count(...)?嗯,从定义来看,我们流中的火灾时间正在增加,所以如果我们只想计算一年内的日期,当我们找到第一个不匹配的日期时,我们可以停止。但这不仅仅是一个微观优化。还记得溪流可以是无限的吗?想想看。同时,我们将把我们的小实用程序移植到Clojure。

Clojure

这条小溪(lazy-seq)在Clojure中:
1. (defn find-trigger-times [expr after]
2.     (let [next (. expr getNextValidTimeAfter after)]
3.         (case next
4.             nil []
5.             (cons next (lazy-seq (find-trigger-times expr next))))))
这几乎是Scala代码的精确翻译,只是它使用了let绑定到捕获getNextValidTimeAfter()结果。识字较少但更紧凑的翻译可以用if-let表格:
(defn find-trigger-times [expr after]
    (if-let [next (. expr getNextValidTimeAfter after)] 
        (cons next (lazy-seq (find-trigger-times expr next)))
        []))
if-let结合条件和绑定。如果表达式绑定到next为假(或nil在我们的例子中),第三行根本不求值。相反,返回第四行(空序列)的结果。这两个实现是等效的。为了完整起见,让我们看看如何获取第100个元素,并计算一年内与Cron表达式匹配的日期数:
(def expr (new CronExpression "0 0 17 L-3W 6-9 ? *"))
(def trigger-times (find-trigger-times expr (new Date)))
 
(def hundredth (first (drop 99 trigger-times)))
 
(def year-from-now (let [calendar (new GregorianCalendar)] 
    (. calendar add Calendar/YEAR 1)
    (. calendar getTime)))
 
(take-while #(.before % year-from-now) trigger-times)
请注意,我们再次使用take-while不是简单的filter

空间和时间复杂性

想象一下使用filter()而不是takeWhile()来计算下一年内Cron触发器将发射多少次。请记住,一般的流(特别是我们的Cron流)可以是无限的。简单filter()在一个Stream将一直运行到末尾-这可能永远不会发生在无限流的情况下。即使是像这样简单的方法也是如此size-Stream会越来越多地评估,直到最后。但是很快你的程序就会填满整个堆空间。为什么?因为一旦评估了元素,Stream[T]将缓存它以备以后使用。

一不小心抱住了一只大的Stream是另一个危险:
val largeStream: Stream[Int] = //,..
//...
val smallerStream = largeStream drop 1000000
smallerStream是对没有前一百万个元素的流的引用。但是这些元素仍然缓存在原始文件中largeStream。只要你保留对它的引用,它们就会保存在内存中。那一刻largeStream引用超出范围时,第一百万个元素有资格进行垃圾回收,而流的其余部分仍被引用。

上面的讨论同样适用于Scala和Clojure。正如您所看到的,在处理惰性序列时必须非常小心。它们在函数式语言中非常强大且无处不在-但是"With great power, comes great responsibility"。当您开始处理可能无限的实体时,您必须小心。

iterate

如果您对Clojure或Scala更有经验,您可能会感到奇怪,为什么我没有使用(iterate f x)或者Stream.iterate()。当您拥有无限大的流并且每个元素都可以作为前一个元素的函数进行计算时,这些帮助器方法非常有用。显然,Cron stream不能利用这个方便的工具,因为它如前面所示,是有限的。但为了完整起见,这里有一个短得多的,但是不正确实施使用iterate
def infiniteFindTriggerTimes(expr: CronExpression, after: Date) =
    Stream.iterate(expr getNextValidTimeAfter after){last =>
        expr getNextValidTimeAfter last
    }
.和Clojure:
(defn find-trigger-times [expr after]
    (iterate 
        #(. expr getNextValidTimeAfter %)
        (. expr getNextValidTimeAfter after)))
这两种情况的想法都很简单:我们提供初始元素x(Scala中的第一个参数,Clojure中的第二个参数)和一个函数f它将前一个元素转换为当前元素。换句话说,我们生成以下流:[x, f(x), f(f(x)), f(f(f(x))), ...]

上面的实现一直工作到它们到达流的末尾(如果有的话)。所以为了用一些积极的东西来结束,我们应该用iterate使用天真产生无限的素数流(为这样的理论问题道歉)prime?谓词:
(defn- divisors [x] 
    (filter #(zero? (rem x %))
        (range 2 (inc (Math/sqrt x)))))
(defn- prime? [x] (empty? (divisors x)))
(defn- next-prime [after] 
    (loop [x (inc after)] 
        (if (prime? x) 
            x
            (recur (inc x)))))
(def primes (iterate next-prime 2))
我希望无论是理念还是执行都是清晰的。如果一个数没有任何约数,它就被认为是素数。next-prime返回大于给定值的后续质数。所以(next-prime 2)收益率3,(next-prime 3)赠送5诸若此类。使用此函数,我们可以构建primes懒惰序列,只需提供第一个素数和next-prime功能。

结论

懒惰序列(或流)是伟大的抽象,不可能或乏味地用命令式语言表示。它们感觉像是普通的列表,但只有在需要的时候才会进行评估。Scala和Clojure都非常支持它们,并且它们的行为相似。你可以在流上映射、过滤、剪切等,只要不是真的需要,它们就永远不会真正计算它们的元素。此外,它们缓存已经计算出的值,同时仍然是线程安全的。然而,在处理无穷大时,必须小心。如果您试图无意识地计算无限流的元素或查找不存在的项目(例如primes.find(_ == 10)没有人会救你。

1-getNextValidTimeAfter()全面实施是400 lines long