Wallaroo中惯用的Python流处理


两年多来,我们一直致力于Wallaroo,这是我们的独立于规模的事件处理系统。当我们在2017年9月将其开源时,我们包含了一个使用Python API编写应用程序的API。这篇博客文章讲述了我们从收到的关于原始API的反馈中学到了什么,以及我们是如何应用这些反馈来改进我们的新API的。

三个月前我写信给a blog post关于我们的Python API的第一次迭代。在那篇帖子的末尾,我写道:“我们的API是新的,我们正在寻找改进的方法。我们有很多自己的想法,但如果您有任何想法,我们将很乐意听取您的意见。”我们从不同的渠道得到了一些反馈,包括Hacker News并与对沃拉鲁感兴趣的人们直接交流。从这些对话中出现的一个主题是,人们觉得API没有以惯用的方式使用Python(Python用户通常将其称为“Pythonic”)。在内部,我们认为API中有一些我们可以改进的地方。

我们接受了反馈,并开始思考是什么原因使得现有的API不具有伸缩性。然后我们模仿了我们的一些想法,并通过重新实现我们的一些示例程序来尝试它们,看看它们是什么感觉。最后,我们致力于解决这些变化带来的一些意想不到的问题。最终结果是一个新的Wallaroo Python API,它比原来的更加简洁和Python化。我们已经得到了一些很好的早期反馈,我们认为它比最初的API有了显著的改进。

原始Python API有什么问题

Wallaroo大量使用Pony类,而最初的Python API紧密地反映了这一点,因为这样做使我们能够更容易地推断API的各个部分如何与使Wallaroo工作的底层Pony对象配合在一起。不幸的是,这意味着该API对Python程序员来说并不自然。

其中一条评论是,API过于冗长。它要求用户创建许多类,其中许多类有一个用于执行操作的方法和另一个用于返回名称的方法。换句话说,这些类中的许多实际上只是带有名称的函数。人们认为仅仅为了得到一个函数就必须定义整个类是愚蠢的。他们是对的。

另一个评论是,在某些地方,开发人员每次实现某些东西时都必须编写相同的代码,即使大部分代码都是相同的。例如,在解码器中,用户必须提供以下方法:

  1. 返回表示消息标头的字节数。
  2. 获取字节数组并返回消息的实际大小。
  3. 获取消息字节并返回消息表示的对象。

以下是解码器过去的样子:

class Decoder(object):
    def header_length(self):
        return 4

    def payload_length(self, bs):
        return struct.unpack(">I", bs)[0]

    def decode(self, bs):
        return bs.decode("utf-8")

解码器唯一真正需要任何逻辑的部分是接受字节并返回消息的部分;第一项可以用整数描述,第二项可以作为参数传递给的字符串来描述struct.unpack(...)告诉它如何将传入的字节转换为数字。经过这些更改,解码器现在看起来如下所示:

@wallaroo.decoder(header_length=4, length_fmt=">I")
def decoder(bs):
    return bs.decode("utf-8")

我们为什么要创建新的API

甚至在我们发表关于原始Python API的博客文章之前,我们就已经讨论了我们可以做些什么来改进API的想法。我们考虑过的一个想法是使用装饰器来减少类的数量,从而减少需要编写的代码量。当我们请求反馈时,有几个人建议使用装饰器来改进API,所以我们觉得我们早先的想法得到了验证。我们继续设计了一个新的基于装饰器的API。

新的Python API

鼓舞人心的例子

我们将从规范的流数据处理应用程序Word Count开始。分析输入文本流,并报告每个单词已被看到的总次数。The example它的全部都在我们的GitHub repository

我们将作出以下假设:

  • 传入的消息将来自一个TCP连接,并被发送到另一个TCP连接。
  • 单词在可以包含零个或多个单词的消息中发送到系统。
  • 传入消息由字符串组成。
  • 传出消息由一个单词和该单词在事件流中出现的次数组成。

在我们的示例中,我们还将状态(每个单词被看到的次数)分成26个分区,每个分区处理以不同字母开头的单词。例如,“acorn”和“acorn”将转到“a”分区,而“bacon”将转到“b”分区。

