91精品国产综合久久四虎久久_国产成人午夜高潮毛片_99er视频精品免费观看_2020亚洲熟女在线观看_日本女优人体写真_国内黄色毛片_年轻的老师中文版在线_丰满女邻居做爰_久久久久久精品成人免费图片

擴(kuò)展Spark ML來(lái)構(gòu)建你自己的模型和變換器類(lèi)型
如何從wordcount作為例子開(kāi)始入手(你也許以為你不用再看到wordcount例子了)。
編者注:你可以在Holden Karau和Rachel Warren合作的《高性能Spark:擴(kuò)展和優(yōu)化Apache Spark的最佳實(shí)踐》一文中了解更多的相關(guān)內(nèi)容。

更多內(nèi)容可以參考Strata北京2017的相關(guān)議題。

盡管Spark ML管道提供了各種各樣的算法,你仍可能想要額外的功能,并且不脫離管道模型。在Spark Mllib中,這算不上什么問(wèn)題,你可以通過(guò)RDD的變換來(lái)實(shí)現(xiàn)你自己的算法,并繼續(xù)下去。對(duì)于Spark ML 管道來(lái)說(shuō),同樣的方法是可行的,但是我們會(huì)失去一些管道所具備的優(yōu)良特性,包括自動(dòng)執(zhí)行元算法的能力,例如交叉驗(yàn)證的參數(shù)搜索。在本文中,你會(huì)從標(biāo)準(zhǔn)的wordcount例子入手(在大數(shù)據(jù)方面,你是不可能真正躲開(kāi)wordcount例子的),了解到如何擴(kuò)展Spark ML 管道模型。

為了將你自己的算法加入Spark管道中來(lái),你需要實(shí)現(xiàn)Estimator或者是Transformer,它們都實(shí)現(xiàn)了PipelineStage接口。對(duì)于那些不需要訓(xùn)練的算法,你可以實(shí)現(xiàn)Transformer接口,而對(duì)于那些需要訓(xùn)練的算法,你需要實(shí)現(xiàn)Estimator接口,它們都定義在org.apache.spark.ml下(都實(shí)現(xiàn)了基類(lèi)?PipelineStage)。要說(shuō)明的是,訓(xùn)練并不是只限于復(fù)雜的機(jī)器學(xué)習(xí)模型,即使是最大最小值區(qū)間縮放器也需要訓(xùn)練來(lái)確定范圍。如果你的算法需要訓(xùn)練,它們必須以Estimator來(lái)構(gòu)建而不是Transformer。

注:直接使用PipelineStage是不可行的,因?yàn)楣艿纼?nèi)部使用了反射機(jī)制,假定了所有的管道stage要么是一個(gè)Estimator,要么就是Transformer。

除了顯而易見(jiàn)的transform和fit方法,所有的管道的stage需要提供transformSchema,以及一個(gè)copy構(gòu)造器或者實(shí)現(xiàn)一個(gè)可以為你提供這些功能的類(lèi)。copy是用來(lái)制作一個(gè)當(dāng)前stage的拷貝,合并入任何新指定的參數(shù),可以簡(jiǎn)稱為defaultCopy(除非你的類(lèi)對(duì)構(gòu)造器有特別的考慮)。

class HardCodedWordCountStage(override val uid: String) extends Transformer {

def this() = this(Identifiable.randomUID(“hardcodedwordcount”))

def copy(extra: ParamMap): HardCodedWordCountStage = {

defaultCopy(extra)

}

一個(gè)管道stage的起始以及拷貝代理如下:transformSchema?必須基于任何參數(shù)和一個(gè)輸入模式產(chǎn)生你的管道stage的期望輸出??紤]到已有字段可能會(huì)被使用到,大部分管道stage只增加新的字段,很少的一些會(huì)去掉之前的一些字段。這有時(shí)候會(huì)導(dǎo)致輸出的結(jié)果包含比下游所需的數(shù)據(jù)多,反而會(huì)降低性能。如果發(fā)現(xiàn)你的管道中有這樣的問(wèn)題,那么你可以創(chuàng)建你自己的stage來(lái)去掉不需要的字段。

除了產(chǎn)生輸出模式之外,transformSchema 方法還應(yīng)該驗(yàn)證輸入模式是否適合于該stage(例如,輸入列是否是期望的類(lèi)型)。

這里也是你應(yīng)該對(duì)stage的參數(shù)進(jìn)行驗(yàn)證的地方。一個(gè)簡(jiǎn)單的輸入為字符串輸出為向量的并且寫(xiě)死編碼的輸出和輸入列的transformSchema如下所示:

