Apache Spark and Apache NiFi Integration (Part 2 of 2)


Let's begin with a quick recap of our use case: ingesting energy data and running an Apache Spark job as part of the flow.

We will use the ExecuteSparkInteractive processor and the Livy Controller Service, both new in Apache NiFi 1.5 / HDF 3.1, to implement this integration. As we mentioned in Part 1 of this article, setting it up is very easy.
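For context, here is a minimal sketch (not part of the flow itself) of what ExecuteSparkInteractive and the Livy Controller Service handle for you behind the scenes: talking to Apache Livy's REST API to open an interactive session and submit statements. The host name is a placeholder; 8998 is Livy's default port.

import json, time
import requests

LIVY = "http://your-livy-server:8998"  # placeholder host; 8998 is Livy's default port
headers = {"Content-Type": "application/json"}

# Open an interactive PySpark session
session = requests.post(LIVY + "/sessions",
                        data=json.dumps({"kind": "pyspark"}),
                        headers=headers).json()
session_url = "%s/sessions/%s" % (LIVY, session["id"])

# Wait for the session to become idle before submitting code
while requests.get(session_url, headers=headers).json()["state"] != "idle":
    time.sleep(5)

# Submit a statement, then poll until its result is available
stmt = requests.post(session_url + "/statements",
                     data=json.dumps({"code": "print(spark.version)"}),
                     headers=headers).json()
stmt_url = "%s/statements/%s" % (session_url, stmt["id"])
while True:
    result = requests.get(stmt_url, headers=headers).json()
    if result["state"] == "available":
        print(result["output"]["data"]["text/plain"])
        break
    time.sleep(1)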

Since this is a modern Apache NiFi project, we use version control on our code:

On a local machine, I have a Python script talking to an energy sensor over Wi-Fi. That data is processed, cleaned up, and sent via S2S over HTTP to a cloud-hosted Apache NiFi instance.

In the cloud, we receive the pushed messages.
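If you are not running NiFi or MiNiFi locally to handle the S2S transfer, a plain HTTP POST to a ListenHTTP processor on the cloud instance is a workable alternative. A minimal sketch, assuming a ListenHTTP processor listening on port 8080 with Base Path "energy" (host, port, and path are all hypothetical):

import json
import requests

# Hypothetical endpoint: a NiFi ListenHTTP processor with Base Path "energy"
NIFI_URL = "http://your-nifi-server:8080/energy"

row = {"voltage": 121.74, "power": 91.38, "systemtime": "02/07/2018 11:17:30"}
resp = requests.post(NIFI_URL,
                     data=json.dumps(row),
                     headers={"Content-Type": "application/json"})
resp.raise_for_status()  # ListenHTTP answers 200 once the FlowFile is accepted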

Once we open the Spark It Up processor group, we have the flow that processes the data.

Flow overview

  • QueryRecord: determines how to route based on a query against the streaming data; converts the JSON to Apache Avro.

Path for all files:

  • UpdateAttribute: sets the schema.
  • MergeContent: merges our Apache Avro records into larger files.
  • ConvertAvroToORC: builds Apache ORC files from the merged Apache Avro files.
  • PutHDFS: stores the Apache ORC files in an HDFS directory on our HDP 2.6.4 cluster (a read-back sketch follows this list).
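To sanity-check that path, a minimal sketch that reads the landed ORC files back with Spark; the server name matches the one used later in this article, but the directory name is hypothetical:

# Read back the ORC files that PutHDFS landed (directory name is hypothetical)
orcdf = spark.read.orc("hdfs://yourhdp264server:8020/energy/orc")
orcdf.printSchema()
print(orcdf.count())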

High-voltage path:

  • ExecuteSparkInteractive: calls our PySpark job.
  • PutHDFS: stores the results in HDFS.

We can take all of the metadata attributes and send them somewhere, or store them as a JSON file.

We tested our PySpark program in Apache Zeppelin and then copied it into our processor.

Our ExecuteSparkInteractive processor:

In the QueryRecord processor, we send the messages with large voltages to an Apache Spark executor to run a PySpark job for further processing.

Once we submit a job through Apache Livy, we can watch it during and after execution via the detailed Apache Livy UI screens and the Spark screens. In the Apache Livy UI screen below, we can see the PySpark code that was executed and its output.

Apache Livy UI:

Apache Spark Jobs UI: Jobs

Apache Spark Jobs UI: SQL

Apache Spark Jobs UI: Executors

Apache Zeppelin SQL data exploration:


Hive/Spark SQL table DDL auto-generated by Apache NiFi:
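The generated DDL itself is shown in the screenshot, not reproduced here, but a minimal sketch of what such a table definition could look like, issued from a Hive-enabled Spark session over a handful of the fields from the inferred Avro schema shown later in this article (the table name and HDFS location are hypothetical):

# Hypothetical table name and location; the columns are a subset of the
# fields from the inferred.avro.schema shown later in this article.
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS smartplug (
    voltage DOUBLE,
    current DOUBLE,
    power DOUBLE,
    total DOUBLE,
    systemtime STRING
  )
  STORED AS ORC
  LOCATION 'hdfs://yourhdp264server:8020/energy/orc'
""")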

Below is the source code related to this article:

PySpark code:

# Read the JSON event logs from the Spark2 history directory on HDFS
shdf = spark.read.json("hdfs://yourhdp264server:8020/spark2-history")

# Print the schema Spark inferred from the JSON
shdf.printSchema()

# Register the DataFrame as a temporary view so we can query it with SQL
shdf.createOrReplaceTempView("sparklogs")

# A simple SELECT and count over the view
stuffdf = spark.sql("SELECT * FROM sparklogs")
stuffdf.count()


This is a fairly simple PySpark application that reads the JSON results of the Spark2 history, prints the schema inferred from them, and then does a simple SELECT and count. We could just as easily do Spark machine learning or other processing there. You can run this with Python 2.x or 3.x via PySpark. I am running this in Apache Spark 2.2.0, hosted on an HDP 2.6.4 cluster running CentOS 7. The fun part is that every time I run this Spark job, it produces more history for it to read. I should probably read that log with Apache NiFi instead, but it makes for a fun little example. Obviously, you can run any kind of job here; my next article will look at running Apache MXNet and Spark MLlib jobs through Apache Livy and Apache NiFi.
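As a small illustration of taking this one step further, a hedged sketch that aggregates over the same sparklogs view. The App Name column appears in the inferred schema printed in the example output below; the backticks are needed because of the space in the column name.

# Count history events per application name
perapp = spark.sql(
    "SELECT `App Name`, COUNT(*) AS events "
    "FROM sparklogs GROUP BY `App Name`")
perapp.show()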

As a brief aside, you now have many options for working with schemas:

Energy data schema:

inferred.avro.schema
{ "type" : "record", "name" : "smartPlug", "fields" : [ { "name" : "day19", "type" : "double", "doc" : "Type inferred from '2.035'" }, { "name" : "day20", "type" : "double", "doc" : "Type inferred from '1.191'" }, { "name" : "day21", "type" : "double", "doc" : "Type inferred from '0.637'" }, { "name" : "day22", "type" : "double", "doc" : "Type inferred from '1.497'" }, { "name" : "day23", "type" : "double", "doc" : "Type inferred from '1.151'" }, { "name" : "day24", "type" : "double", "doc" : "Type inferred from '1.227'" }, { "name" : "day25", "type" : "double", "doc" : "Type inferred from '1.387'" }, { "name" : "day26", "type" : "double", "doc" : "Type inferred from '1.138'" }, { "name" : "day27", "type" : "double", "doc" : "Type inferred from '1.204'" }, { "name" : "day28", "type" : "double", "doc" : "Type inferred from '1.401'" }, { "name" : "day29", "type" : "double", "doc" : "Type inferred from '1.288'" }, { "name" : "day30", "type" : "double", "doc" : "Type inferred from '1.439'" }, { "name" : "day31", "type" : "double", "doc" : "Type inferred from '0.126'" }, { "name" : "day1", "type" : "double", "doc" : "Type inferred from '1.204'" }, { "name" : "day2", "type" : "double", "doc" : "Type inferred from '1.006'" }, { "name" : "day3", "type" : "double", "doc" : "Type inferred from '1.257'" }, { "name" : "day4", "type" : "double", "doc" : "Type inferred from '1.053'" }, { "name" : "day5", "type" : "double", "doc" : "Type inferred from '1.597'" }, { "name" : "day6", "type" : "double", "doc" : "Type inferred from '1.642'" }, { "name" : "day7", "type" : "double", "doc" : "Type inferred from '0.443'" }, { "name" : "day8", "type" : "double", "doc" : "Type inferred from '0.01'" }, { "name" : "day9", "type" : "double", "doc" : "Type inferred from '0.009'" }, { "name" : "day10", "type" : "double", "doc" : "Type inferred from '0.009'" }, { "name" : "day11", "type" : "double", "doc" : "Type inferred from '0.075'" }, { "name" : "day12", "type" : "double", "doc" : "Type inferred from '1.149'" }, { "name" : "day13", "type" : "double", "doc" : "Type inferred from '1.014'" }, { "name" : "day14", "type" : "double", "doc" : "Type inferred from '0.851'" }, { "name" : "day15", "type" : "double", "doc" : "Type inferred from '1.134'" }, { "name" : "day16", "type" : "double", "doc" : "Type inferred from '1.54'" }, { "name" : "day17", "type" : "double", "doc" : "Type inferred from '1.438'" }, { "name" : "day18", "type" : "double", "doc" : "Type inferred from '1.056'" }, { "name" : "sw_ver", "type" : "string", "doc" : "Type inferred from '\"1.1.1 Build 160725 Rel.164033\"'" }, { "name" : "hw_ver", "type" : "string", "doc" : "Type inferred from '\"1.0\"'" }, { "name" : "mac", "type" : "string", "doc" : "Type inferred from '\"50:C7:BF:B1:95:D5\"'" }, { "name" : "type", "type" : "string", "doc" : "Type inferred from '\"IOT.SMARTPLUGSWITCH\"'" }, { "name" : "hwId", "type" : "string", "doc" : "Type inferred from '\"60FF6B258734EA6880E186F8C96DDC61\"'" }, { "name" : "fwId", "type" : "string", "doc" : "Type inferred from '\"060BFEA28A8CD1E67146EB5B2B599CC8\"'" }, { "name" : "oemId", "type" : "string", "doc" : "Type inferred from '\"FFF22CFF774A0B89F7624BFC6F50D5DE\"'" }, { "name" : "dev_name", "type" : "string", "doc" : "Type inferred from '\"Wi-Fi Smart Plug With Energy Monitoring\"'" }, { "name" : "model", "type" : "string", "doc" : "Type inferred from '\"HS110(US)\"'" }, { "name" : "deviceId", "type" : "string", "doc" : "Type inferred from '\"8006ECB1D454C4428953CB2B34D9292D18A6DB0E\"'" }, { "name" : "alias", 
"type" : "string", "doc" : "Type inferred from '\"Tim Spann's MiniFi Controller SmartPlug - Desk1\"'" }, { "name" : "icon_hash", "type" : "string", "doc" : "Type inferred from '\"\"'" }, { "name" : "relay_state", "type" : "int", "doc" : "Type inferred from '1'" }, { "name" : "on_time", "type" : "int", "doc" : "Type inferred from '1995745'" }, { "name" : "active_mode", "type" : "string", "doc" : "Type inferred from '\"schedule\"'" }, { "name" : "feature", "type" : "string", "doc" : "Type inferred from '\"TIM:ENE\"'" }, { "name" : "updating", "type" : "int", "doc" : "Type inferred from '0'" }, { "name" : "rssi", "type" : "int", "doc" : "Type inferred from '-34'" }, { "name" : "led_off", "type" : "int", "doc" : "Type inferred from '0'" }, { "name" : "latitude", "type" : "double", "doc" : "Type inferred from '40.268216'" }, { "name" : "longitude", "type" : "double", "doc" : "Type inferred from '-74.529088'" }, { "name" : "index", "type" : "int", "doc" : "Type inferred from '18'" }, { "name" : "zone_str", "type" : "string", "doc" : "Type inferred from '\"(UTC-05:00) Eastern Daylight Time (US & Canada)\"'" }, { "name" : "tz_str", "type" : "string", "doc" : "Type inferred from '\"EST5EDT,M3.2.0,M11.1.0\"'" }, { "name" : "dst_offset", "type" : "int", "doc" : "Type inferred from '60'" }, { "name" : "month1", "type" : "double", "doc" : "Type inferred from '32.674'" }, { "name" : "month2", "type" : "double", "doc" : "Type inferred from '8.202'" }, { "name" : "current", "type" : "double", "doc" : "Type inferred from '0.772548'" }, { "name" : "voltage", "type" : "double", "doc" : "Type inferred from '121.740428'" }, { "name" : "power", "type" : "double", "doc" : "Type inferred from '91.380606'" }, { "name" : "total", "type" : "double", "doc" : "Type inferred from '48.264'" }, { "name" : "time", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" }, { "name" : "ledon", "type" : "boolean", "doc" : "Type inferred from 'true'" }, { "name" : "systemtime", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" } ] }


Python source code (updated to include 31 days):

from pyHS100 import SmartPlug, SmartBulb
#from pprint import pformat as pf
import json
import datetime

# Connect to the smart plug on the local network
plug = SmartPlug("192.168.1.203")

row = { }

# Daily energy readings for December 2017
emeterdaily = plug.get_emeter_daily(year=2017, month=12)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

# Daily energy readings for January 2018
emeterdaily = plug.get_emeter_daily(year=2018, month=1)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

# Daily energy readings for February 2018
emeterdaily = plug.get_emeter_daily(year=2018, month=2)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

# Hardware details (hardware version, MAC address, device IDs, ...)
hwinfo = plug.hw_info
for k, v in hwinfo.items():
    row["%s" % k] = v

# System information (alias, relay state, RSSI, location, ...)
sysinfo = plug.get_sysinfo()
for k, v in sysinfo.items():
    row["%s" % k] = v

# Time zone settings
timezone = plug.timezone
for k, v in timezone.items():
    row["%s" % k] = v

# Monthly energy totals for 2018
emetermonthly = plug.get_emeter_monthly(year=2018)
for k, v in emetermonthly.items():
    row["month%s" % k] = v

# Real-time readings: current, voltage, power, total
realtime = plug.get_emeter_realtime()
for k, v in realtime.items():
    row["%s" % k] = v

# A few convenience fields, then emit the whole record as one JSON string
row['alias'] = plug.alias
row['time'] = plug.time.strftime('%m/%d/%Y %H:%M:%S')
row['ledon'] = plug.led
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
json_string = json.dumps(row)
print(json_string)


Example output:

{"text\/plain":"root\n |-- App Attempt ID: string (nullable = true)\n |-- App ID: string (nullable = true)\n |-- App Name: string (nullable = true)\n |-- Block Manager ID: struct (nullable = true)\n |    |-- Executor ID: string (nullable = true)\n |    |-- Host: string (nullable = true)\n |    |-- Port: long (nullable = true)\n |-- Classpath Entries: struct (nullable = true)\n |    |-- \/etc\/hadoop\/conf\/: string (nullable = true)\n |    |-- \/etc\/hadoop\/conf\/secure: string (nullable = true)\n |    |-- \/etc\/zeppelin\/conf\/external-dependency-conf\/: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_conf__: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/JavaEWAH-0.3.2.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/RoaringBitmap-0.5.11.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/ST4-4.0.4.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/activation-1.1.1.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/aircompressor-0.8.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-2.7.7.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-runtime-3.4.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr4-runtime-4.5.3.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/


Shell tip: Apache MXNet may send some warnings to stderr. I don't want those, so I send them to /dev/null:

python3 -W ignore analyze.py 2>/dev/null


Software

  • PySpark
  • Python
  • Apache NiFi
  • Apache Spark
  • HDF 3.1
  • HDP 2.6.4
  • Apache Hive
  • Apache Avro
  • Apache ORC
  • Apache Ambari
  • Apache Zeppelin