此应用程序将在消息到达时对其进行处理。这与其他一些围绕微批处理消息而设计的流数据处理系统形成了鲜明对比。这会降低延迟,因为消息处理不会延迟。

沃拉罗的核心抽象

为了理解Python API,了解Wallaroo的核心抽象非常重要:

  • 一段时间内存储的数据的状态累积结果。
  • 计算-将输入转换为输出的代码。
  • 状态计算-接受输入和状态实体,对该输入和状态进行操作(可能进行状态更新),并可选地生成输出的代码。
  • 源-将数据从外部系统输入到应用程序的输入点。
  • 汇点-从应用程序到外部系统的输出点。
  • 解码器-将外部系统的字节流转换为一系列应用程序输入类型的代码。
  • 编码器-将应用程序输出类型转换为字节以发送到外部系统的代码。
  • 流水线-源自源的计算和/或状态计算的序列,并且可选地终止于接收器。
  • 应用程序-管道的集合。

在继续进行时,我们将更详细地介绍这些抽象。

应用程序设置

沃拉鲁称application_setup函数创建表示应用程序的数据结构。

def application_setup(args):
    in_host, in_port = wallaroo.tcp_parse_input_addrs(args)[0]
    out_host, out_port = wallaroo.tcp_parse_output_addrs(args)[0]

    word_partitions = list(string.ascii_lowercase)
    word_partitions.append("!")

    ab = wallaroo.ApplicationBuilder("Word Count Application")
    ab.new_pipeline("Split and Count",
                    wallaroo.TCPSourceConfig(in_host, in_port, decoder))
    ab.to_parallel(split)
    ab.to_state_partition(count_word, WordTotals, "word totals",
        partition, word_partitions)
    ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder))
    return ab.build()

此代码使用前面描述的拓扑创建应用程序。它表示一个由无状态计算组成的管道,称为Split它将一串单词拆分成单独的单词,并进行状态计算,称为CountWord这将更新应用程序的状态并创建表示字数统计的传出消息。以下各节将详细介绍此处使用的对象和函数。

状态分区和状态实体

在本例中,状态是每个单词被看到的次数。要做到这一点,最简单的方法是使用字典,其中关键字是一个单词,与该关键字相关联的值是该单词在事件流中出现的次数。

Wallaroo允许您将状态分区划分为称为状态实体的部分。状态实体是由某种关键字唯一标识的多个状态分区。状态可以按任意数量的键进行分区。唯一的限制是状态实体必须彼此完全隔离,以便可以独立访问和更新。

当发送消息时,Wallaroo将分区函数应用于该消息,以确定将其发送到哪个状态分区。不同的状态实体可以驻留在不同的工作器上,并且当向群集中添加工作器或从群集中删除工作器时,实体可以从一个工作器移动到另一个工作器。这使得随着集群中工作进程数量的增加和减少,可以轻松地向上和向下扩展应用程序。

此示例将状态表示为一个字典,该字典包装在一个知道如何更新它的对象中,并且具有一个返回表示给定单词计数的传出消息对象的方法。

class WordTotals(object):
    def __init__(self):
        self.word_totals = {}

    def update(self, word):
        if self.word_totals.has_key(word):
            self.word_totals[word] = self.word_totals[word] + 1
        else:
            self.word_totals[word] = 1

    def get_count(self, word):
        return WordCount(word, self.word_totals[word])

partition是一个分区函数,它接受一个字符串,如果第一个字符是小写字母,则返回第一个字符;或者"!"如果不是的话。这个@wallaroo.partition必须使用修饰符来指示该函数是分区函数。

@wallaroo.partition
def partition(data):
    if data[0] >= 'a' or data[0] <= 'z':
        return data[0]
    else:
        return "!"

传入消息和解码器

decoder包含将来自TCP流的传入字节解释为表示应用程序内消息的对象的逻辑。在本例中,传入消息表示为字符串。