override def transformSchema(schema: StructType): StructType = {

// Check that the input type is a string

val idx = schema.fieldIndex(“happy_pandas”)

val field = schema.fields(idx)

if (field.dataType != StringType) {

throw new Exception(s”Input type ${field.dataType} did not match input type StringType”)

}

// Add the return field

schema.add(StructField(“happy_panda_counts”, IntegerType, false))

}

不需要訓(xùn)練的算法可以通過(guò)Transformer接口非常容易地實(shí)現(xiàn)。由于這是最簡(jiǎn)單的管道stage,你可以從實(shí)現(xiàn)一個(gè)簡(jiǎn)單的transformer開(kāi)始,計(jì)算在輸入列中單詞的數(shù)量。

def transform(df: Dataset[_]): DataFrame = {

val wordcount = udf { in: String => in.split(” “).size }

df.select(col(“*”),

wordcount(df.col(“happy_pandas”)).as(“happy_panda_counts”))

}

為了獲得大部分的管道接口,你可能會(huì)想要使你的管道stage可以通過(guò)參數(shù)接口來(lái)達(dá)到可配置化。

盡管參數(shù)接口是公開(kāi)的,不幸的是,常用的Spark中的默認(rèn)參數(shù)都是私有的,所以你最后不得不寫(xiě)大段重復(fù)的代碼。除了允許用戶指定的值,參數(shù)也可以包含一些基本的驗(yàn)證邏輯(例如,正則化的參數(shù)必須是一個(gè)非負(fù)值)。兩個(gè)最常用的參數(shù)是輸入列和輸出列,可以十分簡(jiǎn)單地加到你的模型上去。

除了字符串參數(shù),其他的類(lèi)型也可以使用。包括字符串列表來(lái)接收停止詞,或浮點(diǎn)數(shù)來(lái)接收停止詞。

class ConfigurableWordCount(override val uid: String) extends Transformer {

final val inputCol= new Param[String](this, “inputCol”, “The input column”)

final val outputCol = new Param[String](this, “outputCol”, “The output column”)

; def setInputCol(value: String): this.type = set(inputCol, value)

def setOutputCol(value: String): this.type = set(outputCol, value)

def this() = this(Identifiable.randomUID(“configurablewordcount”))

def copy(extra: ParamMap): HardCodedWordCountStage = {

defaultCopy(extra)

}

override def transformSchema(schema: StructType): StructType = {

// Check that the input type is a string

val idx = schema.fieldIndex($(inputCol))

val field = schema.fields(idx)

if (field.dataType != StringType) {

throw new Exception(s”Input type ${field.dataType} did not match input type StringType”)

}

// Add the return field

schema.add(StructField($(outputCol), IntegerType, false))

}

def transform(df: Dataset[_]): DataFrame = {

val wordcount = udf { in: String => in.split(” “).size }

df.select(col(“*”), wordcount(df.col($(inputCol))).as($(outputCol)))

}

}

不需要訓(xùn)練的算法可以通過(guò)Estimator接口來(lái)實(shí)現(xiàn),盡管對(duì)于許多算法而言,?org.apache.spark.ml.Predictor?或者?org.apache.spark.ml.classificationClassifier?這些幫助類(lèi)更容易實(shí)現(xiàn)。Estimator?和?Transformer接口的主要區(qū)別是,它不再直接在輸入上進(jìn)行變換操作,而是會(huì)首先在一個(gè)train 方法里面進(jìn)行一個(gè)步驟——訓(xùn)練。一個(gè)字符串索引器是你可以實(shí)現(xiàn)的最簡(jiǎn)單的estimator之一。盡管在Spark中可以直接使用了,它仍然是用于說(shuō)明如何使用estimator接口的非常好的例子。

trait SimpleIndexerParams extends Params {

final val inputCol= new Param[String](this, “inputCol”, “The input column”)

final val outputCol = new Param[String](this, “outputCol”, “The output column”)

}

class SimpleIndexer(override val uid: String) extends Estimator[SimpleIndexerModel] with SimpleIndexerParams {

def setInputCol(value: String) = set(inputCol, value)

def setOutputCol(value: String) = set(outputCol, value)

def this() = this(Identifiable.randomUID(“simpleindexer”))

override def copy(extra: ParamMap): SimpleIndexer = {

defaultCopy(extra)

}

override def transformSchema(schema: StructType): StructType = {

// Check that the input type is a string

val idx = schema.fieldIndex($(inputCol))

val field = schema.fields(idx)

if (field.dataType != StringType) {

throw new Exception(s”Input type ${field.dataType} did not match input type StringType”)

}

// Add the return field

schema.add(StructField($(outputCol), IntegerType, false))

}

override def fit(dataset: Dataset[_]): SimpleIndexerModel = {

import dataset.sparkSession.implicits._

val words = dataset.select(dataset($(inputCol)).as[String]).distinct

.collect()

new SimpleIndexerModel(uid, words)

; }

}

