一、Kafka的单机安装

1、解压
在这里插入图片描述
2、改名字

mv kafka_2.11-2.0.0.tgz   kafka211

3、进入server.properties,修改配置文件

[root@hadoop2 kafka211]# vi ./config/producer.properties

修改以下四个地方:
在这里插入图片描述
在这里插入图片描述
先在 kafka211的路径下创建kafka211-logs文件夹,日志将写在该文件中
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
4、添加环境变量,完成后source /etc/profile
在这里插入图片描述

二、kafka基本操作

1、启动zookeeper

首先启动zk:在zkpr/bin目录下: zkServer.sh start
查看zk状态:在zkpr/bin目录下: zkServer.sh status

2、启动kafka

[root@hadoop2 bin]# kafka-server-start.sh /root/software/kafka211/config/server.properties
----或者后台启动:(推荐)
[root@hadoop2 bin]# kafka-server-start.sh -daemon /root/software/kafka211/config/servet
.properties

3、查看kafka队列

[root@hadoop2 config]# kafka-topics.sh --zookeeper 192.168.21.2:2181 --list

4、创建一个topic(队列):
(类似一个打饭窗口)cd

[root@hadoop2 config]#kafka-topics.sh --zookeeper 192.168.21.2:2181 --create --topic mydemo --partitions 1 --replication-factor 1
(可以创建多个分区,但是副本必须和节点数一样,副本数创建的多于kafka集群的节点会报错)
[root@hadoop2 config]# kafka-topics.sh --zookeeper 192.168.21.2:2181 --create --topic mydemo2 --partitions 3 --replication-factor 1
 Created topic "mydemo2".

5、删除topic:

[root@hadoop2 kafka-logs]# kafka-topics.sh --zookeeper 192.168.21.2:2181 --delete --topic mydemo2

6、查看topic详情:

kafka-topics.sh --zookeeper 192.168.21.2:2181 --describe --topic mydemo

7、从控制台生产消息:(生产消息)

[root@hadoop2 config]# kafka-console-producer.sh --topic mydemo --broker-list 192.168.21.2:9092
>hello

8、从队列获取消息:(消费消息)

kafka-console-consumer.sh --topic mydemo --bootstrap-server 192.168.21.2:9092 --from-beginning
hello

9、查看消息队列里有多少条消息

[root@hadoop2 config]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.21.2:9092 --topic mydemo -time -1 offsets 1

三、kafka — Java API

public class MyProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.21.2:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

        /*
        *  ACKS = 0 在producer发送数据后,不会等待broker任何响应,无法确保数据正确发送到broker中
        *  ACKS = 1 只需要得到分区副本中leader确认就可以
        *  ACKS = -1 producer要等到所有的副本全部确认,响应时间最长,数据最安全,不会丢失数据
        * */
        properties.put(ProducerConfig.ACKS_CONFIG,"0");
        KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
        Scanner scanner = new Scanner(System.in);
        boolean content = true;

        while (content){
            System.out.println("请输入内容:");
            String txt = scanner.next();

            ProducerRecord<String, String> record = new ProducerRecord<>("mydemo", txt);
            producer.send(record);
            System.out.println("是否退出:(true:退出,false:继续发送)");
            content = scanner.nextBoolean();
        }

    }
}
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