为什么使用消息中间件(MQ)

  • 异步调用

    • 同步变异步
  • 应用解耦

    • 提供基于数据的接口层
  • 流量削峰

    • 缓解瞬时高流量压力

消息中间件中的术语

  • Broker:消息服务器,提供核心服务
  • Producer:消息生产者
  • Consumer:消息消费者
  • Topic:主题,发布订阅模式下的消息统一汇集地
  • Queue:队列,P2P模式下的消息队列

kafka消息带有时间戳,一般消息数据存放七天

  • 常见的消息中间件
    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
    • Redis
      ……

Apache Kafka

  • Kafka是一种高吞吐量的分布式发布-订阅消息系统,专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计
    • 快速,单Broker每秒几百MB读取
    • 不停机扩展集群
    • 消息副本冗余
    • 实时数据管道
  • 使用Scala编写
    在这里插入图片描述

kafka安装

  • 首先需要安装zookeeper
[root@jzy1 opt]# tar zxf kafka_2.11-2.0.0.tgz 
[root@jzy1 opt]# mv kafka_2.11-2.0.0 soft/kafka211
    
[root@jzy1 opt]# cd soft/kafka211/config/
[root@jzy1 config]# vi server.properties
    broker.id=0 不重复
    log.dirs=/opt/soft/kafka211/datas
    zookeeper.connect=jzy1:2181   修改localhost为ip地址,zookeeper地址,多个要用隔开
	listeners=PLAINTEXT://jzy1:9092 解开注释加入ip地址
		

配置环境变量
#Kafka
[root@jzy1 bin]# vi /etc/profile
export KAFKA_HOME=/opt/soft/kafka211
export PATH=$PATH:$KAFKA_HOME/bin


    
# 启动
kafka-server-start.sh /opt/soft/kafka211/config/server.properties 
  • 检查测试
//创建一个demo主题 有一个分区和一个副本的队列
kafka-topics.sh --create --zookeeper jzy1:2181 --replication-factor 1 --partitions 1 --topic demo
//检查队列是否创建成功
kafka-topics.sh --zookeeper jzy1:2181 --list
//向你的消息队列中生产消息
kafka-console-producer.sh --broker-list jzy1:9092 --topic demo
//消费消息
kafka-console-consumer.sh --bootstrap-server jzy1:9092 --from-beginning --topic demo
    //--from-beginning 表示从头开始看
  • kafka彻底删除topic
# 配置server.properties
delete.topic.enable=true
    
#删除指令
kafka-topics.sh --delete --zookeeper jzy:2181 --topic demo

设置永久保存某主题

kafka-topics.sh --zookeeper jzy:2181 --alter --topic 主题名 --config retention.ms=-1

flume连接导入

单分区

  • flume配置文件如下:
a5.channels=c5
a5.sources=s5
a5.sinks=k5 

a5.sources.s5.type=spooldir
a5.sources.s5.spoolDir=/opt/datas
a5.sources.s5.interceptors=head_filter
a5.sources.s5.interceptors.head_filter.type=regex_filter
a5.sources.s5.interceptors.head_filter.regex=^event_id.*
a5.sources.s5.interceptors.head_filter.excludeEvents=true

a5.sinks.k5.type=org.apache.flume.sink.kafka.KafkaSink
a5.sinks.k5.kafka.bootstrap.servers=jzy1:9092
a5.sinks.k5.kafka.topic=msg_event

a5.channels.c5.type=memory
a5.channels.c5.capacity=10000
a5.channels.c5.transactionCapacity=10000

a5.sinks.k5.channel=c5
a5.sources.s5.channels=c5
  • 按如下步骤执行
#建主题 一个副本 一个分区
kafka-topics.sh --create --zookeeper jzy1:2181 --topic msgEvent --replication-factor 1 --partitions 1

#建消费者
kafka-console-consumer.sh --bootstrap-server jzy1:9092 --from-beginning --topic msgEvent

运行flume
flume-ng agent -n a5 -c conf -f /opt/flumeconf/conf_0806_kafka.properties



#按分区查看数据量
kafka-run-class.sh kafka.tools.GetOffsetShell --topic msgEvent --time -1 --broker-list jzy1:9092 --partitions 0

数据量大时使用多分区提高效率

# 创建"mypart"主题 副本数1 设置三个分区 
[root@jzy1 ~]# kafka-topics.sh --create --zookeeper 192.168.56.21:2181 --topic mypart --replication-factor 1 --partitions 3
Created topic "mypart".

# 消费
[root@jzy1 ~]# kafka-console-consumer.sh --broker-list jzy1:9092 --topic mypart --from-beginning
    
# 运行flume配置文件    
flume-ng agent -n a5 -c conf -f /opt/flumeconf/conf_0807_kafka.properties 

flume配置文件如下

a5.channels=c5
a5.sources=s5
a5.sinks=k5

a5.sources.s5.type=spooldir
a5.sources.s5.spoolDir=/opt/datas


a5.sinks.k5.type=org.apache.flume.sink.kafka.KafkaSink
a5.sinks.k5.kafka.bootstrap.servers=jzy1:9092
a5.sinks.k5.kafka.topic=mypart

a5.channels.c5.type=memory
a5.channels.c5.capacity=10000
a5.channels.c5.transactionCapacity=10000

a5.sinks.k5.channel=c5
a5.sources.s5.channels=c5
  • 运行后可以使用如下命令查看各个分区产生的数据量
    
[root@jzy1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --topic mypart -time -1 --broker-list jzy1:9092 --partitions 0
mypart:0:6997
[root@jzy1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --topic mypart -time -1 --broker-list jzy1:9092 --partitions 1
mypart:1:6998
[root@jzy1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --topic mypart -time -1 --broker-list jzy1:9092 --partitions 2
mypart:2:6998


kafka Producer/Consumer API

  • 依赖jar包 Apache Kafka
    在这里插入图片描述

  • kafka producer API

# kafka创建"mytest"主题
kafka-topics.sh --create --zookeeper jzy1:2181 --topic mytest --replication-factor 1 --partitions 1
# kafka producer API
public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"jzy1:9092");
        prop.put(ProducerConfig.ACKS_CONFIG,"all");
					prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer pro = new KafkaProducer(prop);
        for (int i = 0; i < 10; i++) {
            pro.send(new ProducerRecord<String,String>("mytest","test"+i,i+""));
        }
        pro.close();
    }



//按照分区查看数据
[root@jzy1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --topic mytest --time -1 --broker-list jzy1:9092 --partitions 0
mytest:0:10


  • kafka consumer API
#kafka consumer API
     
#重置游标  这里消费的是mytest中的消息 需要对test1重置游标
[root@jzy1 ~]# kafka-consumer-groups.sh --bootstrap-server jzy1:9092 --group test1 --reset-offsets --all-topics --to-earliest --execute

    
public static void main(String[] args) {   
Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"jzy1:9092");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String,String> cons = new KafkaConsumer<String, String>(prop);
        cons.subscribe(Arrays.asList("mytest"));
        while(true){
            ConsumerRecords<String,String> records=cons.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition()+":"+record.offset()+":"+record.key()+":"+record.value());
            }
 }
sumerRecords<String,String> records=cons.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition()+":"+record.offset()+":"+record.key()+":"+record.value());
            }
 }
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