第二章、Flink 批处理 API

  • 掌握Flink批处理SourceOperator

  • 掌握Flink批处理TransformationOperator

  • 掌握Flink批处理SinkOperator

  • 了解Flink的累加器

  • 掌握Flink的广播变量

  • 掌握Flink的分布式缓存

1. API和编程模型

1.1API

​ Flink提供了多个层次的API供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越
底层,使用起来难度越大

image-20210128225655062

https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/programming-model.html#levels-of-abstraction

image-20210128225733613

1.2编程模型

Flink应用程序结构主要包含:Source、Transformation、Sink,:

image-20210128225923443

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#anatomy-of-a-flink-program

Fink DataSet API编程基本步骤:

1、获取执行环境(ExecutionEnvironment)

image-20210128230104627

2.加载、创建初始化数据集(DataSet)

image-20210128230155845

3.对数据集进行各种转换(Transformation)操作,生成新的DataSet

image-20210128230304353

4.指定计算的结果存储在哪个位置

image-20210128230506929

Streaming Processing:https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/#stream-processing

image-20210128230537881

https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/programming-model.html#programs-and-dataflows

1.3词频统计WordCount

​ 分别使用java语言和scala语言,基于Flink流式分析引擎对批处理数据进行离线分析,实现大数据经典程序:词频统计WordCount。

1.3.1java版本

​ 从本地文件系统加载文本文件数据,按照单词统计次数,最终按照词频降序排序。

package batcch.com.tunan.flink.start;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.SortPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * 基于Flink引擎实现批处理词频统计WordCount:过滤filter、排序sort等操作
 */
public class BatchWorkCount {
    public static void main(String[] args) throws Exception {
        //1,执行环境-env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //2、数据源-source
        DataSource<String> inputDataSet = env.readTextFile("datas\\wordcount.data");
        //3、数据转换-transformation
        AggregateOperator<Tuple2<String, Integer>> aggregateDataSet = inputDataSet
                //过滤操作
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String line) throws Exception {
                        return null != line && line.trim().length() > 0;
                    }
                })
                //单词分割
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        String[] words = line.trim().split("\\W+");
                        for (String word : words) {
                            out.collect(word);
                        }
                    }
                })
                //转换二元组,表示单词出现一次
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return Tuple2.of(word, 1);
                    }
                })
                //分组聚合
                .groupBy(0)//只能针对元组类型数据进行数据聚合
                .sum(1);
                        /*
                (hadoop,7)
                (hive,3)
                (flink,1)
                (hbase,1)
                (spark,9)
                */
                //aggregateDataSet.print();
                // TODO: 对计算结果,进行降序排序, 设置Operator并行度为1,进行全局排序
                /*
                (spark,9)
                (hadoop,7)
                (hive,3)
                (flink,1)
                (hbase,1)
                */

        SortPartitionOperator<Tuple2<String, Integer>> sortDataSet = aggregateDataSet
                .sortPartition(1, Order.DESCENDING)
                .setParallelism(1);

        //4、数据终端-sink

        /*
        System.out.println(); -> print()
        System.err.println(); -> printToErr() ,控制台打印红色字体
        */
        sortDataSet.printToErr();

        //5、触发执行
    }
}

1.3.2 Scala版本

​ 从本地文件加载文本文件数据,按照单词统计次数,最终按照降序排序。

package cn.itcast.flink.start

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

/**
 * 使用Scala语言实现词频统计:WordCount
 */
object FlinkWordCount {
  def main(args: Array[String]): Unit = {
    // 1. 执行环境-env
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // 2. 数据源-source
    val textDataSet: DataSet[String] = env.readTextFile("datas/wordcount.data")
    // 3. 数据转换-transformation
    val countDataSet: DataSet[(String, Int)] = textDataSet
      // 第一步、过滤数据
      .filter(line => null != line && line.trim.length > 0)
      // 第二步、将每行数据分割为单词
      .flatMap(line => line.trim.toLowerCase.split("\\W+"))
      // 第三步、转换为二元组,表示单词出现一次
      .map(word => (word, 1))
      // 第四步、按照单词分词,累加统计个数
      .groupBy(0).sum(1)
      // 第五步、按照词频count降序排序
      .sortPartition(1, Order.DESCENDING).setParallelism(1)
    //4.数据输出-sink
    countDataSet.print()
  }
}

