Flink 三:Flink 流处理 API
Flink 三:Flink 流处理 API主要内容结构:1.流处理相关概念数据的时效性对网站的实时监控对异常日志的监控流式计算和批量计算Batch Analytics 批量计算:统一收集数据-》存储到DB-》对数据进行批量处理,就是传统意义上使用类似于Map Reduce、Hive、Spark Batch等,对作业进行分析、处理、生成离线报表Streaming Analytics流式计算:对数据流
Flink 三:Flink 流处理 API
主要内容结构:
1.流处理相关概念
-
数据的时效性
-
对网站的实时监控
-
对异常日志的监控
-
-
流式计算和批量计算
-
Batch Analytics 批量计算:统一收集数据-》存储到DB-》对数据进行批量处理,就是传统意义上使用类似于Map Reduce、Hive、Spark Batch等,对作业进行分析、处理、生成离线报表
-
Streaming Analytics流式计算:对数据流进行处理,如果用流式分析引擎如Storm,Flink实时处理分析数据,应用较多的场景如实时大屏,实时报表
-
它们的主要区别是:
-
与批量计算,慢慢积累数据不同,流计算立刻计算,数据持续流动,完成之后就丢弃;
-
批量计算是维护一张表,对表进行实施各种计算逻辑。流式计算相反,是必须先定义好逻辑,提交到流式计算系统,这对计算作业逻辑在整个运行期间是不可更改的;
-
计算结果上,批量计算对全部数据进行计算后传输结果,流式计算是每次小批量计算后,结果可以立刻实时化展现
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
2. DataStream
-
有边界流(bounded stream):有定义流的开始,也有定义流的结束。有界可以在摄取虽有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常称为批处理
-
无界流(unbounded stream):有定义流的开始,也有定义流的结束。他们会无休止地产生数据。无休止地数据必须持续处理,即数据被摄取后需要立刻处理。不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件例如事件发生地顺序,以便能够推断结果地完整性。
DataStream数据流有5个子类,截图如下:
3.API和编程模型
和批处理类似,Flink的流处理也支持多个层次的api并包含三个方面:
Source/Transformation/Sink
step1、Obtain an execution environment
step2、Load/create the initial data
step3、Specify transformations on this data
step4、Specify where to put the results of your computations
step5、Trigger the program execution
3.1词频统计(java语言)
使用java语言编写从TCP Socket读取数据,进行词频统计WordCount,结果打印至控制台。
package cn.itcast.flink.start; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * 基于 Flink 流计算引擎:从TCP Socket消费数据,实时词频统计WordCount */ public class StreamWordCount { public static void main(String[] args) throws Exception { // 1. 执行环境-env:流计算执行环 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2. 数据源-source:Socket接收数据 DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999); // 3. 转换处理-transformation:调用DataSet函数,处理数据 SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream //a.过滤数据 .filter(new FilterFunction<String>() { @Override public boolean filter(String line) throws Exception { return null != line && line.trim().length() > 0; } }) //b.分割单词 .flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String line, Collector<String> out) throws Exception { String[] words = line.trim().toLowerCase().split("\\W+"); for (String word : words) { out.collect(word); } } }) //c.转换二元组,表示每个单词出现一次 .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String word) throws Exception { return Tuple2.of(word, 1); } }) //d.按照单词分组及对组内聚合操作 .keyBy(0).sum(1); //d.数据终端-sink;结果数据打印在控制台 resultDataStream.print(); env.execute(StreamWordCount.class.getSimpleName()); } }
测试端口:nc -l node1.itcast.cn 9999
3.2 词频统计(Scala 语言)
使用Scala语言编写从TCP Socket读取数据,进行词频统计WordCount,结果打印至控制台。
import org.apache.flink.streaming.api.scala._ /** * 使用Scala语言编程实现Flink实时词频统计WordCount,从TCP Socket读取数据,分析结果打印控制台。 */ object FlinkWordCount { def main(args: Array[String]): Unit = { //1.执行环境-env //val env : //val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI() env.setParallelism(2) //2-数据源-source val inputDataStream: DataStream[String] = env.socketTextStream("node1.itcast.cn", 9999) //3数据转换-transformation val resultDataStream: DataStream[(String, Int)] = inputDataStream // a. 过滤数据,如空字符串 .filter(line => null != line && line.trim.length > 0) // b. 将每行数据分割为字符 .flatMap(line => line.trim.toLowerCase().split("\\W+")) // c. 转换为二元组,表示每个单词出现一次 .map(word => (word, 1)) // d. 按照单词分组和组内聚合计数 .keyBy(0).sum(1) // 4. 数据终端-sink resultDataStream.printToErr() // 5. 触发执行-execute env.execute(FlinkWordCount.getClass.getSimpleName.stripSuffix("$")) } }
4. Source 数据源
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html#data-sources
4.1 基于Socket的Source
一般用于学习测试
-
需求:
1.在node1上使用 nc -lk 9999 向指定端口发送数据 nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据 如果没有该命令可以下安装:yum install -y nc 2.使用Flink编写流处理应用程序实时统计单词数量
-
代码实现:3.1可见
4.2 基于集合的Source
一般用于学习测试,和批处理的API类似,不再演示。
package cn.itcast.flink.source.basic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Arrays; /** * Flink 流计算数据源:基于集合的Source,分别为可变参数、集合和自动生成数据 */ public class StreamSourceCollectionDemo { public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2) ; // 2. 数据源 DataStreamSource<String> dataStream01 = env.fromElements("spark", "flink", "mapreduce"); dataStream01.printToErr(); DataStreamSource<String> dataStream02 = env.fromCollection(Arrays.asList("spark", "flink", "mapreduce")); dataStream02.print(); DataStreamSource<Long> dataStream03 = env.generateSequence(1, 10); dataStream03.printToErr(); // 5. 触发执行-execute env.execute(StreamSourceCollectionDemo.class.getSimpleName()); } }
4.3 基于文件的Source
一般用于学习测试,和批处理的API类似,不再演示
package cn.itcast.flink.source.basic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * Flink 流计算数据源:基于文件的Source */ public class StreamSourceFileDemo { public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2) ; // 2. 数据源 DataStreamSource<String> dataStream = env.readTextFile("datas/wordcount.data"); dataStream.printToErr(); // 5. 触发执行-execute env.execute(StreamSourceFileDemo.class.getSimpleName()); } }
4.4 自定义Source:随机生成数据
-
API
一般用于学习测试,模拟生成一些数据
Flink提供数据源接口,可以实现自定义数据源,不同的接口有不同的功能,分类如下:
SourceFunction:非并行数据源(并行度parallelism=1) RichSourceFunction:多功能非并行数据源(并行度parallelism=1) ParallelSourceFunction:并行数据源(并行度parallelism>=1) RichParallelSourceFunction:多功能并行数据源(parallelism>=1),Kafka数据源使用该接口
-
需求
每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
-
要求:
-
随机生成订单ID:UUID ·
-
随机生成用户ID:0-2
-
随机生成订单金额:0-100
-
时间戳为当前系统时间:current_timestamp
-
-
代码实现
package cn.itcast.flink.source.customer; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.streaming.api.datastream.DataStreamSource;
4.5 自定义Source:MySQL
-
需求
实际开发中没经常会实时接收一些数据,要和MySQL中存储的一些规则进行匹配,那么这时候就可以使用Flink自定义数据源从MySQL中读取数据
需求:从MySQL中实时加载数据,要求MySQL中的数据有变化,也能被实时加载出来
-
准备数据
CREATE DATABASE IF NOT EXISTS db_flink ; USE db_flink ; CREATE TABLE `t_student` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; INSERT INTO `t_student` VALUES ('1', 'jack', 18); INSERT INTO `t_student` VALUES ('2', 'tom', 19); INSERT INTO `t_student` VALUES ('3', 'rose', 20); INSERT INTO `t_student` VALUES ('4', 'tom', 19); INSERT INTO `t_student` VALUES ('5', 'jack', 18); INSERT INTO `t_student` VALUES ('6', 'rose', 20); INSERT INTO `t_student` VALUES ('9', 'zhangsan2', 19); INSERT INTO `t_student` VALUES ('10', 'lisi2', 21);
-
代码实现
package cn.itcast.flink.source; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.concurrent.TimeUnit; /** * 从MySQL中实时加载数据:要求MySQL中的数据有变化,也能被实时加载出来 */ public class StreamSourceMySQLDemo { @Data @NoArgsConstructor @AllArgsConstructor public static class Student { private Integer id; private String name; private Integer age; } /** * 自定义数据源,实时从MySQL表获取数据,实现接口RichParallelSourceFunction */ public static class MySQLSource extends RichParallelSourceFunction<Student>{ // 标识符,是否实时接收数据 private boolean isRunning = true ; private Connection conn = null; private PreparedStatement pstmt = null; private ResultSet result = null ; private Integer whereId = 0 ; @Override public void open(Configuration parameters) throws Exception { //1.加载驱动 Class.forName("com.mysql.jdbc.Driver"); //2创建连接 conn= DriverManager.getConnection( "jdbc:mysql://node1.itcast.cn:3306/?useUnicode=true&characterEncoding=utf-8&useSSL=false", "root", "123456" ); //3.创建PreparedStatement pstmt = conn.prepareStatement("select id,name,age from db_flink.t_student WHERE id > ?"); } @Override public void run(SourceContext<Student> ctx) throws Exception { while(isRunning){ //1、执行查询 pstmt.setInt(1,whereId); result = pstmt.executeQuery(); //2. 遍历查询结果,收集数据 while(result.next()){ Integer id = result.getInt("id"); String name = result.getString("name") ; Integer age = result.getInt("age") ; // 输出 ctx.collect(new Student(id, name, age)); whereId = id ; } // 每隔3秒查询一次 TimeUnit.SECONDS.sleep(3); } } @Override public void close() throws Exception { if (null!=result) result.close(); if (null!=pstmt) pstmt.close(); if (null!=conn) conn.close(); } @Override public void cancel() { isRunning = false ; } } public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2. 数据源-source DataStreamSource<Student> studentDataStream = env.addSource(new MySQLSource()); // 3. 数据终端-sink studentDataStream.printToErr(); // 5. 应用执行-execute env.execute(StreamSourceMySQLDemo.class.getSimpleName()); } }
4.6 Kafka Source
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/index.html
4.6.1 API及其版本
Flink 里已经提供了一些绑定的 Connector,例如 Kafka Source 和 Sink,Elasticsearch Sink等。读写 Kafka、ES、RabbitMQ 时可以直接使用相应 connector 的 API 即可,虽然该部分是Flink 项目源代码里的一部分,但是真正意义上不算作 Flink 引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交 Job 时候需要注意, job 代码 jar 包中一定要将相应的connetor 相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
4.6.2 参数设置
以下参数都必须/建议设置
1.订阅的主题:topic
2.反序列化规则:deserialization 3.消费者属性-集群地址:bootstrap.servers 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理):groupId 5.消费者属性-offset重置规则,如earliest/latest...:offset
6.动态分区检测:dynamic partition detection
4.6.3 Kafka命令
启动Kafka和Zookeeper命令,针对讲师提供虚拟机: zookeeper-daemon.sh start kafka-daemon.sh start ● 查看当前服务器中的所有topic /export/server/kafka/bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092 ● 创建topic /export/server/kafka/bin/kafka-topics.sh --create --topic flink-topic \ --bootstrap-server node1.itcast.cn:9092 --replication-factor 1 --partitions 3 ● 查看某个Topic的详情 /export/server/kafka/bin/kafka-topics.sh --describe --topic flink-topic \ --bootstrap-server node1.itcast.cn:9092 ● 删除topic /export/server/kafka/bin/kafka-topics.sh --delete --topic flink-topic \ --bootstrap-server node1.itcast.cn:9092 ● 发送消息 /export/server/kafka/bin/kafka-console-producer.sh --topic flink-topic \ --broker-list node1.itcast.cn:9092 ● 消费消息 /export/server/kafka/bin/kafka-console-consumer.sh --topic flink-topic \ --bootstrap-server node1.itcast.cn:9092 --from-beginning ● 修改分区 /export/server/kafka/bin/kafka-topics.sh --alter --topic flink-topic \ --bootstrap-server node1.itcast.cn:9092 --partitions 4
4.6.4 代码实现
Flink 实时从Kafka消费数据,底层调用Kafka New Consumer API,演示案例代码如下:
4.6.5 Kafka 消费起始位置
kafka可以被看成一个无限的流,里面的流事数据是短暂的存在的,如果不消费,消息就过期滚动没了。涉及这个问题:如果开始消费,就要定一下从什么位置开始。
-
第一、earliest:从最起始位置开始消费,当然不一定是从0开始,因为如果数据过期就清掉了,所以可以理解为从现存的数据里最小位置开始消费;
-
第二、latest:从最末位置开始消费
-
第三、per-partition assignment:对每个分区都指定一个offset,再从offset位置开始消费。
默认情况下,从kafka消费数据时,采用的是:latest,最新偏移量开始消费数据。
在Flink Kafka Consumer 库中,允许用户配置从每个分区的哪个位置position开始消费数据,具体说明如下所示:
演示代码:
package cn.itcast.flink.source; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.kafka.clients.CommonClientConfigs; import java.util.HashMap; import java.util.Properties; public class StreamSourceKafkaOffsetDemo { public static void main(String[] args) throws Exception { //1.执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); // 2. 数据源-source:从Kafka 消费数据 // a. Kafka Consumer消费者配置属性设置 Properties props = new Properties() ; props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092"); props.setProperty("group.id","test-1001"); //b.创建FlinkKafkaConsumer对象 FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>( "flink-topic", // Topic 名称 new SimpleStringSchema(), // props // ) ; // TODO: 1、Flink从topic中最初的数据开始消费 //kafkaConsumer.setStartFromEarliest() ; // TODO: 2、Flink从topic中最新的数据开始消费 //kafkaConsumer.setStartFromLatest(); // TODO: 3、Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数 //kafkaConsumer.setStartFromGroupOffsets() ; // TODO: 4、Flink从topic中指定的offset开始,这个比较复杂,需要手动指定offset HashMap<KafkaTopicPartition, Long> offsets = new HashMap<>(); offsets.put(new KafkaTopicPartition("flink-topic", 0), 28L); offsets.put(new KafkaTopicPartition("flink-topic", 1), 94L); offsets.put(new KafkaTopicPartition("flink-topic", 2), 108L); //kafkaConsumer.setStartFromSpecificOffsets(offsets); // TODO: 5、Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略 kafkaConsumer.setStartFromTimestamp(1603099781484L); // c. 添加数据源 DataStreamSource<String> kafkaDataStream = env.addSource(kafkaConsumer); // 3. 数据终端-sink kafkaDataStream.printToErr(); // 4. 触发执行-execute env.execute(StreamSourceKafkaOffsetDemo.class.getSimpleName()) ; } }
注意:开启 checkpoint 时 offset 是 Flink 通过状态 state 管理和恢复的,并不是从 kafka 的 offset 位置恢复。在 checkpoint 机制下,作业从最近一次checkpoint 恢复,本身是会回放部分 历史数据,导致部分数据重复消费,Flink 引擎仅保证计算状态的精准一次,要想做到端到端精准 一次需要依赖一些幂等的存储系统或者事务操作。
4.6.6 Kafka 分区发现
实际的生产部环境中可能有这样一些需求,比如:
-
场景一:有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。
-
场景二:作业从一个固定的 kafka topic 读数据,开始该 topic 有 6 个 partition,但随着业务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 6 个扩容到 12。该情况下如何在不重启作业情况下动态感知新扩容的 partition?
针对上面的两种场景,首先在构建 FlinkKafkaConsumer 时的 properties 中设置 flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,及设置的时间间隔。此时FlinkKafkaConsumer内部会启动一个单独的线程定期去Kafka获取最新的meta信息。
-
针对场景一,还需在构建 FlinkKafkaConsumer 时,topic 的描述可以传一个正则表达式描述的 pattern。每次获取最新 kafka meta 时获取正则匹配的最新 topic 列表。
-
针对场景二,设置前面的动态发现参数,在定期获取 kafka 最新 meta 信息时会匹配新的partition。为了保证数据的正确性,新发现的 partition 从最早的位置开始读取。
5. Transformation 数据转换
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/index.html
5.1 说明
流处理的很多API和批处理类似,也包括一系列的Transformation操作,如map、flatMap、filter、sum、reduce……等等,所以这些类似的就不再一一讲解,主要讲解和批处理不一样的一些操作。
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/index.html
整体来说,流式数据上的操作可以分为四类:
-
第一类是对于单条记录的操作,比如筛除掉不符合要求的记录(Filter 操作),或者将每条记录都做一个转换(Map 操作)
-
第二类是对多条记录的操作。比如说统计一个小时内的订单总成交量,就需要将一个小时内的所有订单记录的成交量加到一起。为了支持这种类型的操作,就得通过 Window 将需要的记录关联到一起进行处理
-
第三类是对多个流进行操作并转换为单个流。例如,多个流可以通过 Union、Join 或 Connect 等操作合到一起。这些操作合并的逻辑不同,但是它们最终都会产生了一个新的统一的流,从而可以进行一些跨流的操作。
-
第四类是DataStream支持与合并对称的拆分操作,即把一个流按一定规则拆分为多个流(Split 操作),每个流是之前流的一个子集,这样我们就可以对不同的流作不同的处理。
5.2 英文版API解释
DataStream中常用API 函数,相关说明如下所示:
5.3 中文版API解释
5.4 keyBy
按照指定的key来对流中的数据进行分组,前面入门案例中已经演示过
注意:流处理中没有groupBy,而是keyBy
5.5 union和connect
-
API
union:可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。
connect:提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:
-
’connect只能连接两个数据流,union可以连接多个数据流;connect所连接的两个数据流的数据类型可以不一致,union所连
-
connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
-
需求
将两个String类型的流进行union
将一个String类型和一个Long类型的流进行connect
-
代码实现
package cn.itcast.flink.transformation; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; /** * Flink 流计算中转换函数:合并union和连接connect */ public class StreamUnionConnectDemo { public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); // 2. 数据源-source DataStreamSource<String> dataStream01 = env.fromElements("A", "B", "C", "D"); DataStreamSource<String> dataStream02 = env.fromElements("aa", "bb", "cc", "dd"); DataStreamSource<Integer> dataStream03 = env.fromElements(1, 2, 3, 4); // 3. 数据转换-transformation // TODO: 相同类型DataStream进行union合并操作 DataStream<String> unionDataStream = dataStream01.union(dataStream02); unionDataStream.printToErr(); // TODO: 将2个DataStream进行连接connect操作 ConnectedStreams<String, Integer> connectDataStream = dataStream01.connect(dataStream03); SingleOutputStreamOperator<String> dataStream = connectDataStream.map( new CoMapFunction<String, Integer, String>() { @Override public String map1(String value) throws Exception { return "map1 -> " + value; } @Override public String map2(Integer value) throws Exception { return "map2 -> " + value; } } ); dataStream.print(); // 5. 应用执行 env.execute(StreamUnionConnectDemo.class.getSimpleName()); } }
5.6 split和select
-
API
Split就是将一个流分成多个流,Select就是获取分流后对应的数据
-
需求:
对流中的数据按照奇数和偶数进行分流,并获取分流后的数据
-
代码实现:
package cn.itcast.flink.transformation; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.util.Arrays; /** * Flink 流计算中转换函数:split流的拆分和select流的选择 */ public class StreamSplitSelectDemo { public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2. 数据源-source DataStreamSource<Long> dataStream = env.generateSequence(1, 20); // 3. 数据转换-transformation // TODO: 流的拆分,按照奇数和偶数拆分,使用split函数 SplitStream<Long> splitDataStream = dataStream.split(new OutputSelector<Long>() { @Override public Iterable<String> select(Long value) { String flag = value % 2 == 0 ? "even" : "odd" ; return Arrays.asList(flag); } }); // 获取拆分后的流 DataStream<Long> evenDataStream = splitDataStream.select("even"); //evenDataStream.print(); DataStream<Long> oddDataStream = splitDataStream.select("odd"); //oddDataStream.printToErr(); // TODO: 流的拆分,方式二,调用process函数 OutputTag<Long> evenTag = new OutputTag<Long>("even"){}; OutputTag<Long> oddTag = new OutputTag<Long>("odd"){}; SingleOutputStreamOperator<Long> processDataStream = dataStream.process( new ProcessFunction<Long, Long>() { @Override public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception { if(value % 2 == 0){ ctx.output(evenTag, value); }else{ ctx.output(oddTag, value); } } } ); // 获取分离的流 DataStream<Long> evenStream = processDataStream.getSideOutput(evenTag); evenStream.print(); DataStream<Long> oddStream = processDataStream.getSideOutput(oddTag); oddStream.printToErr(); // 5. 应用执行-execute env.execute(StreamSplitSelectDemo.class.getSimpleName()); } }
5.7 分区
-
API
recale分区,基于上下游Operator并行度,将记录以循环的方式输出到下游Operator每个实例。
-
举例
上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度 上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上 游另两个并行度将记录输出到下游另一个并行度上。
-
需求
对流中的元素使用各种分区,并输出
-
代码实现
package cn.itcast.flink.transformation; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * Flink 流计算中转换函数:对流数据进行分区,函数如下: * global、broadcast、forward、shuffle、rebalance、rescale、partitionCustom */ public class StreamRepartitionDemo { public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); // 2. 数据源-source DataStreamSource<Tuple2<Integer, String>> dataStream = env.fromElements( Tuple2.of(1, "a"), Tuple2.of(2, "b"), Tuple2.of(3, "c"), Tuple2.of(4, "d") ); //dataStream.printToErr(); // 3. 数据转换-transformation // TODO: 1、global函数,将所有数据发往1个分区Partition DataStream<Tuple2<Integer, String>> globalDataStream = dataStream.global(); // globalDataStream.print(); // TODO: 2、broadcast函数, 广播数据 DataStream<Tuple2<Integer, String>> broadcastDataStream = dataStream.broadcast(); //broadcastDataStream.printToErr(); // TODO: 3、forward函数,上下游并发一样时 一对一发送 DataStream<Tuple2<Integer, String>> forwardDataStream = dataStream.forward(); //forwardDataStream.print().setParallelism(1) ; // TODO: 4、shuffle函数,随机均匀分配 DataStream<Tuple2<Integer, String>> shuffleDataStream = dataStream.shuffle(); //shuffleDataStream.printToErr(); // TODO: 5、rebalance函数,轮流分配 DataStream<Tuple2<Integer, String>> rebalanceDataStream = dataStream.rebalance(); //rebalanceDataStream.print() ; // TODO: 6、rescale函数,本地轮流分配 DataStream<Tuple2<Integer, String>> rescaleDataStream = dataStream.rescale(); //rescaleDataStream.printToErr(); // TODO: 7、partitionCustom函数,自定义分区规则 DataStream<Tuple2<Integer, String>> customDataStream = dataStream.partitionCustom( new Partitioner<Integer>() { @Override public int partition(Integer key, int numPartitions) { return key % 2; } }, 0 ); //customDataStream.print(); // 4. 数据终端-sink env.execute(StreamRepartitionDemo.class.getSimpleName()); } }
6. Sink 数据终端
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks
6.1 基于控制台和文件的Sink
直接参考批处理的API即可,学习测试会使用,开发中更多的是数据实时处理统计分析完之后存入MySQL/Kafka/Redis/HBase......
案例演示:将词频统计结果数据存储至文本文件中,代码如下所示:
package cn.itcast.flink.sink; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class StreamSinkFileDemo { public static void main(String[] args) throws Exception { // 1. 执行环境-env:流计算执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1) ; // 2. 数据源-source:Socket接收数据 DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999); // 3. 转换处理-transformation:调用DataSet函数,处理数据 SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream // a. 过滤数据 .filter(new FilterFunction<String>() { @Override public boolean filter(String line) throws Exception { return null != line && line.trim().length() > 0; } }) // b. 分割单词 .flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String line, Collector<String> out) throws Exception { String[] words = line.trim().split("\\W+"); for (String word : words) { out.collect(word); } } }) // c. 转换二元组,表示每个单词出现一次 .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String word) throws Exception { return Tuple2.of(word, 1); } }) // d. 按照单词分组及对组内聚合操作 .keyBy(0).sum(1); // d. 数据终端-sink:数据终端-sink:保存至文件 resultDataStream .setParallelism(1) .writeAsText("datas/stream-output.txt", FileSystem.WriteMode.OVERWRITE); // e. 执行应用-execute env.execute(StreamSinkFileDemo.class.getSimpleName()) ; } }
6.2 自定义Sink:MySQL
-
需求:
将Flink集合中的数据通过自定义Sink保存到MySQL
-
代码实现:
package cn.itcast.flink.sink.mysql; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; /** * 案例演示:自定义Sink,将数据保存至MySQL表中,继承RichSinkFunction */ public class StreamSinkMySQLDemo { @Data @NoArgsConstructor @AllArgsConstructor private static class Student{ private Integer id ; private String name ; private Integer age ; } /** * 自定义Sink,将DataStream数据写入到外部存储MySQL数据库表中 */ private static class MySQLSink extends RichSinkFunction<Student> { private Connection conn = null; private PreparedStatement pstmt = null; // 计数 private Integer counter = 0 ; @Override public void open(Configuration parameters) throws Exception { // 1. 加载驱动 Class.forName("com.mysql.jdbc.Driver"); // 2. 创建连接 conn = DriverManager.getConnection( "jdbc:mysql://node1.itcast.cn:3306/?useUnicode=true&characterEncoding=utf-8&useSSL=false", "root", "123456" ); // 3. 创建PreparedStatement pstmt = conn.prepareStatement("insert into db_flink.t_student (id, name, age) values (?, ?, ?)"); } @Override public void invoke(Student student, Context context) throws Exception { try{ // 设置参数的值 pstmt.setInt(1, student.id); pstmt.setString(2, student.name); pstmt.setInt(3, student.age); // 加入批次 pstmt.addBatch(); counter ++ ; if(counter >= 10){ pstmt.executeBatch(); // 批量插入 counter = 0 ; } }catch (Exception e){ e.printStackTrace(); } } @Override public void close() throws Exception { try{ if(counter > 0){ // 批量插入 pstmt.executeBatch(); } }catch (Exception e){ e.printStackTrace(); }finally { if(null != pstmt) pstmt.close(); if(null != conn) conn.close(); } } } public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ; env.setParallelism(1); // 2. 数据源-source DataStreamSource<Student> inputDataStream = env.fromElements( new Student(13, "wangwu", 20), new Student(14, "zhaoliu", 19), new Student(15, "wangwu", 20), new Student(16, "zhaoliu", 19) ); // 3. 数据终端-sink inputDataStream.addSink(new MySQLSink()); // 4. 应用执行-execute env.execute(StreamSinkMySQLDemo.class.getSimpleName()); } }
此外,从Flink 1.11开始,提供JDBC Connector,更加方便保存数据至RDBMs表中,演示保存数据MySQL数据库表中,代码如下所示:
package cn.itcast.flink.sink.mysql; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.sql.PreparedStatement; import java.sql.SQLException; /** * Flink 流计算,官方自带Connector,将数据保存写入RDBMs数据库表中,比如MySQL表中 */ public class StreamSinkJdbcDemo { @Data @NoArgsConstructor @AllArgsConstructor private static class Student{ private Integer id ; private String name ; private Integer age ; } public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2. 数据源-source DataStreamSource<Student> studentDataStream = env.fromElements( new Student(21, "wangwu3", 20), new Student(22, "zhaoliu4", 19), new Student(23, "wangwu5", 20), new Student(24, "zhaoliu6", 19) ); // 3. 数据终端-sink studentDataStream.addSink( JdbcSink.sink( "insert into db_flink.t_student (id, name, age) values (?, ?, ?)", // new JdbcStatementBuilder<Student>(){ @Override public void accept(PreparedStatement pstmt, Student student) throws SQLException { pstmt.setInt(1, student.id); pstmt.setString(2, student.name); pstmt.setInt(3, student.age); } }, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("com.mysql.jdbc.Driver") .withUrl("jdbc:mysql://node1.itcast.cn:3306/") .withUsername("root") .withPassword("123456") .build() ) ); // 4. 触发执行-execute env.execute(StreamSinkJdbcDemo.class.getSimpleName());
6.3 Kafka Sink
-
需求
将Flink集合中的数据通过自定义Sink保存到Kafka
-
代码实现
package cn.itcast.flink.sink.kafka; import cn.itcast.flink.source.mysql.StreamSourceMySQLDemo; import com.alibaba.fastjson.JSON; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerRecord; import javax.annotation.Nullable; import java.nio.charset.StandardCharsets; import java.util.Properties; /** * 案例演示:将数据保存至Kafka Topic中,直接使用官方提供Connector * /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic flink-topic */ public class StreamSinkKafkaDemo { @Data @NoArgsConstructor @AllArgsConstructor private static class Student{ private Integer id ; private String name ; private Integer age ; } /** * 自定义KafkaSerializationSchema实现类 */ private static class KafkaSchema implements KafkaSerializationSchema<String> { private String topic ; public KafkaSchema(String topicName){ this.topic = topicName ; } @Override public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>( topic, element.getBytes(StandardCharsets.UTF_8) ); return record; } } public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ; env.setParallelism(1); // 2. 数据源-source DataStreamSource<StreamSourceMySQLDemo.Student> studentDataStream = env.addSource( new StreamSourceMySQLDemo.MySQLSource() ); // 3. 数据转换-transformation SingleOutputStreamOperator<String> jsonDataStream = studentDataStream.map( new MapFunction<StreamSourceMySQLDemo.Student, String>() { @Override public String map(StreamSourceMySQLDemo.Student student) throws Exception { return JSON.toJSONString(student); } } ); // 4. 数据终端-sink String topic = "flink-topic" ; // a. Kafka 生产者配置属性 Properties props = new Properties(); props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092"); // b. Kafka 数据序列化Schema信息x KafkaSerializationSchema<String> kafkaSchema = new KafkaSchema(topic); // c. 创建FlinkKafkaProducer对象 FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( topic, // kafkaSchema, // props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE // ); // d. 添加Sink jsonDataStream.addSink(kafkaProducer); // 5. 应用执行-execute env.execute(StreamSinkKafkaDemo.class.getSimpleName()); } }
6.4 Redis Sink
-
API
通过Flink 操作Redis 其实可以通过传统的Jedis 连接池JedisPool 进行Redis 的相关操作,但 是Flink 提供了专门操作Redis 的RedisSink,使用起来更方便,而且不用考虑性能的问题,接下来将主要介绍RedisSink 如何使用。 https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
RedisSink 核心类是RedisMapper 是一个接口,使用时我们要编写自己的redis 操作类实现 这个接口中的三个方法,如下所示
1.getCommandDescription() :
设置使用的Redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型
2.String getKeyFromData(T data): 设置value 中的键值对key的值 3.String getValueFromData(T data); 设置value 中的键值对value的值
使用RedisCommand设置数据结构类型时和redis结构对应关系
可以连接到不同Redis环境(单机Redis服务、集群Redis服务及Sentinel Redis服务),配置 Config:
-
需求
将Flink集合中的数据通过自定义Sink保存到Redis
-
代码实现
package cn.itcast.flink.sink.redis; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; import org.apache.flink.util.Collector; /** * 案例演示:将数据保存至Redis中,直接使用官方提供Connector * https://bahir.apache.org/docs/flink/current/flink-streaming-redis/ */ public class StreamSinkRedisDemo { /** * 自定义RedisMapper,实现其中三个方法,分别为命令、key和Value */ private static class StreamRedisMapper implements RedisMapper<Tuple2<String, Integer>> { @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET, "wordcount"); } @Override public String getKeyFromData(Tuple2<String, Integer> data) { return data.f0; } @Override public String getValueFromData(Tuple2<String, Integer> data) { return Integer.toString(data.f1); } } public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ; env.setParallelism(1); // 2. 数据源-source:Socket接收数据 DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999); // 3. 转换处理-transformation:调用DataSet函数,处理数据 SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream // a. 过滤数据 .filter(new FilterFunction<String>() { @Override public boolean filter(String line) throws Exception { return null != line && line.trim().length() > 0; } }) // b. 分割单词 .flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String line, Collector<String> out) throws Exception { String[] words = line.trim().split("\\W+"); for (String word : words) { out.collect(word); } } }) // c. 转换二元组,表示每个单词出现一次 .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String word) throws Exception { return Tuple2.of(word, 1); } }) // d. 按照单词分组及对组内聚合操作 .keyBy(0).sum(1); // 4. 数据终端-sink // a. Redis 服务配置设置 FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder() .setHost("node1.itcast.cn") .setPort(6379) .setDatabase(0) .setMinIdle(1) .setMaxIdle(8) .setMaxTotal(8) .build(); // b. 创建RedisSink对象 RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>( config, new StreamRedisMapper() ) ; // c. 添加Sink resultDataStream.addSink(redisSink); // 5. 触发执行 env.execute(StreamSinkRedisDemo.class.getSimpleName()); } }
附录一、创建Maven模块
1)、Maven 工程结构
Maven Module模块GAV三要素:
<parent> <artifactId>course-flink</artifactId> <groupId>cn.itcast.flink</groupId> <version>1.0.0</version> </parent> <artifactId>flink-day03</artifactId>
2)、POM 文件内容
Maven 工程POM文件中内容(依赖包):
<repositories> <repository> <id>apache.snapshots</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> <maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.target>${java.version}</maven.compiler.target> <flink.version>1.10.0</flink.version> <scala.version>2.11.12</scala.version> <scala.binary.version>2.11</scala.binary.version> <mysql.version>5.1.48</mysql.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.11</artifactId> <version>1.11.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop-2-uber</artifactId> <version>2.7.5-10.0</version> </dependency> <dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_${scala.binary.version}</artifactId> <version>1.0</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.68</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> <scope>runtime</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> <scope>runtime</scope> </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <testSourceDirectory>src/test/java</testSourceDirectory> <plugins> <!-- 编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> <!--<encoding>${project.build.sourceEncoding}</encoding>--> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.18.1</version> <configuration> <useFile>false</useFile> <disableXmlReport>true</disableXmlReport> <includes> <include>**/*Test.*</include> <include>**/*Suite.*</include> </includes> </configuration> </plugin> <!-- 打jar包插件(会包含所有依赖) --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <!-- 可以设置jar包的入口类(可选) --> <!-- <mainClass>com.itcast.flink.batch.FlinkBatchWordCount</mainClass> --> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
附录二、Flink on YARN
Flink支持增量迭代,具有对迭代自行优化的功能,因此在on yarn上提交的任务性能略好于 Spark,Flink提供2种方式在YARN上提交任务:启动1个一直运行的 Yarn session(分离模式)和在 Yarn 上运行1个 Flink 任务(客户端模式)。
Session 会话模式:
通过命令yarn-session.sh的启动方式本质上是在yarn集群上启动一个flink集群
由yarn预先给flink集群分配若干个container,在yarn的界面上只能看到一个Flink session with X TaskManagers的任务,并且只有一个Flink界面,可以从Yarn的Application Master链接进入;
Job 分离模式:
通过命令bin/flink run -m yarn-cluster启动,每次发布1个任务,本质上给每个Flink任务启动了1个集群,yarn在任务发布时启动JobManager(对应Yarn的AM)和TaskManager;
如果一个任务指定了n个TaksManager(-yn n),则会启动n+1个Container,其中一个是JobManager,发布m个应用,则有m个Flink界面,不同的任务不可能在一个Container(JVM)中,实现了资源隔离。
1、Session 会话模式
通过命令yarn-session.sh先启动集群,然后再提交作业,接着会向yarn申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享Dispatcher和ResourceManager;共享资源;适合规模小执行时间短的作业。
2、Job 分离模式
通过命令bin/flink run -m yarn-cluster提交任务,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行,适合规模大长时间运行的作业;
更多推荐
所有评论(0)