阿帕奇尼菲,不是从零开始


如果你还没有听说过,Apache NiFi是Hortonworks在开源社区帮助开发的大数据技术列表中的最新成员。Hadoop是一个静态数据和数据处理平台,而NiFi则是一种使用flow based processing paradigm。如果你想了解更多关于NiFi的历史和背景,看看official overview

乍一看,对于不同背景的开发者来说,NiFi看起来会非常不同。Java程序员认为这是一回事;ETL开发人员想到了别的东西。本文以准备为例our population health solution从几百人到几百万人,突出以下主题:

  • 从Java开发人员的角度来看
  • 从ETL开发人员的角度来看
  • 使用NiFi从头开始重写您现有的解决方案
  • 了解NiFi真正有帮助的地方
  • 调整您现有的Java项目,使其在NiFi中运行,而无需从头开始

如果我真的是那样吗?

大多数开发人员会以两种不同的方式来实现NiFi:

  1. 来自传统的软件编程背景,有一些数据处理经验,但主要侧重于应用程序开发,或
  2. 在数据仓库的背景下,提取、转换和加载(ETL)工具是常态。

富有冒险精神和开放思想的开发人员将会在how to stream the Twitter gardenhose into HDFS只是兴奋地学习新的东西。除非他们已经面临大数据可伸缩性的挑战,否则来自应用程序开发阵营的怀疑论者可能会认为NiFi是一种点击式的时间浪费,配置文件和动态代码将更加灵活。

用于ETL的NiFi

来自ETL阵营的怀疑论者可能会嘲笑NiFi,并将其视为大数据人士试图重新创建ETL轮。在所有这些情况下,与这些开发人员相关联的项目经理可能会发现,当开发人员想要(或者被告知)使用NiFi重写现有代码时,生产率会受到巨大的冲击。

但请放心:不一定非要这样。与其从零开始,以NiFi为基础,倒过来看看你的项目是否不能简单地用NiFi重构自己,这是值得的。在Amitech Solutions和Big Cloud Analytics,我们一直在研究可穿戴健身设备的现实世界场景——这比固定的Twitter教程更适合医疗保健。

健身设备制造商正在使用基于网络的应用编程接口向合作伙伴提供他们设备中的数据。这些供应商的模型都非常相似,并且与许多消费者应用程序接口保持一致:将自己验证为用户,然后使用该会话密钥向应用程序接口查询数据。在医疗保健行业,初创企业正在寻找使用可穿戴设备数据的方法,以帮助为人口健康管理等服务提供信息。

在我们阿米泰克的具体案例中,我们一直在使用一个遗留的代码库,这个代码库是在公司成立初期开发的。它只对几个客户有效,覆盖了几千个人。随着早期试点项目的结束以及商业案例对其他潜在客户变得更加清晰,该解决方案需要扩展到数十万乃至数百万台个人健身设备。这个项目突然进入了大数据领域,大数据需要以各种形式从大量不断增长的设备中获取。

从头开始

作为负责规划解决方案的可伸缩性的首席架构师,我觉得NiFi、Struts、Spark Streaming或类似的技术将是解决方案中明智的一部分,所以我穿上传统的数据集成/ETL心态,看一看可用的NiFi处理器。

list is exhaustive与传统ETL工具中可用的组件种类非常不同。在这篇文章发表时,已经列出了超过135种不同的处理器。当然,这个列表包括了一套完整的与超文本传输协议和休息相关的处理器,我可以用它们来与健身设备供应商的应用编程接口进行通信。因此,我将一系列简单的处理器连接在一起,这些处理器将用户名和密码作为输入,根据应用编程接口进行身份验证以检索授权令牌,将其添加到超文本传输协议报头中,然后在应用编程接口中查询我想要的数据集。

Proctor

耶!一个简单的现实世界中的非推特的使用。一个工作的、实用的应用程序总是值得庆祝的,但是我突然想到,使用这种方法,我们需要使用类似的方法来重建已经为每种类型的设备编写的现有的Java库。这并不难,但是当我们过渡到新模型时,肯定会对项目时间表造成冲击,并对数据完整性造成风险。

如果我不是从零开始

通常,数据仓库和ETL工具供应商建议我们编写您自己的定制组件。毕竟,ETL工具的目标市场是这样一个空间,在这个空间中,工具被专门推销为减少对“容易出错且耗时的”手动编码的需求。当我在writing your own NiFi processor我想到,如果我是完全相反的。它既是开源的,也是为从头开始的可扩展性而设计的。我发现编写一个利用我们现有代码库的定制NiFi处理器是非常合理的。