2. 数据源 Source

2.1基于集合的Source

  • API

    一般用于学习测试时模拟数据使用

    1. env.fromElements(可变参数);
    2. env.fromColletion(各种集合)
    3. env.generateSequence(开始,结束)
  • 代码演示

    package cn.itcast.flink.source;
    
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    
    import java.util.Arrays;
    
    /*
    * DataSet API批处理中数据源:基于集合Source
    * 1.env.fromElements(可变参数);
    * 2.env.fromColletion(各种集合)
    * 3.env.generateSequence(开始,结束)
    * */
    public class BatchSourceCollectionDemo {
        public static void main(String[] args) throws Exception {
            //1、执行环境-env
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            //2、数据源-source
            //方式一、可变参数列表
            DataSource<String> dataSet01 = env.fromElements("hadoop", "flink", "spark");
            dataSet01.printToErr();
            //方式二、集合、如列表list
            DataSource<String> dataSet02 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
            dataSet02.printToErr();
            //方式三、序列
            DataSource<Long> dataSet03 = env.generateSequence(1, 10);
            dataSet03.printToErr();
        }
    }
    
    

    2.2基于文件的Source

  • API

    1.env.readTextFile(本地文件/HDFS文件); //压缩文件也可以
    2.env.readCsvFile泛型
    Configuration parameters = new Configuration();
    parameters.setBoolean(“recursive.file.enumeration”, true);//设置是否递归读取目录
    3.env.readTextFile(“目录”).withParameters(parameters);

  • 代码演示:

    package cn.itcast.flink.source;
    
    import lombok.Data;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.configuration.Configuration;
    
    
    /**
     * DataSet API 批处理中数据源:基于文件Source
     * 1.env.readTextFile(本地文件/HDFS文件); //压缩文件也可以
     * 2.env.readCsvFile[泛型]("本地文件/HDFS文件")
     * Configuration parameters = new Configuration();
     * parameters.setBoolean("recursive.file.enumeration", true);//设置是否递归读取目录
     * 3.env.readTextFile("目录").withParameters(parameters);
     */
    public class BatchSourceFlieDemo {
        public static void main(String[] args) throws Exception {
            //1\执行环境-env
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            //2.数据源-source
            //文本文件、
            DataSource<String> dataSet01 = env.readTextFile("datas/wordcount.data");
            dataSet01.printToErr();
    
            //csv文件
            DataSource<Rating> dataSet02 = env
                    .readCsvFile("datas/u.data")
                    .fieldDelimiter("\t")
                    .pojoType(Rating.class, "userId", "movieId", "rating", "timestamp");
            dataSet02.printToErr();
    
            //目录
            Configuration parameters = new Configuration();
            parameters.setBoolean("recursive.file.enumeration",true);
            DataSource<String> dataSet03 = env.readTextFile("datas/subDatas").withParameters(parameters);
            dataSet03.printToErr();
        }
    
        @Data
        public static class Rating{
            public Integer userId ;
            public Integer movieId ;
            public Double rating ;
            public Long timestamp ;
        }
    }
    
    

