使用阿帕奇API进行情感分析的流式最大似然管道:卡夫卡、火花和演练(第1部分)


对社交媒体、电子邮件、支持票、聊天、产品评论和建议的文本挖掘和分析已经成为几乎所有行业垂直行业研究数据模式的宝贵资源,以帮助企业获得洞察力、了解客户、预测和增强客户体验、定制营销活动以及帮助决策。

情感分析使用机器学习算法来确定文本内容的正面或负面程度。情感分析的示例用例包括:

  • 快速理解客户评论的语气
    • 了解顾客对产品或服务的喜欢或不喜欢
    • 了解什么会影响新客户的购买决策
    • 给企业市场意识
    • 尽早解决问题
  • Understanding stock market sentiment获得对金融信号预测的见解(见本例:http://sentdex.com/financial-analysis/)。
  • 确定人们对客户支持的看法
  • 社交媒体监控
  • 品牌/产品/公司知名度/声誉/感知监控
  • 不满意的客户检测监控和警报
  • 营销活动监控/分析
  • 客户服务意见监控/分析
  • 品牌情感态度分析
  • 客户反馈分析
  • 竞争情绪分析
  • 品牌影响者监控

手动分析客户或潜在客户生成的大量文本非常耗时;机器学习效率更高,通过流式分析,可以实时提供见解。

这是一系列文章中的第一篇,讨论了将流数据与机器学习和快速存储相结合的数据管道体系结构。在第一部分,我们将探索使用火花机器学习数据管道的情感分析。我们将使用亚马逊产品评论数据集,并建立一个机器学习模型,将评论分为正面或负面。在本教程的第二部分,我们将使用这种带有流数据的机器学习模型对文档进行实时分类。第二篇文章将讨论使用已保存的模型和流数据来进行产品情感的实时分析,将结果存储在MapR数据库中,并使它们可以快速地用于Spark和Drill SQL。

在这篇文章中,我们将讨论以下内容:

  • 分类和情感分析概念概述
  • 从文本文档构建特征向量
  • 使用逻辑回归训练机器学习模型来分类正面和负面评论
  • 评估和保存机器学习模型

分类

分类是一系列有监督的机器学习算法,根据标记的数据(如电子邮件主题和消息文本)识别项目属于哪个类别(如电子邮件是否是垃圾邮件)。分类的一些常见用例包括信用卡欺诈检测、电子邮件垃圾邮件检测和情感分析。

分类采用一组具有已知标签和预定特征的数据,并根据这些信息学习如何给新记录贴标签。特征是可以用来进行预测的属性。为了构建一个分类器模型,您需要探索和提取对分类贡献最大的特征。

让我们通过一个情感分析的例子来对文本进行正面或负面的分类。

  • 我们试图预测什么?
    • 在本例中,客户评论评级用于将评论标记为肯定或否定。4到5颗星的评论被认为是正面评论,1到2颗星的评论被认为是负面评论。
  • 你可以用来做预测的属性是什么?
    • 评论文本词被用作发现正面或负面相似性的特征,以便将客户文本情感分类为正面或负面。

机器学习工作流

使用机器学习是一个迭代过程,包括:

  1. 数据发现和模型创建
    • 历史数据分析
    • 由于格式、大小或结构的原因,识别传统分析或数据库不使用的新数据源
    • 跨多个数据源收集、关联和分析数据
    • 了解并应用正确的机器学习算法,从数据中获取价值
    • 训练、测试和评估机器学习算法的结果以建立模型
  2. 在生产中使用模型进行预测
  3. 数据发现和用新数据更新模型

特征抽出

特征是数据中有趣的属性,可以用来进行预测。特征工程是将原始数据转换成机器学习算法的输入的过程。为了在火花机器学习算法中使用,必须将特征放入特征向量中,特征向量是代表每个特征值的数字向量。为了建立一个分类器模型,你需要提取和测试以找到对分类贡献最大的特征。

用于文本特征提取的阿帕奇火花

术语频率-反向文档频率特征提取器SparkMLlib可用于将文本单词转换为特征向量。TF-IDF计算单个文档中最重要的单词,而不是一组文档。对于文档集合中的每个单词,它计算:

  • 术语频率(TF),即一个单词在特定文档中出现的次数
  • 文档频率(DF),即一个单词在一组文档中出现的次数
  • 术语频率-反向文档频率(TF-IDF),衡量文档中某个单词的重要性(该单词在该文档中出现频率很高,但在文档集合中却很少出现)

例如,如果您收集了关于自行车配件的评论,那么评论中的“退回”一词对于该文档来说比“自行车”一词更重要在下面的简单示例中,有一个正文本文档和一个负文本文档,带有单词token‘love’、‘bike’和‘returned’(在过滤掉诸如‘this’和‘I’之类的不重要的单词之后)。显示了TF、DF和TF-IDF计算。单词“bike”在2个文档中的字长为1(每个文档中的字数),文档频率为2(一组文档中的字数),字长-字长为(字长除以字长)。

逻辑回归

Logistic regression is a popular method to predict a binary response.这是预测结果概率的广义线性模型的一个特例。逻辑回归通过估计概率来测量Y“标签”和X“特征”之间的关系a logistic function.该模型预测一个概率,用于预测标签类别。

在我们的文本分类案例中,逻辑回归试图预测评论文本为正或负的概率,给定标签和特征向量的TF-IDF值。逻辑回归通过将每个TF-IDF特征乘以一个权重,并通过一个sigmoid函数传递该和,为文本集合中的每个单词找到最佳匹配权重,该函数将输入x转换为输出y,一个介于0和1之间的数字。换句话说,逻辑回归可以理解为发现parameters that best fit:

逻辑回归具有以下优点:

  • 可以处理稀疏数据
  • 快速训练
  • 重量可以解释
    • 正权重将对应于正的单词
    • 负权重将对应于负的单词

数据探索和特征提取

我们将使用亚马逊运动和户外产品评论数据的数据集,您可以在此下载:http://jmcauley.ucsd.edu/data/amazon/。数据集具有以下架构;我们将使用红色突出显示的字段进行情绪分析:

审核人—审核人的标识,例如A2SUAM1J3GNN3B
产品标识,例如0000013714
审阅者姓名—审阅者的姓名
有帮助性—评论的帮助性评级,例如2/3
reviewText—审查文本
overall—产品的等级
summary—审查摘要
unixReviewTime —审阅时间(Unix时间)
审核时间—审核时间(原始)

数据集具有以下JSON格式:

{
    "reviewerID": "A1PUWI9RTQV19S",
    "asin": "B003Y5C132",
    "reviewerName": "kris",
    "helpful": [0, 1],
    "reviewText": "A little small in hind sight, but I did order a .30 cal box. Good condition, and keeps my ammo organized.",
    "overall": 5.0,
    "summary": "Nice ammo can",
    "unixReviewTime": 1384905600,
    "reviewTime": "11 20, 2013"
}

在这种情况下,我们将使用逻辑回归来预测标签的阳性与否,基于以下几点:

标签:

  • 总体—产品评级4-5 = 1正面
  • 总体—产品1-2的评级= 0否定

功能:

  • 审查文本+审查摘要→ TF-IDF功能

使用火花毫升包

Spark ML提供了一套建立在数据框架之上的统一的高级应用编程接口,目的是使机器学习变得可扩展和简单。将最大似然语言应用编程接口构建在数据框架之上,提供了分区数据处理的可伸缩性,同时也为数据操作提供了简单的SQL。

我们将使用最大似然管道将数据通过变压器,以便提取特征,并使用估计器生成模型。

  • 变形金刚:变形金刚是一种变形金刚的算法DataFrame变成另一个DataFrame。我们将使用变形金刚来获得DataFrame带有特征向量列。
  • 估计量:估计量是一种算法,可以适合于DataFrame生产变压器。我们将使用一个估计器来训练一个模型,它可以转换输入数据来得到预测。
  • 流水线:流水线将多个转换器和评估器链接在一起,以指定一个最大似然工作流。

将文件中的数据加载到数据帧中

第一步是将我们的数据加载到DataFrame。下面,我们specify the data source format and path to load into a DataFrame。接下来,我们使用withColumn方法来添加一个将审阅摘要和审阅文本组合在一起的列,然后删除不需要的列。

import org.apache.spark._
import org.apache.spark.ml._
import org.apache.spark.sql._

var file ="/user/mapr/data/revsporttrain.json"

val df0  = spark.read.format("json")
 .option("inferSchema", "true")
 .load(file)

val df = df0.withColumn("reviewTS",
  concat($"summary", lit(" "),$"reviewText"))
 .drop("helpful")
 .drop("reviewerID")
 .drop("reviewerName")
 .drop("reviewTime")

DataFrame printSchema显示模式:

df.printSchema

root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- reviewTS: string (nullable = true)

DataFrame show方法显示前20行或指定的行数:

df.show(5)

汇总统计

火花数据帧包括一些built-in functions用于统计处理。这describe()函数对所有数值列执行汇总统计计算,并将它们作为DataFrame。下面,我们分析产品评级整体栏目:

df.describe("overall").show

result:
+-------+------------------+
|summary|           overall|
+-------+------------------+
|  count|            200000|
|   mean|          4.395105|
| stddev|0.9894654790262587|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+

在下面的代码中,我们过滤掉中性等级(=3),然后是火花Bucketizer用于将标签0/1列添加到数据集,用于正面(总体评分> =4)和非正面(总体评分< 4)评论。然后,显示结果总计数。按标签列对数据进行分组,并计算每个组中的实例数,结果显示阳性样本的数量大约是非阳性样本的13倍。

val df1 = df.filter("overall !=3")

val bucketizer = new Bucketizer()
.setInputCol("overall")
.setOutputCol("label")
.setSplits(Array(Double.NegativeInfinity, 4.0,
 Double.PositiveInfinity))

val df2= bucketizer.transform(df1)

df2.groupBy("overall","label").count.show

result:
+-------+-----+------+
|overall|label| count|
+-------+-----+------+
|    2.0|  0.0|  6916|
|    5.0|  1.0|127515|
|    1.0|  0.0|  6198|
|    4.0|  1.0| 43303|
+-------+-----+------+

分层抽样

为了确保我们的模型对负样本敏感,我们可以使用分层抽样将两种样本类型放在同一基础上。数据框架sampleBy()当提供要返回的每个样本类型的分数时,函数会执行此操作。这里,我们保留所有的负实例,但是将负实例下采样到10%,然后显示结果。

val fractions = Map(1.0 -> .1, 0.0 -> 1.0)
val df3 = df2.stat.sampleBy("label", fractions, 36L)
df3.groupBy("label").count.show

result:

+-----+-----+
|label|count|
+-----+-----+
|  0.0|13114|
|  1.0|17086|
+-----+-----+

下面,数据分为训练数据集和测试数据集:80%的数据用于训练模型,20%用于测试。

// split into training and test dataset
val splitSeed = 5043
val Array(trainingData, testData) = df3.randomSplit(Array(0.8, 0.2), splitSeed)

特征提取和流水线操作

ML包需要将标签和特征向量作为列添加到输入中DataFrame。我们建立了一个管道,通过变压器传递数据,以便提取特征和标签。

RegexTokenizer获取输入文本列并返回DataFrame使用提供的正则表达式模式将文本的一个附加列拆分成一个单词数组。这StopWordsRemover过滤掉应该被排除的单词,因为这些单词经常出现,没有太多的含义——例如,“我”,“是”,“the”

在下面的代码中RegexTokenizer将包含审阅和摘要文本的列拆分为包含一系列单词的列,然后由StopWordsRemover

val tokenizer = new RegexTokenizer()
.setInputCol("reviewTS")
.setOutputCol("reviewTokensUf")
.setPattern("\\s+|[,.()\"]")

val remover = new StopWordsRemover()
.setStopWords(StopWordsRemover
.loadDefaultStopWords("english"))
.setInputCol("reviewTokensUf")
.setOutputCol("reviewTokens")

结果示例RegexTokenizerStopWordsRemover,作为输入列reviewTS添加reviewTokens过滤后的文字列,如下所示:

审查 审阅令牌
阻力很好,但是质量不好,所以它在几周内都很有效,但是在弓步训练中,它突然袭击了我。我喜欢它,并认为它是一个伟大的产品,直到这种情况发生。我注意到带子上有小裂口。这可能就是问题所在。 数组(阻力,好,质量,工作,好,几个星期,冲刺,锻炼,抓拍,喜欢,思考,伟大,产品,发生,注意,小,撕裂,乐队,问题)

ACountVectorizer用于将前一步中的单词标记数组转换为单词标记计数向量。这CountVectorizer正在执行TF-IDF特征提取的TF部分。

val cv = new CountVectorizer()
.setInputCol("reviewTokens")
.setOutputCol("cv")
.setVocabSize(200000)

结果示例CountVectorizer,作为输入列reviewTokens添加cv列的矢量化字数,如下所示。在cv列:56004是TF单词词汇表的大小;第二个数组是单词在单词词汇表中的位置,按照在语料库中的词频排序;第三个数组是reviewTokens文本。

审阅令牌 简历
数组(阻力,好,质量,工作,好,几个星期,冲刺,锻炼,抓拍,喜欢,思考,伟大,产品,发生,注意,小,撕裂,乐队,问题) (56004,[1,2,6,8,13,31,163,168,192,276,487,518,589,643,770,955,1194,1297,4178,19185],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.1.1.0

在...下面cv列,由CountVectorizer(TF-IDF特征提取的TF部分),是IDF的输入。IDF采用从CountVectorizer和降权特征,它们经常出现在文本集合中(TF-IDF特征提取的IDF部分)。输出features列是TF-IDF特征向量,逻辑回归函数将使用它。

// list of feature columns
val idf = new IDF()
.setInputCol("cv")
.setOutputCol("features")

作为输入列的IDF结果示例cv添加features矢量化TF-IDF列如下所示。在cv列,56004是单词词汇表的大小;第二个数组是单词在单词词汇表中的位置,按照在语料库中的词频排序;第三个数组是reviewTokens文本中单词的TF-IDF。

简历 特征
(56004,[1,2,6,8,13,31,163,168,192,276,487,518,589,643,770,955,1194,1297,4178,19185],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.1.1.0 (56004,[1,2,6,8,13,31,163,168,192,276,487,518,589,643,770,955,1194,1297,4178,19185],[1.31674537971118,1.3189555542

我们管道中的最后一个元素是一个估计器,一个逻辑回归分类器,它将在标签和特征的向量上进行训练,并返回一个(转换)模型。

// create Logistic Regression estimator
// regularizer parameters avoid overfitting

val lr = new LogisticRegression()
.setMaxIter(100)
.setRegParam(0.02)
.setElasticNetParam(0.3)

下面,我们把Tokenizer,CountVectorizer、IDF和逻辑回归分类器。流水线将多个转换器和评估器链接在一起,以指定用于训练和使用模型的最大似然工作流。

val steps =  Array( tokenizer, remover, cv, idf,lr)
val pipeline = new Pipeline().setStages(steps)

训练模型

接下来,我们训练逻辑回归模型with elastic net regularization。通过在输入特征和与这些特征相关联的标记输出之间建立关联来训练模型。这pipeline.fit方法返回拟合的管道模型。

val model = pipeline.fit(trainingData)

注意:训练模型的另一个选项是使用网格搜索来调整参数,并使用带有火花交叉验证器和ParamGridBuilder的k重交叉验证来选择最佳模型,您可以在complimentary MapR ebook, Getting Started with Spark 2.x

接下来,我们可以得到CountVectorizerLogisticRegression从拟合的管道模型中建模,以便打印出文本词汇中单词的系数权重(单词特征重要性)。

// get vocabulary from the CountVectorizer
val vocabulary = model.stages(2)
.asInstanceOf[CountVectorizerModel]
.vocabulary

// get the logistic regression model
val lrModel = model.stages.last
.asInstanceOf[LogisticRegressionModel]

// Get array of coefficient weights
val weights = lrModel.coefficients.toArray

// create array of word and corresponding weight
val word_weight = vocabulary.zip(weights)

// create a dataframe with word and weights columns
val cdf = sc.parallelize(word_weight)
.toDF("word","weights")

回想一下,逻辑回归生成公式的系数权重,以预测特征x(在本例中为单词)出现的概率,从而最大化结果Y、1或0(在本例中为正面或负面文本情感)的概率。权重可以解释为:

  • 正权重将对应于正的单词
  • 负权重将对应于负的单词

下面,我们按降序排列权重,以显示最积极的词。结果显示,“好”、“完美”、“简单”、“有效”和“优秀”是最重要的正面词汇。

// show the most positive weighted words
cdf.orderBy(desc("weights")).show(10)

result:
+---------+-------------------+
|     word|             weight|
+---------+-------------------+
|    great| 0.6078697902359276|
|  perfect|0.34404726951273945|
|excellent|0.28217372351853814|
|     easy|0.26293906850341764|
|     love|0.23518819188672227|
|    works|  0.229342771859023|
|     good| 0.2116386469012886|
|   highly| 0.2044040462730194|
|     nice|0.20088266981583622|
|     best|0.18194893152633945|
+---------+-------------------+

下面,我们按升序对权重进行排序,以显示最负面的单词。结果显示,“返回”、“贫穷”、“浪费”和“无用”是最重要的负面词汇。

// show the most negative sentiment words
cdf.orderBy("weights").show(10)

result:
+-------------+--------------------+
|         word|              weight|
+-------------+--------------------+
|     returned|-0.38185206877117467|
|         poor|-0.35366409294425644|
|        waste| -0.3159724826017525|
|      useless| -0.2914292653060789|
|       return| -0.2724012497362986|
|disappointing| -0.2666580559444479|
|        broke| -0.2656765359468423|
| disappointed|-0.23852780960293438|
|    returning|-0.22432617475366876|
|         junk|-0.21457169691127467|
+-------------+--------------------+

预测和模型评估

模型的性能可以通过使用没有用于任何训练的测试数据集来确定。我们改变了测试DataFrame使用管道模型,它将通过测试数据,根据管道步骤,通过特征提取阶段,用逻辑回归模型估计,然后在新的一列中返回标签预测DataFrame

val predictions = model.transform(testData)

BinaryClassificationEvaluator提供一个度量标准来衡量一个合适的模型在测试数据上的表现。该评估器的默认度量是ROC曲线下的面积。该区域衡量测试正确区分真阳性和假阳性的能力。一个随机预测值应该是0 . 5。该值越接近1,其预测就越好。

下面,我们传递预测DataFrame(它有一个rawPrediction列和标签列)BinaryClassificationEvaluator,返回. 93作为ROC曲线下的面积。

val evaluator = new BinaryClassificationEvaluator()  
val areaUnderROC = evaluator.evaluate(predictions)

result:  0.9350783400583272

下面,我们再计算一些指标。假阳性/真阳性和阴性预测的数量也很有用:

  • 真正的积极因素是模型正确预测积极情绪的频率。
  • 假阳性是模型错误预测积极情绪的频率。
  • 真正的负面表示模型正确预测负面情绪的频率。
  • 假阴性表明模型错误预测负面情绪的频率。
val lp = predictions.select("label", "prediction")
val counttotal = predictions.count()
val correct = lp.filter($"label" === $"prediction").count()
val wrong = lp.filter(not($"label" === $"prediction")).count()
val ratioWrong = wrong.toDouble / counttotal.toDouble
val lp = predictions.select(  "prediction","label")
val counttotal = predictions.count().toDouble
val correct = lp.filter($"label" === $"prediction")
 .count()
val wrong = lp.filter("label != prediction")
.count()
val ratioWrong=wrong/counttotal
val ratioCorrect=correct/counttotal

val truen =( lp.filter($"label" === 0.0)
 .filter($"label" === $"prediction")
 .count()) /counttotal

val truep = (lp.filter($"label" === 1.0)
 .filter($"label" === $"prediction")
 .count())/counttotal

val falsen = (lp.filter($"label" === 0.0)
 .filter(not($"label" === $"prediction"))
 .count())/counttotal

val falsep = (lp.filter($"label" === 1.0)
 .filter(not($"label" === $"prediction"))
 .count())/counttotal

val precision= truep / (truep + falsep)
val recall= truep / (truep + falsen)
val fmeasure= 2  precision  recall / (precision + recall)
val accuracy=(truep + truen) / (truep + truen + falsep + falsen)

result:
counttotal: 6112.0
correct: 5290.0
wrong: 822.0
ratioWrong: 0.13448952879581152
ratioCorrect: 0.8655104712041884
truen: 0.3417866492146597
truep: 0.5237238219895288
falsen: 0.044829842931937175
falsep: 0.08965968586387435
precision: 0.8538276873833023
recall: 0.9211510791366907
fmeasure: 0.8862126245847176
accuracy: 0.8655104712041886

下面,我们打印出负面情绪概率最高的评论的摘要和评论标记词:

predictions.filter($"prediction" === 0.0)
.select("summary","reviewTokens","overall","prediction")
.orderBy(desc("rawPrediction")).show(5)

result:
+--------------------+--------------------+-------+----------+
|             summary|        reviewTokens|overall|prediction|
+--------------------+--------------------+-------+----------+
|  Worthless Garbage!|[worthless, garba...|    1.0|       0.0|
|Decent but failin...|[decent, failing,...|    1.0|       0.0|
|over rated and po...|[rated, poorly, m...|    2.0|       0.0|
|dont waste your m...|[dont, waste, mon...|    1.0|       0.0|
|Cheap Chinese JUNK! |[cheap, chinese,....|    1.0|       0.0|
+--------------------+--------------------+-------+----------+

下面,我们打印出正面情绪可能性最高的评论的摘要和评论标记词:

predictions.filter($"prediction" === 1.0)
.select("summary","reviewTokens","overall","prediction")
.orderBy("rawPrediction").show(5)

result:
+--------------------+--------------------+-------+----------+
|             summary|        reviewTokens|overall|prediction|
+--------------------+--------------------+-------+----------+
|               great|[great, excellent...|    5.0|       1.0|
|Outstanding Purchase|[outstanding, pur...|    5.0|       1.0|
|A fantastic stov....|[fantastic, stov....|    5.0|       1.0|
|Small But Delight...|[small, delightfu...|    5.0|       1.0|
|Kabar made a good...|[kabar, made, goo...|    5.0|       1.0|
+--------------------+--------------------+-------+----------+

保存模型

我们现在可以将我们安装好的管道模型保存到分布式文件存储中,以便以后在生产中使用。这将特征提取阶段和逻辑回归模型都保存在管道中。

var dir = "/user/mapr/sentmodel/"
model.write.overwrite().save(dir)

保存管道模型的结果是元数据的JSON文件和模型数据的Parquet文件。我们可以用加载命令重新加载模型;原始模型和重新加载的模型是相同的:

val sameModel = org.apache.spark.ml.PipelineModel.load(modeldirectory)

摘要

有很多很好的工具来建立分类模型。Apache Spark为构建业务问题解决方案提供了一个很好的框架,可以从海量的分布式数据集中提取价值。

机器学习算法不能完美地回答所有问题。但是它们确实提供了证据,让人类在解释结果时考虑,假设首先问了正确的问题。

密码

使用Apache Spark来训练模型并得出自己结论的所有数据和代码都位于GitHub中;有关运行代码的更多信息,请参考GitHub“自述文件”。