废话

刷新闻偶尔会看到一些人工智能与机器学习的新闻
感觉很高大尚的样子,就想去了解一下
了解完之后,发现这是真的高科技,非常高大尚
自己现在的实力是铁定搞不了了的
只能去玩一玩大佬们包装好的相关工具了

机器学习

机器学习可以理解成是生产算法的算法。
需要人来先做特征提取,然后在把特征向量化后交给机器去训练。

机器学习的分类

传统机器学习分为 监督学习 和 无监督学习。

Spark中使用MLlib中的决策树算法预测分析数据

分析数据来源与天池大会里很早的一个比赛数据,感兴趣的小伙伴可以去下载
https://tianchi.aliyun.com/competition/entrance/231591/introduction
Spark中使用MLlib会涉及到下面两个概念

(1)贷出函数

资源管理(创建SparkSession实例对象、关闭SparkSession对象)

(2)用户函数

真正业务逻辑实现的地方

实现思路

  1. 读取经过处理的数据(比赛数据很大,只需要挑选一部分出来,测试运行即可)
  2. 定义三大主函数,main函数,贷出函数,用户函数
  3. main函数负责调用贷出函数
  4. 贷出函数负责资源管理
  5. 用户函数负责实现业务逻辑
  6. 读取出数据之后,将数据转换为LabeledPoint类型
  7. 划分转换好的数据,将其分为训练数据与测试数据,训练数据越多越好
  8. 使用决策树算法分析训练数据得到模型
  9. 使用模型进行预测,与测试数据进行比较
  10. 评估模型预测性能,通过下面几个指标预测
  11. MSE:均方误差;RMSE:均方根误差;MAE平均绝对误差
  12. 保存模型,待下次调优
  13. 读取模型,重复9到13,直到结果满意

代码实现

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.{DecisionTree, RandomForest}
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, RandomForestModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Spark MLlib机器学习开发模块
  *   贷出设计模式(2类函数)
  *     (1)贷出函数
  *       资源管理(创建SparkSession实例对象、关闭SparkSession对象)
  *     (2)用户函数
  *         真正业务逻辑实现的地方
  *
  */
object IJCAISparkPrecision {

  /**
    * 贷出模式:贷出函数
    * @param args
    *         参数
    * @param operation
    *          用户函数
    */
  def sparkOperation(args: Array[String])(operation:SparkSession=>Unit):Unit={
    if(args.length != 2){
      println("Usage:SparkMLlibTemplateSpark <appName> <master>")
      System.exit(1)
    }
    val spark = SparkSession
      .builder()
      .appName(args(0))
      .master(args(1))
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    try {
      //调用用户函数
      operation(spark)
    }catch {
      case  e:Exception=>e.printStackTrace()
    }finally {
      spark.stop()
    }
  }

  /**
    * 贷出模式中用户函数
    *     针对于机器学习来讲,数据预处理,实训模型,评估模型和测试模型
    * @param spark
    */
  def modelTrain(spark:SparkSession):Unit= {
    //读取数据
    val userVisitPayRDD: RDD[Row] = spark.read
      .option("header","true")
      .csv("file:///D:\\IJCAI\\train_user_visit_pay")
      .select("day_week","shop_id","count_visit","count_pay")
      .rdd

    //转换数据
    val lpRDD: RDD[LabeledPoint] = userVisitPayRDD.map{
      case Row(day_week:String,shop_id:String,count_visit:String,count_pay:String)=>{
        //标签
        val label: Double = count_pay.toDouble
        //特征向量
        val features: Vector = Vectors.dense(
          Array(day_week,shop_id,count_visit).map(_.toDouble)
        )
        //返回标签向量
        LabeledPoint(label,features)
      }
    }

    //偷懒,将数据划分为2份,一份用来训练,一份用来测试
    val Array(traningRDD,testingRDD) = lpRDD.randomSplit(Array(0.8,0.2),123L)

    /**
      * 使用决策树算法预测
      * def trainRegressor(
      * input: RDD[LabeledPoint],
      * categoricalFeaturesInfo: Map[Int, Int],
      *   An entry (n to k)ndicates that feature n is categorical with k categories
      *     n:表示的是  那个特征值是类别数据,写下标,从0 开始
      *     k:表示特征性类别个数,星期:7
      * impurity: String,
      *     计算特征值重要性指标,此处是回归,写死:variance
      * maxDepth: Int,
      *   最大数的深度
      * maxBins: Int
      *   树的最大分裂区间数
      * ): DecisionTreeModel = {
      */
    val dtModel: DecisionTreeModel = DecisionTree.trainRegressor(
      traningRDD,
      Map[Int, Int](0->7),
      "variance",
      16,
      32
    )

    //使用模型进行预测--得到预测值和实际值的比较
    val actualAndPredictRDD: RDD[(Double, Double)] = testingRDD.map{
      case LabeledPoint(label,features)=>{
        val predictValue = dtModel.predict(features)
        (label,predictValue)
      }
    }

    actualAndPredictRDD.take(20).foreach(println)

    //评估模型预测性能
    def modelEvaluate(apRDD: RDD[(Double, Double)]):Unit = {

      //总数
      val count = apRDD.count().toDouble
      //MSE:均方误差
      val mseValue = apRDD
        .map{case(actual,predict)=>Math.pow(actual-predict,2)}
        .sum()/count
      //RMSE:均方根误差
      val rmseValue = Math.sqrt(mseValue)

      //MAE平均绝对误差
      val maeValue = apRDD
        .map{case(actual,predict)=>Math.abs(actual-predict)}
        .sum()/count

      println(s"MSE:$mseValue,RMSE:${rmseValue},MAE:${maeValue}")

    }

    //评估:决策树回归模型性能
    modelEvaluate(actualAndPredictRDD)
 //如果模型比较好,相对满意,保存模型
    modelEvaluate.save(spark.sparkContext,"file:///D:\\IJCAI\\modelEvaluate")

    //加载模型,预测使用
    val loadRFModel: RandomForestModel = RandomForestModel.load(spark.sparkContext,"file:///D:\\IJCAI\\modelEvaluate")
    val features: Vector = Vectors.dense(
      Array("2","1241","1").map(_.toDouble)
    )
    println(s"Actual=51,Predict=${loadRFModel.predict(features)}")
  }

  //运行主函数
  def main(args: Array[String]): Unit = {
    sparkOperation(args)(modelTrain)
  }
}

代码优化

模型性能调优 ,2点考虑

1.训练数据
	(a)数据量越大越好,
	(b)特征值处理
		正则化、归一化处理
		特征性进行加权处理
		增加和减少特征值
2.算法参数(超参数)
	找到合适的超参数值

运行结果

虽然误差较大,但还是测试成功了
在这里插入图片描述

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