3.数据终端Sink

  • API

    1.ds.print直接输出到控制台

    2.ds.printToErr()直接输出到控制台,用红色

    3.ds.collect将分布式数据收集为本地集合

    4.ds.setParallelism(1).writeAsText(“本地/HDFS的path”,WriteMode.OVERWEITE)

  • 注意

    在输出到path的时候,可以在前面设置并行度,如果

    并行度>1,则path为目录

    并行度=1,则path为文件名

  • 代码演示

    package cn.itcast.flink.sink;
    
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSink;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.core.fs.FileSystem;
    import scala.Tuple2;
    
    import java.util.List;
    
    /**
     * DataSet API 批处理中数据终端:基于文件Sink
     * 1.ds.print 直接输出到控制台
     * 2.ds.printToErr() 直接输出到控制台,用红色
     * 3.ds.collect 将分布式数据收集为本地集合
     * 4.ds.setParallelism(1).writeAsText("本地/HDFS的path",WriteMode.OVERWRITE)
     *
     * 注意: 在输出到path的时候,可以在前面设置并行度,如果
     * 并行度>1,则path为目录
     * 并行度=1,则path为文件名
     */
    public class BatchSinkFlieDemo {
        public static void main(String[] args) throws Exception {
            //1,执行环境
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            //2.数据源-source:基于文件数据Source
            DataSource<String> dataSet = env.fromElements("mapreduce", "spark", "flink");
            //3.数据终端-sink
            dataSet.print();
            dataSet.printToErr();
            //DataSet转换为本地集合,比如列表List
            List<String> list = dataSet.collect();
            System.out.println(list);
    
            //保存数据到文本文件
            dataSet.writeAsText("datas/sink.txt").setParallelism(1);//文件的并行度为1,写入文件
            //保存至scv文件,数据类型必须为二元组
            dataSet
                    //todo:使用加强版映射函数:RichMapFunction:加强版的能获取分区号之类的
                    .map(new RichMapFunction<String, Tuple2<Integer, String>>() {
                        @Override
                        public Tuple2<Integer, String> map(String value) throws Exception {
                            //获取分区索引
                            int index = getRuntimeContext().getIndexOfThisSubtask();
                            return new Tuple2<>(index, value);
                        }
                    })
                    .writeAsCsv("datas/sink-csv.txt", FileSystem.WriteMode.OVERWRITE)
                    .setParallelism(1);
            //4.执行-execute
            env.execute(BatchSinkFlieDemo.class.getSimpleName());
        }
    }
    
    

4. 数据转换 Transformation

官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/

4.1英文版API解释

image-20210130211214623

image-20210130211223431

image-20210130211234124

4.2中文版API解释

image-20210130211318491

4.3基本函数

4.3.1map
  • API

map:将函数作用在集合中的每一个元素上,并返回作用后的结果

image-20210130211415100

  • 需求

    将click.log中的每一条日志转为javaBean对象

    {
    "browserType": "360浏览器",
    "categoryID": 2,
    "channelID": 3,
    "city": "昌平",
    "country": "china",
    "entryTime": 1577890860000,
    "leaveTime": 1577898060000,
    "network": "移动",
    "produceID": 11,
    "province": "北京",
    "source": "必应跳转",
    "userID": 18
    }
    

    封装JSON数据实体类ClickLog:

    package cn.itcast.flink.transformation;
    import lombok.Data;
    @Data
    public class ClickLog {
    //频道ID
    private long channelId;
    //产品的类别ID
    private long categoryId;
    //产品ID
    private long produceId;
    //用户的ID
    private long userId;
    //国家
    private String country;
    //省份
    private String province;
    //城市
    private String city;
    //网络方式
    private String network;
    //来源方式
    private String source;
    //浏览器类型
    private String browserType;
    //进入网站时间
    private Long entryTime;
    //离开网站时间
    private Long leaveTime;
    }
    
  • 代码实现

    package cn.itcast.flink.transformation;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.MapOperator;
    
    public class Testclinklog {
        public static void main(String[] args) throws Exception {
            //1.执行环境-env
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            //2.数据源-source
            DataSource<String> logDataSet = env.readTextFile("datas/click.log");
            //3,数据转换-Transformation
            //todo:a.使用map函数,将每条日志数据转换为实体类对象
            MapOperator<String, ClickLog> clickLogDataSet = logDataSet.map(new MapFunction<String, ClickLog>() {
                @Override
                public ClickLog map(String log) throws Exception {
                    //使用fastjoin类库提供API解析
                    ClickLog clickLog = JSON.parseObject(log, ClickLog.class);
                    return clickLog;
                }
            });
            clickLogDataSet.printToErr();
    
        }
    }
    