@wallaroo.decoder(header_length=4, length_fmt=">I")
def decoder(bs):
    return bs.decode("utf-8")

@wallaroo.decoder必须使用装饰符来指示这是解码器。这个header_length参数指定将用于消息长度的字节数。这个length_fmt参数指定长度字节的结构方式,该格式与struct模块。在这种情况下,">I"意味着长度将是大端32位整数。

无状态计算

split是无状态计算。它获取一个字符串并将其拆分成一个字符串列表,列表中的每个字符串代表一个单词。

"why hello world" -> Split -> ["why", "hello", "world"] 

这就是我们要做的split计算如下所示:

@wallaroo.computation_multi(name="split into words")
def split(data):
    punctuation = " !\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~"

    words = []

    for line in data.split("\n"):
        clean_line = line.lower().strip(punctuation)
        for word in clean_line.split(' '):
            clean_word = word.strip(punctuation)
            words.append(clean_word)

    return words

split计算返回单个单词的列表,Wallaroo框架将这些单词作为消息发送到管道中的下一步。Wallaroo负责确保每条消息都被传递到正确的分区。您的应用程序不需要知道数据是如何分区的,也不需要知道哪台计算机持有该分区。这个@wallaroo.computation_multi(...)必须使用修饰符来指示这是一个返回多个传出消息的计算。这个name参数指定Wallaroo在报告有关应用程序的信息时将使用的计算名称。

状态计算

count_word是一种状态计算;它使用传入消息和状态实体来更新新词的字数统计,并返回一条消息供Wallaroo代表其发送。返回的元组中的第二个值向Wallaroo指示应该持久化状态实体,因为它已经更新。

@wallaroo.state_computation(name="Count Word")
def count_word(word, word_totals):
    word_totals.update(word)
    return (word_totals.get_count(word), True)

@wallaroo.state_computation(...)必须使用修饰符来指示这是状态计算。与上面的计算一样,name参数指定Wallaroo在报告有关应用程序的信息时将使用的名称。

传出消息和编码器

在我们的示例中,传出消息在应用程序中表示为一个对象,该对象存储单词和该单词在事件流中出现的次数计数。

class WordCount(object):
    def __init__(self, word, count):
        self.word = word
        self.count = count

encoder包含将此对象转换为随后将在传出TCP连接上发送的字节列表的逻辑。在该示例中,传出消息是字符串WORD => COUNT\n在哪里WORD这个词是否在计算中,并且COUNT是伯爵。

@wallaroo.encoder
def encoder(data):
    return data.word + " => " + str(data.count) + "\n"

@wallaroo.encoder必须使用修饰符来指示此函数是编码器。

此示例使用TCP接收器,但Wallaroo也支持Kafka接收器。以后还会增加其他类型的水槽。

一种可伸缩的事件处理应用程序

此应用程序可以在一个工作器上运行,并且可以通过添加越来越多的工作器进行水平扩展。Wallaroo的灵活性使您可以轻松地适应您的应用程序所需的任何分区策略。请查看我们的文档,了解information about how to run a Wallaroo cluster

看看这个

如果您对自己运行此应用程序感兴趣,请查看Wallaroo documentation以及word count example application这是我们建造的。您将找到有关设置Wallaroo和运行应用程序的说明。再来看看我们的community page注册我们的邮件列表或加入我们的IRC频道,询问您可能有的任何问题。

这个API代表了我们认为是对我们最初的Python API的改进。使用新API编写的应用程序比以前更紧凑、更具可读性,给人的感觉更像Python。虽然我们对这一改进感到非常高兴,但我们知道总有办法让事情变得更好,所以如果您有改进建议,我们会很乐意听取您的意见。请在以下时间与我们联系our mailing list或者our IRC channel

让Wallaroo试一试

我们希望这篇文章能激发你对Wallaroo的兴趣!

如果您刚刚开始,我们建议您试用我们的Docker image,它允许您在短短几分钟内启动并运行Wallaroo。

了解Wallaroo的其他一些很好的方法:

谢谢!我们始终感谢您的坦诚反馈(以及GitHub star)!