初识机器学习开发模块Spark MLlib的使用与优化
废话刷新闻偶尔会看到一些人工智能与机器学习的新闻感觉很高大尚的样子,就想去了解一下了解完之后,发现这是真的高科技,非常高大尚自己现在的实力是铁定搞不了了的只能去玩一玩大佬们包装好的相关工具了机器学习机器学习可以理解成是生产算法的算法。需要人来先做特征提取,然后在把特征向量化后交给机器去训练。机器学习的分类传统机器学习分为 监督学习 和 无监督学习。Spark中使用MLlib中...
·
废话
刷新闻偶尔会看到一些人工智能与机器学习的新闻
感觉很高大尚的样子,就想去了解一下
了解完之后,发现这是真的高科技,非常高大尚
自己现在的实力是铁定搞不了了的
只能去玩一玩大佬们包装好的相关工具了
机器学习
机器学习可以理解成是生产算法的算法。
需要人来先做特征提取,然后在把特征向量化后交给机器去训练。
机器学习的分类
传统机器学习分为 监督学习 和 无监督学习。
Spark中使用MLlib中的决策树算法预测分析数据
分析数据来源与天池大会里很早的一个比赛数据,感兴趣的小伙伴可以去下载
https://tianchi.aliyun.com/competition/entrance/231591/introduction
Spark中使用MLlib会涉及到下面两个概念
(1)贷出函数
资源管理(创建SparkSession实例对象、关闭SparkSession对象)
(2)用户函数
真正业务逻辑实现的地方
实现思路
- 读取经过处理的数据(比赛数据很大,只需要挑选一部分出来,测试运行即可)
- 定义三大主函数,main函数,贷出函数,用户函数
- main函数负责调用贷出函数
- 贷出函数负责资源管理
- 用户函数负责实现业务逻辑
- 读取出数据之后,将数据转换为LabeledPoint类型
- 划分转换好的数据,将其分为训练数据与测试数据,训练数据越多越好
- 使用决策树算法分析训练数据得到模型
- 使用模型进行预测,与测试数据进行比较
- 评估模型预测性能,通过下面几个指标预测
- MSE:均方误差;RMSE:均方根误差;MAE平均绝对误差
- 保存模型,待下次调优
- 读取模型,重复9到13,直到结果满意
代码实现
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.{DecisionTree, RandomForest}
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, RandomForestModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
/**
* Spark MLlib机器学习开发模块
* 贷出设计模式(2类函数)
* (1)贷出函数
* 资源管理(创建SparkSession实例对象、关闭SparkSession对象)
* (2)用户函数
* 真正业务逻辑实现的地方
*
*/
object IJCAISparkPrecision {
/**
* 贷出模式:贷出函数
* @param args
* 参数
* @param operation
* 用户函数
*/
def sparkOperation(args: Array[String])(operation:SparkSession=>Unit):Unit={
if(args.length != 2){
println("Usage:SparkMLlibTemplateSpark <appName> <master>")
System.exit(1)
}
val spark = SparkSession
.builder()
.appName(args(0))
.master(args(1))
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
try {
//调用用户函数
operation(spark)
}catch {
case e:Exception=>e.printStackTrace()
}finally {
spark.stop()
}
}
/**
* 贷出模式中用户函数
* 针对于机器学习来讲,数据预处理,实训模型,评估模型和测试模型
* @param spark
*/
def modelTrain(spark:SparkSession):Unit= {
//读取数据
val userVisitPayRDD: RDD[Row] = spark.read
.option("header","true")
.csv("file:///D:\\IJCAI\\train_user_visit_pay")
.select("day_week","shop_id","count_visit","count_pay")
.rdd
//转换数据
val lpRDD: RDD[LabeledPoint] = userVisitPayRDD.map{
case Row(day_week:String,shop_id:String,count_visit:String,count_pay:String)=>{
//标签
val label: Double = count_pay.toDouble
//特征向量
val features: Vector = Vectors.dense(
Array(day_week,shop_id,count_visit).map(_.toDouble)
)
//返回标签向量
LabeledPoint(label,features)
}
}
//偷懒,将数据划分为2份,一份用来训练,一份用来测试
val Array(traningRDD,testingRDD) = lpRDD.randomSplit(Array(0.8,0.2),123L)
/**
* 使用决策树算法预测
* def trainRegressor(
* input: RDD[LabeledPoint],
* categoricalFeaturesInfo: Map[Int, Int],
* An entry (n to k)ndicates that feature n is categorical with k categories
* n:表示的是 那个特征值是类别数据,写下标,从0 开始
* k:表示特征性类别个数,星期:7
* impurity: String,
* 计算特征值重要性指标,此处是回归,写死:variance
* maxDepth: Int,
* 最大数的深度
* maxBins: Int
* 树的最大分裂区间数
* ): DecisionTreeModel = {
*/
val dtModel: DecisionTreeModel = DecisionTree.trainRegressor(
traningRDD,
Map[Int, Int](0->7),
"variance",
16,
32
)
//使用模型进行预测--得到预测值和实际值的比较
val actualAndPredictRDD: RDD[(Double, Double)] = testingRDD.map{
case LabeledPoint(label,features)=>{
val predictValue = dtModel.predict(features)
(label,predictValue)
}
}
actualAndPredictRDD.take(20).foreach(println)
//评估模型预测性能
def modelEvaluate(apRDD: RDD[(Double, Double)]):Unit = {
//总数
val count = apRDD.count().toDouble
//MSE:均方误差
val mseValue = apRDD
.map{case(actual,predict)=>Math.pow(actual-predict,2)}
.sum()/count
//RMSE:均方根误差
val rmseValue = Math.sqrt(mseValue)
//MAE平均绝对误差
val maeValue = apRDD
.map{case(actual,predict)=>Math.abs(actual-predict)}
.sum()/count
println(s"MSE:$mseValue,RMSE:${rmseValue},MAE:${maeValue}")
}
//评估:决策树回归模型性能
modelEvaluate(actualAndPredictRDD)
//如果模型比较好,相对满意,保存模型
modelEvaluate.save(spark.sparkContext,"file:///D:\\IJCAI\\modelEvaluate")
//加载模型,预测使用
val loadRFModel: RandomForestModel = RandomForestModel.load(spark.sparkContext,"file:///D:\\IJCAI\\modelEvaluate")
val features: Vector = Vectors.dense(
Array("2","1241","1").map(_.toDouble)
)
println(s"Actual=51,Predict=${loadRFModel.predict(features)}")
}
//运行主函数
def main(args: Array[String]): Unit = {
sparkOperation(args)(modelTrain)
}
}
代码优化
模型性能调优 ,2点考虑
1.训练数据
(a)数据量越大越好,
(b)特征值处理
正则化、归一化处理
特征性进行加权处理
增加和减少特征值
2.算法参数(超参数)
找到合适的超参数值
运行结果
虽然误差较大,但还是测试成功了
更多推荐
已为社区贡献6条内容
所有评论(0)