4.3.2flatMap
  • API

    flatMap:将集合中的每个元素变成一个或多个元素,并返回扁片规划之后的结果image-20210130214011519

  • 需求

    将每一条clinkLog转换为如下三个维度

    (年-月-日-时,1)
    (年-月-日,1)
    (年-月,1)

  • 代码实现:

        //todo :b:使用flatMap函数,针对日期时间entryTime,获取不同形式时间日期
        /*
        * entryTime=1577876460000 -》Long 类型
        *
        * 年月:yyyy-MM
        * 年月日:yyyy-MM-dd
        * 年月日时:yyyy-MM-dd:HH
        *
        * */
        FlatMapOperator<ClickLog, String> timeDataSet = clickLogDataSet.flatMap(new FlatMapFunction<ClickLog, String>() {
            @Override
            public void flatMap(ClickLog clickLog, Collector<String> out) throws Exception {
                Long entryTime = clickLog.getEntryTime();
                //使用工具类:lang3包DateFormatUtils
                String month = DateFormatUtils.format(entryTime, "yyyy-MM");
                out.collect(month);
                String day = DateFormatUtils.format(entryTime, "yyyy-MM-dd");
                out.collect(day);
                String hour = DateFormatUtils.format(entryTime, "yyyy-MM-dd:HH");
                out.collect(hour);
            }
        });
        timeDataSet.printToErr();


4.3.3 filter
  • API

    fliter:按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素

    image-20210130221320107

  • 需求:

    过滤出clickLog中使用谷歌浏览器访问的日志

  • 代码实现

// TODO: c. 使用filter函数,过滤出符合条件数据:谷歌浏览器
        FilterOperator<ClickLog> filterDataSet = clickLogDataSet.filter(new FilterFunction<ClickLog>() {
            @Override
            public boolean filter(ClickLog clickLog) throws Exception {
                return "谷歌浏览器".equals(clickLog.getBrowserType());
            }
        });
        filterDataSet.printToErr();

4.3.4groupBy
  • API

    groupBy:对集合中的元素按照制定的key进行分组

image-20210130222504664

  • 需求

    对ClickLog按照浏览器类型记为1并分组

  • 代码实现

            //将ClickLog数据类型转换为二元组:Key->BrowserType,Value -> 1
            MapOperator<ClickLog, Tuple2<String, Integer>> tupleDataSet = clickLogDataSet.map(new MapFunction<ClickLog, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(ClickLog clickLog) throws Exception {
                    return Tuple2.of(clickLog.getBrowserType(), 1);
                }
            });
            //todo:d.使用groupBy函数,针对二元组数据进行分组
            UnsortedGrouping<Tuple2<String, Integer>> groupDataset = tupleDataSet.groupBy(0);
    
4.3.5sum
  • API

    sum:按照指定的字段对集合中的元素进行求和

  • 需要:

    统计各个浏览器类型的访问量

  • 代码实现

            //todo:e使用sum函数,对组内数据进行求和
            AggregateOperator<Tuple2<String, Integer>> sumDataSet = groupDataset.sum(1);
            /*
            (qq浏览器,29)
            (火狐,24)
            (360浏览器,23)
            (谷歌浏览器,24)
            * */
            sumDataSet.printToErr();
    
4.3.6min和minBy/max和maxBy
  • API

    min只会求出最小的那个字段,其他的字段不管
    minBy会求出最小的那个字段和对应的其他的字段
    max和maxBy同理

  • 需求

    求最少的访问量以及对应的浏览器类型

  • 代码实现:

    // TODO: f. 使用min函数,指定字段最小值, 只会求出最小的那个字段,其他的字段不管
    AggregateOperator<Tuple2<String, Integer>> minDataSet = sumDataSet.min(1);
    minDataSet.printToErr();//(360浏览器,23)
    // TODO: g. 使用minBy函数,minBy会求出最小的那个字段和对应的其他的字段
    ReduceOperator<Tuple2<String, Integer>> minByDataSet = sumDataSet.minBy(1);
    minByDataSet.printToErr();// (360浏览器,23)
    