class SimpleIndexerModel(

override val uid: String, words: Array[String]) extends Model[SimpleIndexerModel] with SimpleIndexerParams {

override def copy(extra: ParamMap): SimpleIndexerModel = {

defaultCopy(extra)

}

private val labelToIndex: Map[String, Double] = words.zipWithIndex.

map{case (x, y) => (x, y.toDouble)}.toMap

override def transformSchema(schema: StructType): StructType = {

// Check that the input type is a string

val idx = schema.fieldIndex($(inputCol))

val field = schema.fields(idx)

if (field.dataType != StringType) {

throw new Exception(s”Input type ${field.dataType} did not match input type StringType”)

}

// Add the return field

schema.add(StructField($(outputCol), IntegerType, false))

}

override def transform(dataset: Dataset[_]): DataFrame = {

val indexer = udf { label: String => labelToIndex(label) }

dataset.select(col(“*”),

indexer(dataset($(inputCol)).cast(StringType)).as($(outputCol)))

}

}

如果你正在實(shí)現(xiàn)一個(gè)迭代算法,你可能希望將還沒(méi)有緩存的輸入數(shù)據(jù)緩存起來(lái),或者允許用戶來(lái)指定一個(gè)持久化等級(jí)。

Predictor 接口增加了兩個(gè)最常用的參數(shù)(輸入和輸出列)作為標(biāo)記列、特征列和預(yù)測(cè)列——并且自動(dòng)地幫我們處理模式的變換。

Classifier 接口基本上如出一轍,除了它還增加了一個(gè)rawPredictionColumn ,并且提供了工具來(lái)檢測(cè)類(lèi)別的數(shù)量(getNumClasses方法)以及將輸入的?DataFrame?轉(zhuǎn)化為一個(gè)LabeledPoints的RDD(使其更容易來(lái)封裝傳統(tǒng)的Mllib分類(lèi)算法)。

如果你正在實(shí)現(xiàn)一個(gè)回歸或者聚類(lèi)接口,目前沒(méi)有公開(kāi)的基本接口可以使用,所以你需要使用通用的Estimator接口。

// Simple Bernouli Naive Bayes classifier – no sanity checks for brevity

// Example only – not for production use.

class SimpleNaiveBayes(val uid: String)

extends Classifier[Vector, SimpleNaiveBayes, SimpleNaiveBayesModel] {

def this() = this(Identifiable.randomUID(“simple-naive-bayes”))

override def train(ds: Dataset[_]): SimpleNaiveBayesModel = {

import ds.sparkSession.implicits._

ds.cache()

// Note: you can use getNumClasses and extractLabeledPoints to get an RDD instead

// Using the RDD approach is common when integrating with legacy machine learning code

// or iterative algorithms which can create large query plans.

// Here we use Datasets since neither of those apply.

// Compute the number of documents

val numDocs = ds.count

// Get the number of classes.

// Note this estimator assumes they start at 0 and go to numClasses

val numClasses = getNumClasses(ds)

// Get the number of features by peaking at the first row

val numFeatures: Integer = ds.select(col($(featuresCol))).head

.get(0).asInstanceOf[Vector].size

// Determine the number of records for each class

val groupedByLabel = ds.select(col($(labelCol)).as[Double]).groupByKey(x => x)

val classCounts = groupedByLabel.agg(count(“*”).as[Long])

.sort(col(“value”)).collect().toMap

// Select the labels and features so we can more easily map over them.

// Note: we do this as a DataFrame using the untyped API because the Vector

// UDT is no longer public.

val df = ds.select(col($(labelCol)).cast(DoubleType), col($(featuresCol)))

// Figure out the non-zero frequency of each feature for each label and

// output label index pairs using a case clas to make it easier to work with.

val labelCounts: Dataset[LabeledToken] = df.flatMap {

case Row(label: Double, features: Vector) =>

features.toArray.zip(Stream from 1)

.filter{vIdx => vIdx._2 == 1.0}

.map{case (v, idx) => LabeledToken(label, idx)}

}

// Use the typed Dataset aggregation API to count the number of non-zero

// features for each label-feature index.

val aggregatedCounts: Array[((Double, Integer), Long)] = labelCounts

.groupByKey(x => (x.label, x.index))

.agg(count(“*”).as[Long]).collect()

val theta = Array.fill(numClasses)(new Array[Double](numFeatures))

// Compute the denominator for the general prioirs

val piLogDenom = math.log(numDocs + numClasses)

// Compute the priors for each class

val pi = classCounts.map{case(_, cc) =>

math.log(cc.toDouble) – piLogDenom }.toArray

// For each label/feature update the probabilities

aggregatedCounts.foreach{case ((label, featureIndex), count) =>

// log of number of documents for this label + 2.0 (smoothing)

val thetaLogDenom = math.log(

classCounts.get(label).map(_.toDouble).getOrElse(0.0) + 2.0)

theta(label.toInt)(featureIndex) = math.log(count + 1.0) – thetaLogDenom

}

// Unpersist now that we are done computing everything

ds.unpersist()

// Construct a model

new SimpleNaiveBayesModel(uid, numClasses, numFeatures, Vectors.dense(pi),

new DenseMatrix(numClasses, theta(0).length, theta.flatten, true))

}

override def copy(extra: ParamMap) = {

defaultCopy(extra)

}

}

