We'll start briefly with our use case: acquiring energy data and running an Apache Spark job as part of the flow.
We'll use the new (in Apache NiFi 1.5 / HDF 3.1) ExecuteSparkInteractive processor and the LivyController for this integration. As we mentioned in the first part of this article, it is very easy to set up.
Since this is a modern Apache NiFi project, we use version control on our code:
On a local machine, I have a Python script talking over Wi-Fi to an energy-monitoring smart plug. The data is processed, cleaned, and sent via S2S over HTTP to a cloud-hosted Apache NiFi instance.
In the cloud, we receive the pushed messages.
Once we open the Spark It Up processor group, we have the flow that processes the data.
The path for all files:
The path for high-voltage readings:
We can take all of the metadata attributes and send them somewhere, or store them as a JSON file.
We tested our PySpark program in Apache Zeppelin and then copied it into our processor.
Our ExecuteSparkInteractive processor:
In the QueryProcessor, we send the messages with a high voltage to an Apache Spark executor to run a PySpark job for further processing.
Once we submit a job through Apache Livy, we can watch it during and after execution through the detailed Apache Livy UI screens and the Spark screens. In the Apache Livy UI screen below, we can see the PySpark code that was executed and its output.
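Under the covers, ExecuteSparkInteractive is doing essentially what you could do yourself against Livy's REST API: open an interactive session, submit the PySpark code as a statement, and poll for the result. Here is a minimal sketch of that conversation; the host name is a placeholder, and I'm assuming Livy's default port of 8998 for your environment:

# A rough sketch of the Livy REST conversation behind ExecuteSparkInteractive.
# The host name and port are assumptions for your environment.
import json
import time

import requests

livy = "http://yourhdp264server:8998"
headers = {"Content-Type": "application/json"}

# Open an interactive PySpark session.
r = requests.post(livy + "/sessions", headers=headers,
                  data=json.dumps({"kind": "pyspark"}))
session_id = r.json()["id"]

# Wait for the session to come up.
while requests.get("%s/sessions/%s" % (livy, session_id)).json()["state"] != "idle":
    time.sleep(2)

# Submit our PySpark code as a statement.
code = 'spark.read.json("hdfs://yourhdp264server:8020/spark2-history").count()'
r = requests.post("%s/sessions/%s/statements" % (livy, session_id),
                  headers=headers, data=json.dumps({"code": code}))
statement_id = r.json()["id"]

# Poll until the result is available, then print it.
url = "%s/sessions/%s/statements/%s" % (livy, session_id, statement_id)
result = requests.get(url).json()
while result["state"] != "available":
    time.sleep(2)
    result = requests.get(url).json()
print(result["output"]["data"]["text/plain"])

The "text/plain" payload in that final response is exactly what shows up in the Livy UI screens below.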
Apache Livy UI:
Apache Spark Jobs UI: Jobs
Apache Spark Jobs UI: SQL
Apache Spark Jobs UI: Executors
Apache Zeppelin SQL search of the data:
Hive/Spark SQL table DDL auto-generated by Apache NiFi:
Here is the source code associated with this article:
PySpark code:
# Read the JSON results of the Spark2 history from HDFS
shdf = spark.read.json("hdfs://yourhdp264server:8020/spark2-history")
# Print the schema inferred from the JSON
shdf.printSchema()
# Register a temporary view so the logs can be queried with SQL
shdf.createOrReplaceTempView("sparklogs")
# A simple SELECT and a count of the rows
stuffdf = spark.sql("SELECT * FROM sparklogs")
stuffdf.count()
This is a fairly simple PySpark application that reads the JSON results of the Spark2 history, prints the schema inferred from them, and then does a simple SELECT and count. We could easily do Spark machine learning or other processing there instead. You can run Python 2.x or 3.x for this with PySpark. I am running this in Apache Spark 2.2.0, hosted on an HDP 2.6.4 cluster running CentOS 7. The funny part is that every time I run this Spark job, it produces more results for itself to read. I should probably be reading those logs with Apache NiFi, but it's a fun little example. Obviously, you can run any kind of job here; my next article will discuss running Apache MXNet and Spark MLlib jobs through Apache Livy and Apache NiFi.
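For example, since each line of a Spark event log is a JSON record carrying an Event field, a hypothetical next step beyond the plain count would be to aggregate by event type:

# Hypothetical extension: tally the Spark history entries by event type.
eventsdf = spark.sql("SELECT Event, COUNT(*) AS cnt FROM sparklogs GROUP BY Event ORDER BY cnt DESC")
eventsdf.show(10, False)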
A quick note: you now have a lot of options for working with schemas:
Energy data schema:
inferred.avro.schema
{ "type" : "record", "name" : "smartPlug", "fields" : [ { "name" : "day19", "type" : "double", "doc" : "Type inferred from '2.035'" }, { "name" : "day20", "type" : "double", "doc" : "Type inferred from '1.191'" }, { "name" : "day21", "type" : "double", "doc" : "Type inferred from '0.637'" }, { "name" : "day22", "type" : "double", "doc" : "Type inferred from '1.497'" }, { "name" : "day23", "type" : "double", "doc" : "Type inferred from '1.151'" }, { "name" : "day24", "type" : "double", "doc" : "Type inferred from '1.227'" }, { "name" : "day25", "type" : "double", "doc" : "Type inferred from '1.387'" }, { "name" : "day26", "type" : "double", "doc" : "Type inferred from '1.138'" }, { "name" : "day27", "type" : "double", "doc" : "Type inferred from '1.204'" }, { "name" : "day28", "type" : "double", "doc" : "Type inferred from '1.401'" }, { "name" : "day29", "type" : "double", "doc" : "Type inferred from '1.288'" }, { "name" : "day30", "type" : "double", "doc" : "Type inferred from '1.439'" }, { "name" : "day31", "type" : "double", "doc" : "Type inferred from '0.126'" }, { "name" : "day1", "type" : "double", "doc" : "Type inferred from '1.204'" }, { "name" : "day2", "type" : "double", "doc" : "Type inferred from '1.006'" }, { "name" : "day3", "type" : "double", "doc" : "Type inferred from '1.257'" }, { "name" : "day4", "type" : "double", "doc" : "Type inferred from '1.053'" }, { "name" : "day5", "type" : "double", "doc" : "Type inferred from '1.597'" }, { "name" : "day6", "type" : "double", "doc" : "Type inferred from '1.642'" }, { "name" : "day7", "type" : "double", "doc" : "Type inferred from '0.443'" }, { "name" : "day8", "type" : "double", "doc" : "Type inferred from '0.01'" }, { "name" : "day9", "type" : "double", "doc" : "Type inferred from '0.009'" }, { "name" : "day10", "type" : "double", "doc" : "Type inferred from '0.009'" }, { "name" : "day11", "type" : "double", "doc" : "Type inferred from '0.075'" }, { "name" : "day12", "type" : "double", "doc" : "Type inferred from '1.149'" }, { "name" : "day13", "type" : "double", "doc" : "Type inferred from '1.014'" }, { "name" : "day14", "type" : "double", "doc" : "Type inferred from '0.851'" }, { "name" : "day15", "type" : "double", "doc" : "Type inferred from '1.134'" }, { "name" : "day16", "type" : "double", "doc" : "Type inferred from '1.54'" }, { "name" : "day17", "type" : "double", "doc" : "Type inferred from '1.438'" }, { "name" : "day18", "type" : "double", "doc" : "Type inferred from '1.056'" }, { "name" : "sw_ver", "type" : "string", "doc" : "Type inferred from '\"1.1.1 Build 160725 Rel.164033\"'" }, { "name" : "hw_ver", "type" : "string", "doc" : "Type inferred from '\"1.0\"'" }, { "name" : "mac", "type" : "string", "doc" : "Type inferred from '\"50:C7:BF:B1:95:D5\"'" }, { "name" : "type", "type" : "string", "doc" : "Type inferred from '\"IOT.SMARTPLUGSWITCH\"'" }, { "name" : "hwId", "type" : "string", "doc" : "Type inferred from '\"60FF6B258734EA6880E186F8C96DDC61\"'" }, { "name" : "fwId", "type" : "string", "doc" : "Type inferred from '\"060BFEA28A8CD1E67146EB5B2B599CC8\"'" }, { "name" : "oemId", "type" : "string", "doc" : "Type inferred from '\"FFF22CFF774A0B89F7624BFC6F50D5DE\"'" }, { "name" : "dev_name", "type" : "string", "doc" : "Type inferred from '\"Wi-Fi Smart Plug With Energy Monitoring\"'" }, { "name" : "model", "type" : "string", "doc" : "Type inferred from '\"HS110(US)\"'" }, { "name" : "deviceId", "type" : "string", "doc" : "Type inferred from '\"8006ECB1D454C4428953CB2B34D9292D18A6DB0E\"'" }, { "name" : "alias", 
"type" : "string", "doc" : "Type inferred from '\"Tim Spann's MiniFi Controller SmartPlug - Desk1\"'" }, { "name" : "icon_hash", "type" : "string", "doc" : "Type inferred from '\"\"'" }, { "name" : "relay_state", "type" : "int", "doc" : "Type inferred from '1'" }, { "name" : "on_time", "type" : "int", "doc" : "Type inferred from '1995745'" }, { "name" : "active_mode", "type" : "string", "doc" : "Type inferred from '\"schedule\"'" }, { "name" : "feature", "type" : "string", "doc" : "Type inferred from '\"TIM:ENE\"'" }, { "name" : "updating", "type" : "int", "doc" : "Type inferred from '0'" }, { "name" : "rssi", "type" : "int", "doc" : "Type inferred from '-34'" }, { "name" : "led_off", "type" : "int", "doc" : "Type inferred from '0'" }, { "name" : "latitude", "type" : "double", "doc" : "Type inferred from '40.268216'" }, { "name" : "longitude", "type" : "double", "doc" : "Type inferred from '-74.529088'" }, { "name" : "index", "type" : "int", "doc" : "Type inferred from '18'" }, { "name" : "zone_str", "type" : "string", "doc" : "Type inferred from '\"(UTC-05:00) Eastern Daylight Time (US & Canada)\"'" }, { "name" : "tz_str", "type" : "string", "doc" : "Type inferred from '\"EST5EDT,M3.2.0,M11.1.0\"'" }, { "name" : "dst_offset", "type" : "int", "doc" : "Type inferred from '60'" }, { "name" : "month1", "type" : "double", "doc" : "Type inferred from '32.674'" }, { "name" : "month2", "type" : "double", "doc" : "Type inferred from '8.202'" }, { "name" : "current", "type" : "double", "doc" : "Type inferred from '0.772548'" }, { "name" : "voltage", "type" : "double", "doc" : "Type inferred from '121.740428'" }, { "name" : "power", "type" : "double", "doc" : "Type inferred from '91.380606'" }, { "name" : "total", "type" : "double", "doc" : "Type inferred from '48.264'" }, { "name" : "time", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" }, { "name" : "ledon", "type" : "boolean", "doc" : "Type inferred from 'true'" }, { "name" : "systemtime", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" } ] }
Python source code (updated to include 31 days):
from pyHS100 import SmartPlug, SmartBulb
#from pprint import pformat as pf
import json
import datetime

plug = SmartPlug("192.168.1.203")
row = { }

# Daily energy readings for December 2017 through February 2018
emeterdaily = plug.get_emeter_daily(year=2017, month=12)
for k, v in emeterdaily.items():
    row["day%s" % k] = v
emeterdaily = plug.get_emeter_daily(year=2018, month=1)
for k, v in emeterdaily.items():
    row["day%s" % k] = v
emeterdaily = plug.get_emeter_daily(year=2018, month=2)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

# Hardware, system, and timezone metadata from the plug
hwinfo = plug.hw_info
for k, v in hwinfo.items():
    row["%s" % k] = v
sysinfo = plug.get_sysinfo()
for k, v in sysinfo.items():
    row["%s" % k] = v
timezone = plug.timezone
for k, v in timezone.items():
    row["%s" % k] = v

# Monthly energy totals and the current real-time reading
emetermonthly = plug.get_emeter_monthly(year=2018)
for k, v in emetermonthly.items():
    row["month%s" % k] = v
realtime = plug.get_emeter_realtime()
for k, v in realtime.items():
    row["%s" % k] = v

# A few scalar fields, plus timestamps for the reading
row['alias'] = plug.alias
row['time'] = plug.time.strftime('%m/%d/%Y %H:%M:%S')
row['ledon'] = plug.led
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')

json_string = json.dumps(row)
print(json_string)
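The script only prints the JSON; in my flow, that record then travels to the cloud-hosted NiFi instance via S2S over HTTP. If you wanted to skip S2S, one simple alternative (not what I use above) is to POST the JSON to a ListenHTTP processor. The URL below is an assumption for your environment, though contentListener is ListenHTTP's default base path:

# Hypothetical alternative delivery: POST the record to a NiFi ListenHTTP endpoint.
import requests
requests.post("http://your-nifi-host:8080/contentListener",
              data=json_string,
              headers={"Content-Type": "application/json"})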
Example output (the result of the PySpark job, returned by Livy as JSON):
{"text\/plain":"root\n |-- App Attempt ID: string (nullable = true)\n |-- App ID: string (nullable = true)\n |-- App Name: string (nullable = true)\n |-- Block Manager ID: struct (nullable = true)\n | |-- Executor ID: string (nullable = true)\n | |-- Host: string (nullable = true)\n | |-- Port: long (nullable = true)\n |-- Classpath Entries: struct (nullable = true)\n | |-- \/etc\/hadoop\/conf\/: string (nullable = true)\n | |-- \/etc\/hadoop\/conf\/secure: string (nullable = true)\n | |-- \/etc\/zeppelin\/conf\/external-dependency-conf\/: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_conf__: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/JavaEWAH-0.3.2.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/RoaringBitmap-0.5.11.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/ST4-4.0.4.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/activation-1.1.1.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/aircompressor-0.8.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-2.7.7.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-runtime-3.4.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr4-runtime-4.5.3.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/
A shell tip: Apache MXNet may send some warnings to stderr. I don't want those, so I redirect them to /dev/null:
python3 -W ignore analyze.py 2>/dev/null