Spark提供流库来处理来自实时系统的连续数据流。
星火流最初是用运行在星火RDD上的数据流应用编程接口实现的,在那里数据从流源被分成块,处理后发送到目的地。
从火花2开始,火花开发了一个新的模型,这是一个结构化的流,建立在火花SQL引擎的基础上,处理数据帧和数据集。
结构化流利用连续数据流作为一个无界表,当从流中处理事件时,该表会不断更新。
火花流应用程序可以使用对这种无界数据执行各种计算的SQL查询来实现。
结构化流处理一些挑战,如一次性流处理、增量更新等。
结构化流基于从源获取数据的触发间隔来轮询数据。将结果数据集写入接收器时需要输出模式。它支持追加模式(只有添加到结果表中的新数据元素才会被写入接收器)、更新模式(只有结果表中更新的数据元素才会被写入接收器)、完成模式(结果表中的所有项目都会被写入接收器)。
结构化流支持以下内置数据源。
文件来源:允许读取放在特定目录的文件。支持的格式有文本、CSV、拼花、JSON
卡夫卡来源:流库提供卡夫卡消费者从卡夫卡经纪人读取数据。这在生产中被大量使用。
套接字源:可以使用UTF-8格式的套接字连接从套接字读取数据
支持的各种内置水槽如下
文件接收器:将输出存储到目录中
卡夫卡接收器:在卡夫卡中存储一个或多个主题的输出
控制台接收器:将输出打印到控制台,用于调试目的
内存接收器:输出作为内存表存储在内存中,用于调试目的
Foreach接收器:对输出中的记录运行临时计算
结构化流通过使用检查点保存作业状态并从失败阶段重新启动作业来提供容错。这也适用于使用数据流的火花流。
此外,结构化流提供了以下基于条件的容错恢复机制。
在这个演示中,我们将看到火花流,使用文件源进行一些计算,并生成输出到控制台接收器。
此用例包含与员工相关的示例csv数据集,包含“empId,empName,department”字段。csv格式的数据将放在文件夹的特定位置。
火花流内置文件源监听目录更新事件通知,并将数据传递给计算层进行所需的分析。在本例中,输出被流式传输到控制台。
以下示例是火花结构化流程序,它根据文件流数据计算特定部门的员工人数
package com.sparkstreaming;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
public class FileStream {
public static void main(String[] args) throws Exception {
//build the spark sesssion
SparkSession spark = SparkSession.builder().appName("spark streaming").config("spark.master", "local")
.config("spark.sql.warehouse.dir", "file:///app/").getOrCreate();
//set the log level only to log errors
spark.sparkContext().setLogLevel("ERROR");
//define schema type of file data source
StructType schema = new StructType().add("empId", DataTypes.StringType).add("empName", DataTypes.StringType)
.add("department", DataTypes.StringType);
//build the streaming data reader from the file source, specifying csv file format
Dataset<Row> rawData = spark.readStream().option("header", true).format("csv").schema(schema)
.csv("D:/streamingfiles/*.csv");
rawData.createOrReplaceTempView("empData");
//count of employees grouping by department
Dataset<Row> result = spark.sql("select count(*), department from empData group by department");
//write stream to output console with update mode as data is being aggregated
StreamingQuery query = result.writeStream().outputMode(OutputMode.Update()).format("console").start();
query.awaitTermination();
}
}
输出以批处理方式进行,其中第一批与第一个文件相关,第二批与第二个文件相关,等等
第一个csv文件包含以下数据
第二个csv文件包含以下数据
流式查询的输出如下所示,其中计数按部门名称分组。第二批数据根据键值与第一批数据聚合,输出在第二批中更新。
在这个演示中,我们将看到卡夫卡经纪人的火花结构化流读取的实现,以及批处理的2分钟窗口期间的计算聚合
在这个用例中,网络应用程序不断地向卡夫卡经纪人生成用户的登录会话持续时间。
使用火花流程序,对于每2分钟的窗口,我们计算用户登录到网站的会话持续时间的总和
下面是Java中计算基于窗口的聚合的火花流程序
x
package com.sparkstreaming;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
public class DataStream {
public static void main(String[] args) throws StreamingQueryException
{
//set the hadoop home directory for kafka source
System.setProperty("hadoop.home.dir", "d:/winutils");
SparkSession session = SparkSession.builder()
.master("local[*]")
.appName("structuredViewingReport")
.getOrCreate();
session.sparkContext().setLogLevel("ERROR");
//define UDF to parse kafka message that can be passed to Spark SQL
session.udf().register("sessionDurationFn", new UDF1<String, Long>() {
public Long call(String messageValue) throws Exception {
String[] strArr = messageValue.split(",");
//returns the session duration value from Kafka message which is the first value in the coma delimited string value passed fron Kafka broker
return Long.parseLong(strArr[0]);
}
}, DataTypes.LongType);
session.udf().register("userNameFn", new UDF1<String, String>() {
public String call(String messageValue) throws Exception {
String[] strArr = messageValue.split(",");
//returns user name value from Kafka message which is the second value in the coma delimited string value passed fron Kafka broker
return strArr[1];
}
}, DataTypes.StringType);
//define kafka streaming reader
Dataset<Row> df = session.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "sessiondata")
.load();
// start dataframe operations
df.createOrReplaceTempView("session_data_init");
// key, value, timestamp are the core attributes in the kafka message. Value contains the coma delimited string with sessionduration,userid value format
Dataset<Row> preresults = session.sql("select sessionDurationFn(cast (value as string)) as session_duration, userNameFn(cast (value as string)) as userName,timestamp from session_data_init");
preresults.createOrReplaceTempView("session_data");
//compute sum of session duration grouping on 2 minute window and userName
Dataset<Row> results =
session.sql("select window,sum(session_duration) as session_duration,userName from session_data group by window(timestamp,'2 minutes'),userName");
//log the results to console
StreamingQuery query = results
.writeStream()
.format("console")
.outputMode(OutputMode.Update())
.option("truncate", false)
.option("numRows", 50)
.start();
query.awaitTermination();
}
}
本例中的卡夫卡经纪人以键值对的形式发送消息,其中值是以会话持续时间和用户名值分隔的coma字符串。
上述程序的输出包含窗口计算的批处理序列
这些用例是作为maven项目构建的。以下是在pom.xml中添加的依赖项
xxxxxxxxxx
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.2.0</version>
</dependency>
以下是火花流应用中需要考虑的一些性能提示
1.在上面卡夫卡源的火花流输出中,有一些延迟到达的数据。例如,如下所示,用户19数据的第一个版本可能应该到达batch1,但是它已经到达批处理2进行计算。
存在延迟到达数据的情况,并且必须对较早的窗口数据执行该数据的计算。在这种情况下,较早窗口数据的结果存储在内存中,然后与较晚到达的数据聚合在一起。像火花流这样的框架负责这个过程。但是,由于历史数据存储在内存中,直到丢失的数据到达,这可能会导致内存累积,因此可能会消耗更多内存。在这些场景中,火花流具有水印的特性,当延迟到达的数据超过阈值时,就丢弃它。在某些情况下,由于丢弃这些值,业务结果可能不匹配。为了避免这些类型的问题,必须实现自定义功能来检查数据的时间戳,然后将其存储在HDFS或任何云本地对象存储系统中,以对数据执行批处理计算,而不是应用水印功能。这种实现导致了复杂性。
2.在上述卡夫卡源的火花流输出中,存在一些数据计算缓慢的性能滞后。当我们打开spark控制台并查看任务和作业信息时,我们可以预测此问题的根本原因。
这项工作耗时超过1.5分钟
当我们打开已完成作业的任务信息时,我们可以看到有许多分区是用0个洗牌任务创建的。这些分区上的这些虚拟任务将花费一些时间来启动和停止,从而增加了导致延迟的总处理时间
默认情况下,会创建200个分区,如上所示。执行分组操作时,边处理边触发使用混洗。在洗牌过程中,将创建数据分区,每个分区将被分配任务。火花SQL必须决定使用多少分区。在这些情况下,在随机读取期间会创建更多的分区,并且许多分区将没有数据可处理。这是因为在输出中没有很多键作为分区大小。这些分区上的虚拟任务需要一些时间来启动和停止,这样会增加总处理时间,从而导致延迟。建议在火花代码中设置以下属性。此属性确定在为联接或聚合重排数据时使用的分区数量
xxxxxxxxxx
session.conf().set("spark.sql.shuffle.partitions", "10");
使用此参数,我们可以观察到作业性能已经提高,并且创建了最佳数量的分区,如下所示,可从火花控制台查看
通过这种方式,我们可以在实时应用程序中利用火花结构化流,并从基于流数据的优化火花SQL计算中获益。