Practical Techniques for Spark ML Feature Transformation and Processing Operators - Spark Commercial ML in Practice
Published: 2019-06-26


This technical column is a summary and distillation of the author's (Qin Kaixin) day-to-day work. The cases are drawn from real commercial environments, together with tuning advice and cluster capacity-planning guidance for commercial applications. Please keep following this blog. Copyright notice: reproduction is prohibited; learning from it is welcome. QQ email: 1120746959@qq.com; feel free to get in touch for any business exchange.

1 Feature Transformation, Fired Up

1.1 Tokenization (Tokenizer and RegexTokenizer)

Tokenization splits text (for example, a sentence) into individual words; the Tokenizer splits on whitespace by default. RegexTokenizer splits based on a regular expression, and its default pattern is "\\s+".

    import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val sentenceDataFrame = spark.createDataFrame(Seq(
      (0, "Hi I heard about Spark"),
      (1, "I wish Java could use case classes"),
      (2, "Logistic,regression,models,are,neat")
    )).toDF("id", "sentence")

    // Tokenizer splits on whitespace by default
    val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
    val tokenized = tokenizer.transform(sentenceDataFrame)
    val countTokens = udf { (words: Seq[String]) => words.length }
    tokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(false)

    +-----------------------------------+------------------------------------------+------+
    |sentence                           |words                                     |tokens|
    +-----------------------------------+------------------------------------------+------+
    |Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
    |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
    |Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
    +-----------------------------------+------------------------------------------+------+

    // \W matches any non-word character, i.e. the delimiters between words
    val regexTokenizer = new RegexTokenizer().setInputCol("sentence").setOutputCol("words").setPattern("\\W")
    val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
    regexTokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(false)

    +-----------------------------------+------------------------------------------+------+
    |sentence                           |words                                     |tokens|
    +-----------------------------------+------------------------------------------+------+
    |Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
    |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
    |Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
    +-----------------------------------+------------------------------------------+------+

    // \w matches [a-zA-Z0-9_]; setGaps(false) makes the pattern match the tokens themselves rather than the gaps between them
    val regexTokenizer = new RegexTokenizer().setInputCol("sentence").setOutputCol("words").setPattern("\\w+").setGaps(false)
    val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
    regexTokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(false)

    +-----------------------------------+------------------------------------------+------+
    |sentence                           |words                                     |tokens|
    +-----------------------------------+------------------------------------------+------+
    |Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
    |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
    |Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
    +-----------------------------------+------------------------------------------+------+

1.2 Removing Stop Words (StopWordsRemover)

In the example below, the stop words I, the, had, and a are removed.

    import org.apache.spark.ml.feature.StopWordsRemover

    val remover = new StopWordsRemover().setInputCol("raw").setOutputCol("filtered")
    val dataSet = spark.createDataFrame(Seq(
      (0, Seq("I", "saw", "the", "red", "balloon")),
      (1, Seq("Mary", "had", "a", "little", "lamb"))
    )).toDF("id", "raw")

    scala> remover.transform(dataSet).show(false)
    +---+----------------------------+--------------------+
    |id |raw                         |filtered            |
    +---+----------------------------+--------------------+
    |0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
    |1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
    +---+----------------------------+--------------------+

    // The built-in default stop-word lists only cover certain languages; requesting an
    // unsupported one fails with, for example:
    // requirement failed: US is not in the supported language list: french, spanish, german, finnish, turkish, english, russian, norwegian, dutch, danish, hungarian, italian, swedish, portuguese.

    scala> StopWordsRemover.loadDefaultStopWords("english")
    res17: Array[String] = Array(i, me, my, myself, we, our, ours, ourselves, you, your, yours, yourself, yourselves, he, him, his, himself, she, her, hers, herself, it, its, itself, they, them, their, theirs, themselves, what, which, who, whom, this, that, these, those, am, is, are, was, were, be, been, being, have, has, had, having, do, does, did, doing, a, an, the, and, but, if, or, because, as, until, while, of, at, by, for, with, about, against, between, into, through, during, before, after, above, below, to, from, up, down, in, out, on, off, over, under, again, further, then, once, here, there, when, where, why, how, all, any, both, each, few, more, most, other, some, such, no, nor, not, only, own, same, so, than, too, very, s, t, can, will, just, don, should, now, i'll, you'll, he'll...
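The default English list above is not fixed: setStopWords accepts any Array[String], and loadDefaultStopWords can pull in another supported language. Here is a minimal sketch, reusing the dataSet above; the extra words "spark" and "java" are hypothetical additions for illustration:

    import org.apache.spark.ml.feature.StopWordsRemover

    // Start from the built-in English list and append project-specific words
    // ("spark" and "java" here are hypothetical additions)
    val customStopWords = StopWordsRemover.loadDefaultStopWords("english") ++ Array("spark", "java")

    val customRemover = new StopWordsRemover()
      .setInputCol("raw")
      .setOutputCol("filtered")
      .setStopWords(customStopWords)   // override the default list
      .setCaseSensitive(false)         // default behaviour: matching ignores case

    customRemover.transform(dataSet).show(false)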

1.3 n-gram (Producing Combined Sequences)

Each n-gram is represented as a space-delimited string of n consecutive words. If the input sequence contains fewer than n strings, no n-grams are produced.

    import org.apache.spark.ml.feature.NGram

    val wordDataFrame = spark.createDataFrame(Seq(
      (0, Array("Hi", "I", "heard", "about", "Spark")),
      (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
      (2, Array("Logistic", "regression", "models", "are", "neat"))
    )).toDF("id", "words")

    val ngram = new NGram().setN(3).setInputCol("words").setOutputCol("ngrams")
    val ngramDataFrame = ngram.transform(wordDataFrame)
    ngramDataFrame.select("ngrams").show(false)

    +--------------------------------------------------------------------------------+
    |ngrams                                                                          |
    +--------------------------------------------------------------------------------+
    |[Hi I heard, I heard about, heard about Spark]                                  |
    |[I wish Java, wish Java could, Java could use, could use case, use case classes]|
    |[Logistic regression models, regression models are, models are neat]            |
    +--------------------------------------------------------------------------------+

1.4 Binarization (Binarizer, quite handy)

Binarization is the process of thresholding numeric features into binary (0/1) features. Binarizer takes the common inputCol and outputCol parameters plus a binarization threshold. Feature values greater than the threshold are binarized to 1.0; values equal to or less than the threshold are binarized to 0.0. inputCol supports both Vector and Double types.

    import org.apache.spark.ml.feature.Binarizer

    val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
    val dataFrame = spark.createDataFrame(data).toDF("id", "feature")
    val binarizer: Binarizer = new Binarizer().setInputCol("feature").setOutputCol("binarized_feature").setThreshold(0.5)
    val binarizedDataFrame = binarizer.transform(dataFrame)

    scala> println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
    Binarizer output with Threshold = 0.5

    scala> binarizedDataFrame.show()
    +---+-------+-----------------+
    | id|feature|binarized_feature|
    +---+-------+-----------------+
    |  0|    0.1|              0.0|
    |  1|    0.8|              1.0|
    |  2|    0.2|              0.0|
    +---+-------+-----------------+
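Since inputCol also accepts Vector columns, a minimal sketch of binarizing a whole feature vector at once could look like the following; the vectors below are made up for illustration, and each element is thresholded independently:

    import org.apache.spark.ml.feature.Binarizer
    import org.apache.spark.ml.linalg.Vectors

    // Hypothetical vector-valued features
    val vecDF = spark.createDataFrame(Seq(
      (0, Vectors.dense(0.1, 0.8, 0.2)),
      (1, Vectors.dense(0.9, 0.3, 0.7))
    )).toDF("id", "features")

    val vecBinarizer = new Binarizer()
      .setInputCol("features")
      .setOutputCol("binarized_features")
      .setThreshold(0.5)

    // Expected: [0.0,1.0,0.0] and [1.0,0.0,1.0]
    vecBinarizer.transform(vecDF).show(false)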

1.5 Standardization (StandardScaler)

StandardScaler operates on each column, i.e., each feature dimension, standardizing the feature to unit standard deviation, to zero mean, or to both zero mean and unit standard deviation.

StandardScaler applies scaled = (x - u) / s per column, where u is the column mean and s is the column's sample standard deviation.

Two main parameters can be set:

  • withStd: defaults to true. Scales the data to unit standard deviation.

  • withMean: defaults to false. Whether to center the data to zero mean. (Centering produces dense output, so it is not suitable for sparse input.)

StandardScaler must first be fit to the data to obtain each dimension's mean and standard deviation. It is an Estimator: fitting it on a dataset produces a StandardScalerModel that holds these summary statistics. The resulting model can then transform vectors to unit standard deviation and/or zero mean. Note that if a feature's standard deviation is zero, that feature's value in the returned vector defaults to 0.0.

    import org.apache.spark.ml.feature.StandardScaler
    import org.apache.spark.ml.linalg.Vectors

    val dataFrame = spark.createDataFrame(Seq(
      (0, Vectors.dense(1.0, 0.5, -1.0)),
      (1, Vectors.dense(2.0, 1.0, 1.0)),
      (2, Vectors.dense(4.0, 10.0, 2.0))
    )).toDF("id", "features")

    dataFrame.show
    +---+--------------+
    | id|      features|
    +---+--------------+
    |  0|[1.0,0.5,-1.0]|
    |  1| [2.0,1.0,1.0]|
    |  2|[4.0,10.0,2.0]|
    +---+--------------+

    // Unit standard deviation only (no centering)
    val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false)
    val scalerModel = scaler.fit(dataFrame)
    val scaledData = scalerModel.transform(dataFrame)
    scaledData.show

    +---+--------------+--------------------+
    | id|      features|      scaledFeatures|
    +---+--------------+--------------------+
    |  0|[1.0,0.5,-1.0]|[0.65465367070797...|
    |  1| [2.0,1.0,1.0]|[1.30930734141595...|
    |  2|[4.0,10.0,2.0]|[2.61861468283190...|
    +---+--------------+--------------------+

    scala> scaledData.rdd.foreach(println)
    [0,[1.0,0.5,-1.0],[0.6546536707079772,0.09352195295828246,-0.6546536707079771]]
    [1,[2.0,1.0,1.0],[1.3093073414159544,0.18704390591656492,0.6546536707079771]]
    [2,[4.0,10.0,2.0],[2.618614682831909,1.8704390591656492,1.3093073414159542]]

    // Zero mean and unit standard deviation
    val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(true)
    val scalerModel = scaler.fit(dataFrame)
    val scaledData = scalerModel.transform(dataFrame)
    scaledData.show

    scala> scaledData.rdd.foreach(println)
    [0,[1.0,0.5,-1.0],[-0.8728715609439697,-0.6234796863885498,-1.0910894511799618]]
    [1,[2.0,1.0,1.0],[-0.2182178902359925,-0.5299577334302673,0.2182178902359924]]
    [2,[4.0,10.0,2.0],[1.0910894511799618,1.1534374198188169,0.8728715609439696]]
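To see where the 0.6546... in the first column comes from, here is a quick sketch in plain Scala (no Spark required) that redoes the arithmetic: StandardScaler divides by the sample standard deviation, and subtracts the mean first only when withMean is true.

    // First feature column of the example above: 1.0, 2.0, 4.0
    val xs = Seq(1.0, 2.0, 4.0)
    val mean = xs.sum / xs.size                                                        // 2.333...
    val sampleStd = math.sqrt(xs.map(x => math.pow(x - mean, 2)).sum / (xs.size - 1))  // ~1.5275

    // withMean = false: divide by the sample standard deviation only
    println(xs.map(_ / sampleStd))                 // ~ List(0.6547, 1.3093, 2.6186)

    // withMean = true: subtract the mean first, then divide
    println(xs.map(x => (x - mean) / sampleStd))   // ~ List(-0.8729, -0.2182, 1.0911)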

1.6 Normalization (Normalizer) -- row-oriented

A norm is a strengthened notion of distance: its definition adds a scalar-multiplication rule on top of the axioms of a distance. It can often be understood informally as a distance.

  • The L1 norm is one we see frequently; it is the sum of the absolute values of the components: ||x||_1 = |x_1| + |x_2| + ... + |x_n|.

  • The L2 norm is the most common and most used; the Euclidean distance we rely on most often is an L2 norm: ||x||_2 = sqrt(x_1^2 + x_2^2 + ... + x_n^2).

  • The L-infinity norm measures the largest absolute value among the vector's elements: ||x||_inf = max_i |x_i|.

    import org.apache.spark.ml.feature.Normalizer

    // Normalize each row to unit L1 norm: each row is rescaled so that the sum of the
    // absolute values of its elements equals 1.
    val normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures").setP(1.0)
    val l1NormData = normalizer.transform(dataFrame)
    l1NormData.show()
    +---+--------------+------------------+
    | id|      features|      normFeatures|
    +---+--------------+------------------+
    |  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
    |  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
    |  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
    +---+--------------+------------------+

    // Normalize each row to the L-infinity norm, i.e. divide by the largest absolute value in the row.
    val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
    lInfNormData.show()

    +---+--------------+--------------+
    | id|      features|  normFeatures|
    +---+--------------+--------------+
    |  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
    |  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
    |  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
    +---+--------------+--------------+
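Normalizer's default p is 2.0, i.e. the Euclidean (L2) norm. A small sketch that applies L2 normalization to the same dataFrame and checks the first row by hand:

    import org.apache.spark.ml.feature.Normalizer

    // p = 2.0 is the default: each row is divided by its Euclidean length
    val l2Normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures").setP(2.0)
    l2Normalizer.transform(dataFrame).show(false)

    // Manual check for the first row (1.0, 0.5, -1.0)
    val row = Seq(1.0, 0.5, -1.0)
    val l2 = math.sqrt(row.map(x => x * x).sum)   // sqrt(2.25) = 1.5
    println(row.map(_ / l2))                      // List(0.666..., 0.333..., -0.666...)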

1.7 Min-Max Scaling (MinMaxScaler) -- column-oriented: scaled = ((value - E_min) / (E_max - E_min)) * (max - min) + min

E_min is the minimum of each column and E_max is the maximum of each column. MinMaxScaler also works column by column, i.e., per feature dimension, linearly mapping each dimension into a specified range, typically [0, 1]. MinMaxScaler computes summary statistics over the dataset and produces a MinMaxScalerModel. Note that because zero values may become non-zero after the transformation, the output may be dense even for sparse input. The model maps each feature's values into the specified range, and it has two parameters:

  • min: defaults to 0. The lower bound of the target range.
  • max: defaults to 1. The upper bound of the target range.

    import org.apache.spark.ml.feature.MinMaxScaler
    import org.apache.spark.ml.linalg.Vectors

    val dataFrame = spark.createDataFrame(Seq(
      (0, Vectors.dense(1.0, 0.5, -1.0)),
      (1, Vectors.dense(2.0, 1.0, 1.0)),
      (2, Vectors.dense(4.0, 10.0, 2.0))
    )).toDF("id", "features")

    val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaledFeatures")

    // Compute summary statistics and generate MinMaxScalerModel
    val scalerModel = scaler.fit(dataFrame)

    // Rescale each feature to the range [min, max]
    val scaledData = scalerModel.transform(dataFrame)
    scaledData.select("features", "scaledFeatures").show

    // Each dimension is mapped linearly: the column minimum maps to 0 and the column maximum maps to 1.
    +--------------+------------------------------------------------------------+
    |features      |scaledFeatures                                              |
    +--------------+------------------------------------------------------------+
    |[1.0,0.5,-1.0]|[0.0,0.0,0.0]                                               |
    |[2.0,1.0,1.0] |[0.3333333333333333,0.05263157894736842,0.6666666666666666]|
    |[4.0,10.0,2.0]|[1.0,1.0,1.0]                                               |
    +--------------+------------------------------------------------------------+
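As a quick check of the formula in the heading, the arithmetic for the first feature of the second row (value 2.0, column minimum 1.0, column maximum 4.0, default target range [0, 1]) works out as follows:

    // scaled = (value - E_min) / (E_max - E_min) * (max - min) + min
    val value = 2.0
    val (eMin, eMax) = (1.0, 4.0)   // column minimum and maximum of the first feature
    val (lo, hi) = (0.0, 1.0)       // default target range [min, max]

    val scaled = (value - eMin) / (eMax - eMin) * (hi - lo) + lo
    println(scaled)                 // 0.3333..., matching the scaledFeatures output above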

1.8 Max-Absolute Scaling (MaxAbsScaler) -- column-oriented: each value divided by the column's maximum absolute value

MaxAbsScaler transforms each feature dimension into the closed interval [-1, 1] by dividing by the maximum absolute value in that dimension. Because it does not shift or center the distribution, it preserves the sparsity of the original feature vectors.

    import org.apache.spark.ml.feature.MaxAbsScaler

    val scaler = new MaxAbsScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")

    val scalerModel = scaler.fit(dataFrame)

    // Rescale each feature to the range [-1, 1]
    val scaledData = scalerModel.transform(dataFrame)
    scaledData.select("features", "scaledFeatures").show()

    // The maximum absolute value of each dimension is [4, 10, 2]
    +--------------+----------------+
    |      features|  scaledFeatures|
    +--------------+----------------+
    |[1.0,0.5,-1.0]|[0.25,0.05,-0.5]|
    | [2.0,1.0,1.0]|   [0.5,0.1,0.5]|
    |[4.0,10.0,2.0]|   [1.0,1.0,1.0]|
    +--------------+----------------+

1.9 One-Hot Encoding (OneHotEncoderEstimator) -- column-oriented

Each column's possible category values are encoded as a vector: for example, category 0 is encoded as (1, 0), i.e., a single value becomes a vector.

    import org.apache.spark.ml.feature.OneHotEncoderEstimator

    val df = spark.createDataFrame(Seq(
      (0.0, 1.0),
      (1.0, 0.0),
      (2.0, 1.0),
      (0.0, 2.0),
      (0.0, 1.0),
      (2.0, 0.0)
    )).toDF("categoryIndex1", "categoryIndex2")

    val encoder = new OneHotEncoderEstimator().setInputCols(Array("categoryIndex1", "categoryIndex2")).setOutputCols(Array("categoryVec1", "categoryVec2"))
    val model = encoder.fit(df)
    val encoded = model.transform(df)
    encoded.show()

    Result (the last category is dropped by default, so the vectors have length 2):
    +--------------+--------------+-------------+-------------+
    |categoryIndex1|categoryIndex2| categoryVec1| categoryVec2|
    +--------------+--------------+-------------+-------------+
    |           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
    |           1.0|           0.0|(2,[1],[1.0])|(2,[0],[1.0])|
    |           2.0|           1.0|    (2,[],[])|(2,[1],[1.0])|
    |           0.0|           2.0|(2,[0],[1.0])|    (2,[],[])|
    |           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
    |           2.0|           0.0|    (2,[],[])|(2,[0],[1.0])|
    +--------------+--------------+-------------+-------------+

    With setDropLast(false), every category value of each column gets its own slot: for example, 0 in the first row is encoded as (1, 0, 0).

    val encoder = new OneHotEncoderEstimator().setInputCols(Array("categoryIndex1", "categoryIndex2")).setOutputCols(Array("categoryVec1", "categoryVec2")).setDropLast(false)
    val model = encoder.fit(df)
    val encoded = model.transform(df)
    encoded.show()

    +--------------+--------------+-------------+-------------+
    |categoryIndex1|categoryIndex2| categoryVec1| categoryVec2|
    +--------------+--------------+-------------+-------------+
    |           0.0|           1.0|(3,[0],[1.0])|(3,[1],[1.0])|
    |           1.0|           0.0|(3,[1],[1.0])|(3,[0],[1.0])|
    |           2.0|           1.0|(3,[2],[1.0])|(3,[1],[1.0])|
    |           0.0|           2.0|(3,[0],[1.0])|(3,[2],[1.0])|
    |           0.0|           1.0|(3,[0],[1.0])|(3,[1],[1.0])|
    |           2.0|           0.0|(3,[2],[1.0])|(3,[0],[1.0])|
    +--------------+--------------+-------------+-------------+
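The (size, [indices], [values]) entries in the output are Spark's sparse-vector notation; as a small sketch, expanding a few of them to dense arrays shows the familiar one-hot vectors:

    import org.apache.spark.ml.linalg.Vectors

    // (2,[0],[1.0]) means: length 2, with 1.0 at position 0 and 0.0 everywhere else
    println(Vectors.sparse(2, Array(0), Array(1.0)).toArray.mkString("[", ",", "]"))                   // [1.0,0.0]

    // (2,[],[]) is the all-zero vector that the dropped last category maps to
    println(Vectors.sparse(2, Array.empty[Int], Array.empty[Double]).toArray.mkString("[", ",", "]"))  // [0.0,0.0]

    // With setDropLast(false), category 0 of a three-category column becomes (1, 0, 0)
    println(Vectors.sparse(3, Array(0), Array(1.0)).toArray.mkString("[", ",", "]"))                   // [1.0,0.0,0.0]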

1.10 String-to-Index Transformation (StringIndexer)

StringIndexer assigns label indices by frequency of occurrence: the most frequent string gets index 0, so the indices run 0, 1, 2, ... However, if the test data contains a value that did not appear when the model was fit, an error is thrown; in that case set setHandleInvalid("skip") to skip such rows.

    import org.apache.spark.ml.feature.StringIndexer

    val df = spark.createDataFrame(
      Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
    ).toDF("id", "category")

    val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")
    val model = indexer.fit(df)
    val indexed = model.transform(df)

    scala> indexed.show
    +---+--------+-------------+
    | id|category|categoryIndex|
    +---+--------+-------------+
    |  0|       a|          0.0|
    |  1|       b|          2.0|
    |  2|       c|          1.0|
    |  3|       a|          0.0|
    |  4|       a|          0.0|
    |  5|       c|          1.0|
    +---+--------+-------------+

    // "d" never appears in the fitted data, so transforming it would fail unless setHandleInvalid is set
    val df_test = spark.createDataFrame(
      Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"), (5, "d"))
    ).toDF("id", "category")
    val indexer_test = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex").setHandleInvalid("skip")
    val model = indexer_test.fit(df)
    val indexed_test = model.transform(df_test)
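For completeness, a hedged sketch of the handleInvalid options: "skip" drops rows with unseen labels, "error" (the default) throws, and "keep" (available in newer Spark versions) maps unseen labels to an extra index equal to the number of known labels.

    // With "skip", the row containing the unseen label "d" is simply dropped
    indexed_test.show()

    // With "keep" (assumption: supported by the Spark version in use), "d" would be
    // assigned the extra index 3.0, since the fitted model knows 3 labels (a, c, b)
    val keepIndexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex").setHandleInvalid("keep")
    keepIndexer.fit(df).transform(df_test).show()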

Conclusion

Today my CSDN technical column passed 50 articles, which is worth celebrating!

Qin Kaixin, Shenzhen, 2018-11-18 01:47