// Simplified Naive Bayes Model

case class SimpleNaiveBayesModel(

override val uid: String,

override val numClasses: Int,

override val numFeatures: Int,

val pi: Vector,

val theta: DenseMatrix) extends

ClassificationModel[Vector, SimpleNaiveBayesModel] {

override def copy(extra: ParamMap) = {

defaultCopy(extra)

}

// We have to do some tricks here because we are using Spark’s

// Vector/DenseMatrix calculations – but for your own model don’t feel

// limited to Spark’s native ones.

val negThetaArray = theta.values.map(v => math.log(1.0 – math.exp(v)))

val negTheta = new DenseMatrix(numClasses, numFeatures, negThetaArray, true)

val thetaMinusNegThetaArray = theta.values.zip(negThetaArray)

.map{case (v, nv) => v – nv}

val thetaMinusNegTheta = new DenseMatrix(

numClasses, numFeatures, thetaMinusNegThetaArray, true)

val onesVec = Vectors.dense(Array.fill(theta.numCols)(1.0))

val negThetaSum: Array[Double] = negTheta.multiply(onesVec).toArray

// Here is the prediciton functionality you need to implement – for ClassificationModels

// transform automatically wraps this – but if you might benefit from broadcasting your model or

// other optimizations you can also override transform.

def predictRaw(features: Vector): Vector = {

// Toy implementation – use BLAS or similar instead

// the summing of the three vectors but the functionality isn’t exposed.

Vectors.dense(thetaMinusNegTheta.multiply(features).toArray.zip(pi.toArray)

.map{case (x, y) => x + y}.zip(negThetaSum).map{case (x, y) => x + y}

)

}

}

注:如果你只是需要修改一個(gè)已有的算法,你可以(通過(guò)假裝在org.apache.spark項(xiàng)目中來(lái))擴(kuò)展它。

現(xiàn)在你知道如何用你自己的管道stage來(lái)擴(kuò)展Spark的ML管道API。如果你找不到頭緒,一個(gè)好的參考是Spark本身內(nèi)部的算法。盡管有時(shí)候使用了內(nèi)部的API,但是大部分情況下它們實(shí)現(xiàn)公開(kāi)接口的方式與你想要做的是同樣的。

Holden Karau

Holden Karau是一個(gè)加拿大籍跨性別者和一位積極的開(kāi)源軟件貢獻(xiàn)者。當(dāng)不在舊金山的IBM Spark技術(shù)中心作為一位軟件工程師工作期間,Holden會(huì)在全球宣講Spark,并在家里和咖啡館里提供Spark的技術(shù)指導(dǎo)時(shí)間。她經(jīng)常為Spark貢獻(xiàn)代碼,專注于PySpark和機(jī)器學(xué)習(xí)部分。在加入IBM之前,她在Alpine、Databrick、谷歌、Foursquare和亞馬遜等公司參與了許多與分布式查詢和分類(lèi)相關(guān)問(wèn)題的工作。她于滑鐵盧大學(xué)獲得計(jì)算機(jī)專業(yè)的數(shù)學(xué)學(xué)士學(xué)位。除了軟件開(kāi)發(fā)之外,她還喜歡玩火、焊接、玩踏板車(chē)、肉汁乳酪薯?xiàng)l和跳舞。

Cafe lights. (source: Pixabay).