4.3.7 aggregate
  • API

    aggregate:按照指定的聚合函数和字段对集合中的元素进行聚合,如SUM,MIN.MAX

  • 需求:

    使用aggregate完成sum和min

//todo:h.使用aggregate函数对分组数据进行聚合操作
AggregateOperator<Tuple2<String, Integer>> aggregateDataSet = groupDataset.aggregate(Aggregations.SUM, 1);
aggregateDataSet.printToErr();
4.3.8reduce和reduceGroup
  • API

    reduce:对集合中的元素进行聚合

image-20210130230107943

reduceGroup:对集合中的元素先进行预聚合在合并结果

image-20210130230632790

  • 需求:

    使用reduce和reduceGroup完成求sum

  • 代码实现

//todo:i.使用reduce函数,对分区过数据进行聚合操作
        ReduceOperator<Tuple2<String, Integer>> reduceDataSet = groupDataset.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                String key = t1.f0;//二元组中的key
                int sum = t1.f1 + t2.f1;//二元组中的值的和
                return Tuple2.of(key, sum);
            }
        });
        reduceDataSet.printToErr();

//todo:j.使用reduceGroup函数,对分区过数据进行聚合操作,推荐使用此函数
        GroupReduceOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> reduceGroupDataSet = groupDataset.reduceGroup(
                new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Integer>> iter,
                                       Collector<Tuple2<String, Integer>> out) throws Exception {
                        //对组内数据进行聚合操作
                        //todo:定义临时变量
                        String key = null;
                        Integer tmp = 0;
                        for (Tuple2<String, Integer> item : iter) {
                            key = item.f0;
                            tmp += item.f1;
                        }
                        out.collect(Tuple2.of(key,tmp));
                    }
                });
        reduceDataSet.printToErr();
4.3.9union
  • API

    ​ union:将两个集合进行合并单不会去重

image-20210130232829627

  • 需求:

    读取click.log和click2.log并使用union合并结果

  • 代码实现:

 //todo:k.使用union函数,将2个DataSet数据集进行合并
        MapOperator<String, ClickLog> dataSet01 = env.readTextFile("datas/input/click1.log")
                .map(new MapFunction<String, ClickLog>() {
                    @Override
                    public ClickLog map(String log) throws Exception {
                        return JSON.parseObject(log, ClickLog.class);
                    }
                });
        MapOperator<String, ClickLog> dataSet02 = env.readTextFile("datas/input/click2.log")
                .map(new MapFunction<String, ClickLog>() {
                    @Override
                    public ClickLog map(String log) throws Exception {
                        return JSON.parseObject(log, ClickLog.class);
                    }
                });

        UnionOperator<ClickLog> unionDataSet = dataSet01.union(dataSet02);
        System.out.println("dataSet01数据集条目数: " + dataSet01.count());
        System.out.println("dataSet02数据集条目数: " + dataSet02.count());
        System.out.println("unionDataSet数据集条目数: " + unionDataSet.count());
4.3.10distinct
  • API

    distinct:对集合中的元素进行去重

image-20210130234832193

  • 需求:

    对合并后的结果进行去重

  • 代码实现:

// TODO: l. 使用distinct函数,进行数据去重
        DistinctOperator<ClickLog> distinctDataSet = unionDataSet.distinct();// 如果是整条数据相同,进行去重
        // 指定字段,当值相同时,进行去重
        DistinctOperator<ClickLog> resultDataSet = unionDataSet.distinct("browserType");
        System.out.println("指定browserType字段,数据去重以后条目数: " + resultDataSet.count());
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