kafka入门 安装使用 flume连接kafka以及kafka API
kafka入门为什么使用消息中间件(MQ)消息中间件中的术语Apache Kafkakafka安装flume连接导入单分区数据量大是使用多分区提高效率kafka Producer/Consumer API为什么使用消息中间件(MQ)异步调用同步变异步应用解耦提供基于数据的接口层流量削峰缓解瞬时高流量压力消息中间件中的术语Broker:消息服务器,提供核心服务Producer:消息生产者Consum
·
kafka入门
为什么使用消息中间件(MQ)
-
异步调用
- 同步变异步
-
应用解耦
- 提供基于数据的接口层
-
流量削峰
- 缓解瞬时高流量压力
消息中间件中的术语
- Broker:消息服务器,提供核心服务
- Producer:消息生产者
- Consumer:消息消费者
- Topic:主题,发布订阅模式下的消息统一汇集地
- Queue:队列,P2P模式下的消息队列
kafka消息带有时间戳,一般消息数据存放七天
- 常见的消息中间件
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
- Redis
……
Apache Kafka
- Kafka是一种高吞吐量的分布式发布-订阅消息系统,专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计
- 快速,单Broker每秒几百MB读取
- 不停机扩展集群
- 消息副本冗余
- 实时数据管道
- 使用Scala编写
kafka安装
- 首先需要安装zookeeper
[root@jzy1 opt]# tar zxf kafka_2.11-2.0.0.tgz
[root@jzy1 opt]# mv kafka_2.11-2.0.0 soft/kafka211
[root@jzy1 opt]# cd soft/kafka211/config/
[root@jzy1 config]# vi server.properties
broker.id=0 不重复
log.dirs=/opt/soft/kafka211/datas
zookeeper.connect=jzy1:2181 修改localhost为ip地址,zookeeper地址,多个要用隔开
listeners=PLAINTEXT://jzy1:9092 解开注释加入ip地址
配置环境变量
#Kafka
[root@jzy1 bin]# vi /etc/profile
export KAFKA_HOME=/opt/soft/kafka211
export PATH=$PATH:$KAFKA_HOME/bin
# 启动
kafka-server-start.sh /opt/soft/kafka211/config/server.properties
- 检查测试
//创建一个demo主题 有一个分区和一个副本的队列
kafka-topics.sh --create --zookeeper jzy1:2181 --replication-factor 1 --partitions 1 --topic demo
//检查队列是否创建成功
kafka-topics.sh --zookeeper jzy1:2181 --list
//向你的消息队列中生产消息
kafka-console-producer.sh --broker-list jzy1:9092 --topic demo
//消费消息
kafka-console-consumer.sh --bootstrap-server jzy1:9092 --from-beginning --topic demo
//--from-beginning 表示从头开始看
- kafka彻底删除topic
# 配置server.properties
delete.topic.enable=true
#删除指令
kafka-topics.sh --delete --zookeeper jzy:2181 --topic demo
设置永久保存某主题
kafka-topics.sh --zookeeper jzy:2181 --alter --topic 主题名 --config retention.ms=-1
flume连接导入
单分区
- flume配置文件如下:
a5.channels=c5
a5.sources=s5
a5.sinks=k5
a5.sources.s5.type=spooldir
a5.sources.s5.spoolDir=/opt/datas
a5.sources.s5.interceptors=head_filter
a5.sources.s5.interceptors.head_filter.type=regex_filter
a5.sources.s5.interceptors.head_filter.regex=^event_id.*
a5.sources.s5.interceptors.head_filter.excludeEvents=true
a5.sinks.k5.type=org.apache.flume.sink.kafka.KafkaSink
a5.sinks.k5.kafka.bootstrap.servers=jzy1:9092
a5.sinks.k5.kafka.topic=msg_event
a5.channels.c5.type=memory
a5.channels.c5.capacity=10000
a5.channels.c5.transactionCapacity=10000
a5.sinks.k5.channel=c5
a5.sources.s5.channels=c5
- 按如下步骤执行
#建主题 一个副本 一个分区
kafka-topics.sh --create --zookeeper jzy1:2181 --topic msgEvent --replication-factor 1 --partitions 1
#建消费者
kafka-console-consumer.sh --bootstrap-server jzy1:9092 --from-beginning --topic msgEvent
运行flume
flume-ng agent -n a5 -c conf -f /opt/flumeconf/conf_0806_kafka.properties
#按分区查看数据量
kafka-run-class.sh kafka.tools.GetOffsetShell --topic msgEvent --time -1 --broker-list jzy1:9092 --partitions 0
数据量大时使用多分区提高效率
# 创建"mypart"主题 副本数1 设置三个分区
[root@jzy1 ~]# kafka-topics.sh --create --zookeeper 192.168.56.21:2181 --topic mypart --replication-factor 1 --partitions 3
Created topic "mypart".
# 消费
[root@jzy1 ~]# kafka-console-consumer.sh --broker-list jzy1:9092 --topic mypart --from-beginning
# 运行flume配置文件
flume-ng agent -n a5 -c conf -f /opt/flumeconf/conf_0807_kafka.properties
flume配置文件如下
a5.channels=c5
a5.sources=s5
a5.sinks=k5
a5.sources.s5.type=spooldir
a5.sources.s5.spoolDir=/opt/datas
a5.sinks.k5.type=org.apache.flume.sink.kafka.KafkaSink
a5.sinks.k5.kafka.bootstrap.servers=jzy1:9092
a5.sinks.k5.kafka.topic=mypart
a5.channels.c5.type=memory
a5.channels.c5.capacity=10000
a5.channels.c5.transactionCapacity=10000
a5.sinks.k5.channel=c5
a5.sources.s5.channels=c5
- 运行后可以使用如下命令查看各个分区产生的数据量
[root@jzy1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --topic mypart -time -1 --broker-list jzy1:9092 --partitions 0
mypart:0:6997
[root@jzy1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --topic mypart -time -1 --broker-list jzy1:9092 --partitions 1
mypart:1:6998
[root@jzy1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --topic mypart -time -1 --broker-list jzy1:9092 --partitions 2
mypart:2:6998
kafka Producer/Consumer API
-
依赖jar包 Apache Kafka
-
kafka producer API
# kafka创建"mytest"主题
kafka-topics.sh --create --zookeeper jzy1:2181 --topic mytest --replication-factor 1 --partitions 1
# kafka producer API
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"jzy1:9092");
prop.put(ProducerConfig.ACKS_CONFIG,"all");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer pro = new KafkaProducer(prop);
for (int i = 0; i < 10; i++) {
pro.send(new ProducerRecord<String,String>("mytest","test"+i,i+""));
}
pro.close();
}
//按照分区查看数据
[root@jzy1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --topic mytest --time -1 --broker-list jzy1:9092 --partitions 0
mytest:0:10
- kafka consumer API
#kafka consumer API
#重置游标 这里消费的是mytest中的消息 需要对test1重置游标
[root@jzy1 ~]# kafka-consumer-groups.sh --bootstrap-server jzy1:9092 --group test1 --reset-offsets --all-topics --to-earliest --execute
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"jzy1:9092");
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> cons = new KafkaConsumer<String, String>(prop);
cons.subscribe(Arrays.asList("mytest"));
while(true){
ConsumerRecords<String,String> records=cons.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.partition()+":"+record.offset()+":"+record.key()+":"+record.value());
}
}
sumerRecords<String,String> records=cons.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.partition()+":"+record.offset()+":"+record.key()+":"+record.value());
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)