现有的代码是一个Java程序,每个设备供应商有不同的类,所有的类都有相同的接口,从主数据导出程序中抽象出每个供应商的细微差别。这个界面遵循传统的范例:登录、查询、查询、查询、注销。鉴于我在上面的NiFi中输入了简单的用户名、密码和查询条件参数,创建一个NiFi处理器类来适应NiFi应用编程接口中的现有代码似乎并不重要。下面是实际代码的一个略缩略版本。(实际上,这是全部70行代码。(

我们在Maven POM文件中添加了一些依赖项和一个构建器,Maven生成了需要部署到NiFi中的NAR文件。快速重启后,新处理器会显示在可用处理器列表中,新流程如下所示——比一系列的超文本传输协议和属性解析处理器简单得多:

public void onTrigger(final ProcessContext context, final ProcessSession session) {
final ProcessorLog log = this.getLogger();
final AtomicReference<String> value = new AtomicReference<>();
boolean success = false;
FlowFile flowfile = session.get();
try {
AbstractDevice vendor = null;
String v = context.getProperty(DEVICE).evaluateAttributeExpressions(flowfile).toString();
switch (v) {
// … Instantiate specific device class depending on flow file attribute
default:
log.error(“Invalid device vendor type: ” + v);
throw new ProcessException(“Unable to determine vendor type: ” + v);
}
// … Get various other attributes we need to call the API
// Here’s where we actually query the vendor API
if (vendor.login(userProp, passProp)) {
   vendor.queryVendor(startDate, endDate);
   value.set(vendor.getDataAsString());
   if (!vendor.getDataAsString().contentEquals(“”)) {
     success = true;
   }
}
flowfile = session.write(flowfile, new OutputStreamCallback() {
public void process(OutputStream out) throws IOException {
out.write(value.get().getBytes());
}
});
if (success) {
session.transfer(flowfile, SUCCESS);
} else {
session.transfer(flowfile, FAILURE);
}
}


NiFi如何增加价值?

因此,在这种解决方案的情况下,在不引入大量时间和用新工具重写核心业务逻辑的风险的情况下,调整现有代码以在NiFi框架内运行似乎是相当合理的。那么,将这个现有的过程嵌入到NiFi中有什么好处呢?传统程序中的数据流是:

  1. 向运营数据库查询要处理的人员列表
  2. 对于每个人:
    1. 登录供应商应用编程接口
    2. 向供应商应用编程接口查询数据
    3. 将数据解析为规范化格式
    4. 保存到关系数据库管理系统

这种连续的过程对于成百上千的用户来说很好。这个过程不到一个小时。尽管有一百万用户,这个过程不能在任何合理的时间范围内连续运行。需要几天时间才能完成一天的处理。合理的反应是找到某种并行化过程的方法。

我们可以在多台服务器上设置和管理多个Java程序实例。然而,在这个模型中,在管理基础设施和代码部署方面有许多新的风险。这并不是说这些都是不可克服的,但是它们是NiFi框架很容易适应的一件事情。NiFi还为我们提供了强大的数据来源,无需任何额外的编程。

数据来源日志记录了每个流文件(数据和属性的组合)以及在此过程中该流文件发生的每个转换。

任何曾经参与过数据集成和数据处理应用程序的操作支持和调试的人都可以看到这些数据来源特性的优势。

后续步骤

我们计划在这个项目中做的另一件事是将后端数据存储从其当前的关系数据库管理系统迁移到更灵活、更易于扩展的东西,如HBase或MongoDB。事实证明,将现有的业务逻辑集成到NiFi中会使这个过程变得更加容易。我们可以简单地在NiFi中路由和存储相同的数据,而不是不得不插入现有的Java程序并添加新的类来写入新的数据存储。NiFi已经拥有了以分布式和可扩展的方式完成这一任务的处理器:

最终想法

对于任何有需要扩展的现有应用程序或扩展成本过高的应用程序的人来说,认真考虑一下简单地将当前的业务逻辑打包到一个定制的Apache Nifi处理器中。

  • 超越推特花园软管的例子
  • 为NiFi编写一个定制的处理器相对简单
  • NiFi中的其他处理器使输入和输出更容易适应您现有的代码
  • NiFi中的数据来源对于支持和故障排除是一个非常有价值的特性
  • 如果你创造了广泛有用的东西,ask about contributing为NiFi社区干杯!

NiFi正在启用我们的population health management solution快速跟踪并帮助改善全球数百万人的健康状况。想象一下你可以用NiFi为你的企业和行业做些什么…

关于作者:保罗·波尔是大数据实践的领导者Amitech Solutions。在StampedeCon2016年7月26日至28日在圣路易斯,他将展示更多关于使用NiFi和Hadoop管理和分析来自可穿戴健身设备的数据的详细信息Big Cloud Analytics