第二章、Flink 批处理 API(正在更新)
第二章、Flink 批处理 API掌握Flink批处理SourceOperator掌握Flink批处理TransformationOperator掌握Flink批处理SinkOperator了解Flink的累加器掌握Flink的广播变量掌握Flink的分布式缓存1. API和编程模型1.1APIFlink提供了多个层次的API供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越底层,使用起
第二章、Flink 批处理 API
-
掌握Flink批处理SourceOperator
-
掌握Flink批处理TransformationOperator
-
掌握Flink批处理SinkOperator
-
了解Flink的累加器
-
掌握Flink的广播变量
-
掌握Flink的分布式缓存
1. API和编程模型
1.1API
Flink提供了多个层次的API供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越
底层,使用起来难度越大
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/programming-model.html#levels-of-abstraction
1.2编程模型
Flink应用程序结构主要包含:Source、Transformation、Sink,:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#anatomy-of-a-flink-program
Fink DataSet API编程基本步骤:
1、获取执行环境(ExecutionEnvironment)
2.加载、创建初始化数据集(DataSet)
3.对数据集进行各种转换(Transformation)操作,生成新的DataSet
4.指定计算的结果存储在哪个位置
Streaming Processing:https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/#stream-processing
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/programming-model.html#programs-and-dataflows
1.3词频统计WordCount
分别使用java语言和scala语言,基于Flink流式分析引擎对批处理数据进行离线分析,实现大数据经典程序:词频统计WordCount。
1.3.1java版本
从本地文件系统加载文本文件数据,按照单词统计次数,最终按照词频降序排序。
package batcch.com.tunan.flink.start;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.SortPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* 基于Flink引擎实现批处理词频统计WordCount:过滤filter、排序sort等操作
*/
public class BatchWorkCount {
public static void main(String[] args) throws Exception {
//1,执行环境-env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2、数据源-source
DataSource<String> inputDataSet = env.readTextFile("datas\\wordcount.data");
//3、数据转换-transformation
AggregateOperator<Tuple2<String, Integer>> aggregateDataSet = inputDataSet
//过滤操作
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String line) throws Exception {
return null != line && line.trim().length() > 0;
}
})
//单词分割
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.trim().split("\\W+");
for (String word : words) {
out.collect(word);
}
}
})
//转换二元组,表示单词出现一次
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
})
//分组聚合
.groupBy(0)//只能针对元组类型数据进行数据聚合
.sum(1);
/*
(hadoop,7)
(hive,3)
(flink,1)
(hbase,1)
(spark,9)
*/
//aggregateDataSet.print();
// TODO: 对计算结果,进行降序排序, 设置Operator并行度为1,进行全局排序
/*
(spark,9)
(hadoop,7)
(hive,3)
(flink,1)
(hbase,1)
*/
SortPartitionOperator<Tuple2<String, Integer>> sortDataSet = aggregateDataSet
.sortPartition(1, Order.DESCENDING)
.setParallelism(1);
//4、数据终端-sink
/*
System.out.println(); -> print()
System.err.println(); -> printToErr() ,控制台打印红色字体
*/
sortDataSet.printToErr();
//5、触发执行
}
}
1.3.2 Scala版本
从本地文件加载文本文件数据,按照单词统计次数,最终按照降序排序。
package cn.itcast.flink.start
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
/**
* 使用Scala语言实现词频统计:WordCount
*/
object FlinkWordCount {
def main(args: Array[String]): Unit = {
// 1. 执行环境-env
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// 2. 数据源-source
val textDataSet: DataSet[String] = env.readTextFile("datas/wordcount.data")
// 3. 数据转换-transformation
val countDataSet: DataSet[(String, Int)] = textDataSet
// 第一步、过滤数据
.filter(line => null != line && line.trim.length > 0)
// 第二步、将每行数据分割为单词
.flatMap(line => line.trim.toLowerCase.split("\\W+"))
// 第三步、转换为二元组,表示单词出现一次
.map(word => (word, 1))
// 第四步、按照单词分词,累加统计个数
.groupBy(0).sum(1)
// 第五步、按照词频count降序排序
.sortPartition(1, Order.DESCENDING).setParallelism(1)
//4.数据输出-sink
countDataSet.print()
}
}
2. 数据源 Source
2.1基于集合的Source
-
API
一般用于学习测试时模拟数据使用
- env.fromElements(可变参数);
- env.fromColletion(各种集合)
- env.generateSequence(开始,结束)
-
代码演示
package cn.itcast.flink.source; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import java.util.Arrays; /* * DataSet API批处理中数据源:基于集合Source * 1.env.fromElements(可变参数); * 2.env.fromColletion(各种集合) * 3.env.generateSequence(开始,结束) * */ public class BatchSourceCollectionDemo { public static void main(String[] args) throws Exception { //1、执行环境-env ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //2、数据源-source //方式一、可变参数列表 DataSource<String> dataSet01 = env.fromElements("hadoop", "flink", "spark"); dataSet01.printToErr(); //方式二、集合、如列表list DataSource<String> dataSet02 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink")); dataSet02.printToErr(); //方式三、序列 DataSource<Long> dataSet03 = env.generateSequence(1, 10); dataSet03.printToErr(); } }
2.2基于文件的Source
-
API
1.env.readTextFile(本地文件/HDFS文件); //压缩文件也可以
2.env.readCsvFile泛型
Configuration parameters = new Configuration();
parameters.setBoolean(“recursive.file.enumeration”, true);//设置是否递归读取目录
3.env.readTextFile(“目录”).withParameters(parameters); -
代码演示:
package cn.itcast.flink.source; import lombok.Data; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.configuration.Configuration; /** * DataSet API 批处理中数据源:基于文件Source * 1.env.readTextFile(本地文件/HDFS文件); //压缩文件也可以 * 2.env.readCsvFile[泛型]("本地文件/HDFS文件") * Configuration parameters = new Configuration(); * parameters.setBoolean("recursive.file.enumeration", true);//设置是否递归读取目录 * 3.env.readTextFile("目录").withParameters(parameters); */ public class BatchSourceFlieDemo { public static void main(String[] args) throws Exception { //1\执行环境-env ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //2.数据源-source //文本文件、 DataSource<String> dataSet01 = env.readTextFile("datas/wordcount.data"); dataSet01.printToErr(); //csv文件 DataSource<Rating> dataSet02 = env .readCsvFile("datas/u.data") .fieldDelimiter("\t") .pojoType(Rating.class, "userId", "movieId", "rating", "timestamp"); dataSet02.printToErr(); //目录 Configuration parameters = new Configuration(); parameters.setBoolean("recursive.file.enumeration",true); DataSource<String> dataSet03 = env.readTextFile("datas/subDatas").withParameters(parameters); dataSet03.printToErr(); } @Data public static class Rating{ public Integer userId ; public Integer movieId ; public Double rating ; public Long timestamp ; } }
3.数据终端Sink
-
API
1.ds.print直接输出到控制台
2.ds.printToErr()直接输出到控制台,用红色
3.ds.collect将分布式数据收集为本地集合
4.ds.setParallelism(1).writeAsText(“本地/HDFS的path”,WriteMode.OVERWEITE)
-
注意
在输出到path的时候,可以在前面设置并行度,如果
并行度>1,则path为目录
并行度=1,则path为文件名
-
代码演示
package cn.itcast.flink.sink; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSink; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.core.fs.FileSystem; import scala.Tuple2; import java.util.List; /** * DataSet API 批处理中数据终端:基于文件Sink * 1.ds.print 直接输出到控制台 * 2.ds.printToErr() 直接输出到控制台,用红色 * 3.ds.collect 将分布式数据收集为本地集合 * 4.ds.setParallelism(1).writeAsText("本地/HDFS的path",WriteMode.OVERWRITE) * * 注意: 在输出到path的时候,可以在前面设置并行度,如果 * 并行度>1,则path为目录 * 并行度=1,则path为文件名 */ public class BatchSinkFlieDemo { public static void main(String[] args) throws Exception { //1,执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //2.数据源-source:基于文件数据Source DataSource<String> dataSet = env.fromElements("mapreduce", "spark", "flink"); //3.数据终端-sink dataSet.print(); dataSet.printToErr(); //DataSet转换为本地集合,比如列表List List<String> list = dataSet.collect(); System.out.println(list); //保存数据到文本文件 dataSet.writeAsText("datas/sink.txt").setParallelism(1);//文件的并行度为1,写入文件 //保存至scv文件,数据类型必须为二元组 dataSet //todo:使用加强版映射函数:RichMapFunction:加强版的能获取分区号之类的 .map(new RichMapFunction<String, Tuple2<Integer, String>>() { @Override public Tuple2<Integer, String> map(String value) throws Exception { //获取分区索引 int index = getRuntimeContext().getIndexOfThisSubtask(); return new Tuple2<>(index, value); } }) .writeAsCsv("datas/sink-csv.txt", FileSystem.WriteMode.OVERWRITE) .setParallelism(1); //4.执行-execute env.execute(BatchSinkFlieDemo.class.getSimpleName()); } }
4. 数据转换 Transformation
官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/
4.1英文版API解释
4.2中文版API解释
4.3基本函数
4.3.1map
- API
map:将函数作用在集合中的每一个元素上,并返回作用后的结果
-
需求
将click.log中的每一条日志转为javaBean对象
{ "browserType": "360浏览器", "categoryID": 2, "channelID": 3, "city": "昌平", "country": "china", "entryTime": 1577890860000, "leaveTime": 1577898060000, "network": "移动", "produceID": 11, "province": "北京", "source": "必应跳转", "userID": 18 }
封装JSON数据实体类ClickLog:
package cn.itcast.flink.transformation; import lombok.Data; @Data public class ClickLog { //频道ID private long channelId; //产品的类别ID private long categoryId; //产品ID private long produceId; //用户的ID private long userId; //国家 private String country; //省份 private String province; //城市 private String city; //网络方式 private String network; //来源方式 private String source; //浏览器类型 private String browserType; //进入网站时间 private Long entryTime; //离开网站时间 private Long leaveTime; }
-
代码实现
package cn.itcast.flink.transformation; import com.alibaba.fastjson.JSON; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.MapOperator; public class Testclinklog { public static void main(String[] args) throws Exception { //1.执行环境-env ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //2.数据源-source DataSource<String> logDataSet = env.readTextFile("datas/click.log"); //3,数据转换-Transformation //todo:a.使用map函数,将每条日志数据转换为实体类对象 MapOperator<String, ClickLog> clickLogDataSet = logDataSet.map(new MapFunction<String, ClickLog>() { @Override public ClickLog map(String log) throws Exception { //使用fastjoin类库提供API解析 ClickLog clickLog = JSON.parseObject(log, ClickLog.class); return clickLog; } }); clickLogDataSet.printToErr(); } }
4.3.2flatMap
-
API
flatMap:将集合中的每个元素变成一个或多个元素,并返回扁片规划之后的结果
-
需求
将每一条clinkLog转换为如下三个维度
(年-月-日-时,1)
(年-月-日,1)
(年-月,1) -
代码实现:
//todo :b:使用flatMap函数,针对日期时间entryTime,获取不同形式时间日期
/*
* entryTime=1577876460000 -》Long 类型
*
* 年月:yyyy-MM
* 年月日:yyyy-MM-dd
* 年月日时:yyyy-MM-dd:HH
*
* */
FlatMapOperator<ClickLog, String> timeDataSet = clickLogDataSet.flatMap(new FlatMapFunction<ClickLog, String>() {
@Override
public void flatMap(ClickLog clickLog, Collector<String> out) throws Exception {
Long entryTime = clickLog.getEntryTime();
//使用工具类:lang3包DateFormatUtils
String month = DateFormatUtils.format(entryTime, "yyyy-MM");
out.collect(month);
String day = DateFormatUtils.format(entryTime, "yyyy-MM-dd");
out.collect(day);
String hour = DateFormatUtils.format(entryTime, "yyyy-MM-dd:HH");
out.collect(hour);
}
});
timeDataSet.printToErr();
4.3.3 filter
-
API
fliter:按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素
-
需求:
过滤出clickLog中使用谷歌浏览器访问的日志
-
代码实现
// TODO: c. 使用filter函数,过滤出符合条件数据:谷歌浏览器
FilterOperator<ClickLog> filterDataSet = clickLogDataSet.filter(new FilterFunction<ClickLog>() {
@Override
public boolean filter(ClickLog clickLog) throws Exception {
return "谷歌浏览器".equals(clickLog.getBrowserType());
}
});
filterDataSet.printToErr();
4.3.4groupBy
-
API
groupBy:对集合中的元素按照制定的key进行分组
-
需求
对ClickLog按照浏览器类型记为1并分组
-
代码实现
//将ClickLog数据类型转换为二元组:Key->BrowserType,Value -> 1 MapOperator<ClickLog, Tuple2<String, Integer>> tupleDataSet = clickLogDataSet.map(new MapFunction<ClickLog, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(ClickLog clickLog) throws Exception { return Tuple2.of(clickLog.getBrowserType(), 1); } }); //todo:d.使用groupBy函数,针对二元组数据进行分组 UnsortedGrouping<Tuple2<String, Integer>> groupDataset = tupleDataSet.groupBy(0);
4.3.5sum
-
API
sum:按照指定的字段对集合中的元素进行求和
-
需要:
统计各个浏览器类型的访问量
-
代码实现
//todo:e使用sum函数,对组内数据进行求和 AggregateOperator<Tuple2<String, Integer>> sumDataSet = groupDataset.sum(1); /* (qq浏览器,29) (火狐,24) (360浏览器,23) (谷歌浏览器,24) * */ sumDataSet.printToErr();
4.3.6min和minBy/max和maxBy
-
API
min只会求出最小的那个字段,其他的字段不管
minBy会求出最小的那个字段和对应的其他的字段
max和maxBy同理 -
需求
求最少的访问量以及对应的浏览器类型
-
代码实现:
// TODO: f. 使用min函数,指定字段最小值, 只会求出最小的那个字段,其他的字段不管 AggregateOperator<Tuple2<String, Integer>> minDataSet = sumDataSet.min(1); minDataSet.printToErr();//(360浏览器,23) // TODO: g. 使用minBy函数,minBy会求出最小的那个字段和对应的其他的字段 ReduceOperator<Tuple2<String, Integer>> minByDataSet = sumDataSet.minBy(1); minByDataSet.printToErr();// (360浏览器,23)
4.3.7 aggregate
-
API
aggregate:按照指定的聚合函数和字段对集合中的元素进行聚合,如SUM,MIN.MAX
-
需求:
使用aggregate完成sum和min
//todo:h.使用aggregate函数对分组数据进行聚合操作
AggregateOperator<Tuple2<String, Integer>> aggregateDataSet = groupDataset.aggregate(Aggregations.SUM, 1);
aggregateDataSet.printToErr();
4.3.8reduce和reduceGroup
-
API
reduce:对集合中的元素进行聚合
reduceGroup:对集合中的元素先进行预聚合在合并结果
-
需求:
使用reduce和reduceGroup完成求sum
-
代码实现
//todo:i.使用reduce函数,对分区过数据进行聚合操作
ReduceOperator<Tuple2<String, Integer>> reduceDataSet = groupDataset.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
String key = t1.f0;//二元组中的key
int sum = t1.f1 + t2.f1;//二元组中的值的和
return Tuple2.of(key, sum);
}
});
reduceDataSet.printToErr();
//todo:j.使用reduceGroup函数,对分区过数据进行聚合操作,推荐使用此函数
GroupReduceOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> reduceGroupDataSet = groupDataset.reduceGroup(
new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public void reduce(Iterable<Tuple2<String, Integer>> iter,
Collector<Tuple2<String, Integer>> out) throws Exception {
//对组内数据进行聚合操作
//todo:定义临时变量
String key = null;
Integer tmp = 0;
for (Tuple2<String, Integer> item : iter) {
key = item.f0;
tmp += item.f1;
}
out.collect(Tuple2.of(key,tmp));
}
});
reduceDataSet.printToErr();
4.3.9union
-
API
union:将两个集合进行合并单不会去重
-
需求:
读取click.log和click2.log并使用union合并结果
-
代码实现:
//todo:k.使用union函数,将2个DataSet数据集进行合并
MapOperator<String, ClickLog> dataSet01 = env.readTextFile("datas/input/click1.log")
.map(new MapFunction<String, ClickLog>() {
@Override
public ClickLog map(String log) throws Exception {
return JSON.parseObject(log, ClickLog.class);
}
});
MapOperator<String, ClickLog> dataSet02 = env.readTextFile("datas/input/click2.log")
.map(new MapFunction<String, ClickLog>() {
@Override
public ClickLog map(String log) throws Exception {
return JSON.parseObject(log, ClickLog.class);
}
});
UnionOperator<ClickLog> unionDataSet = dataSet01.union(dataSet02);
System.out.println("dataSet01数据集条目数: " + dataSet01.count());
System.out.println("dataSet02数据集条目数: " + dataSet02.count());
System.out.println("unionDataSet数据集条目数: " + unionDataSet.count());
4.3.10distinct
-
API
distinct:对集合中的元素进行去重
-
需求:
对合并后的结果进行去重
-
代码实现:
// TODO: l. 使用distinct函数,进行数据去重
DistinctOperator<ClickLog> distinctDataSet = unionDataSet.distinct();// 如果是整条数据相同,进行去重
// 指定字段,当值相同时,进行去重
DistinctOperator<ClickLog> resultDataSet = unionDataSet.distinct("browserType");
System.out.println("指定browserType字段,数据去重以后条目数: " + resultDataSet.count());
更多推荐
所有评论(0)