Kafka(单机安装配置、常用操作)
Kafka的单机安装配置、常用操作一、Kafka的单机安装二、kafka基本操作三、kafka — Java API一、Kafka的单机安装1、解压2、改名字mv kafka_2.11-2.0.0.tgzkafka2113、进入producer.properties,修改配置文件[root@hadoop2 kafka211]# vi ./config/producer.properties修改以下
·
Kafka的单机安装配置、常用操作
一、Kafka的单机安装
1、解压
2、改名字
mv kafka_2.11-2.0.0.tgz kafka211
3、进入server.properties,修改配置文件
[root@hadoop2 kafka211]# vi ./config/producer.properties
修改以下四个地方:

先在 kafka211的路径下创建kafka211-logs文件夹,日志将写在该文件中


4、添加环境变量,完成后source /etc/profile
二、kafka基本操作
1、启动zookeeper
首先启动zk:在zkpr/bin目录下: zkServer.sh start
查看zk状态:在zkpr/bin目录下: zkServer.sh status
2、启动kafka
[root@hadoop2 bin]# kafka-server-start.sh /root/software/kafka211/config/server.properties
----或者后台启动:(推荐)
[root@hadoop2 bin]# kafka-server-start.sh -daemon /root/software/kafka211/config/servet
.properties
3、查看kafka队列
[root@hadoop2 config]# kafka-topics.sh --zookeeper 192.168.21.2:2181 --list
4、创建一个topic(队列):
(类似一个打饭窗口)cd
[root@hadoop2 config]#kafka-topics.sh --zookeeper 192.168.21.2:2181 --create --topic mydemo --partitions 1 --replication-factor 1
(可以创建多个分区,但是副本必须和节点数一样,副本数创建的多于kafka集群的节点会报错)
[root@hadoop2 config]# kafka-topics.sh --zookeeper 192.168.21.2:2181 --create --topic mydemo2 --partitions 3 --replication-factor 1
Created topic "mydemo2".
5、删除topic:
[root@hadoop2 kafka-logs]# kafka-topics.sh --zookeeper 192.168.21.2:2181 --delete --topic mydemo2
6、查看topic详情:
kafka-topics.sh --zookeeper 192.168.21.2:2181 --describe --topic mydemo
7、从控制台生产消息:(生产消息)
[root@hadoop2 config]# kafka-console-producer.sh --topic mydemo --broker-list 192.168.21.2:9092
>hello
8、从队列获取消息:(消费消息)
kafka-console-consumer.sh --topic mydemo --bootstrap-server 192.168.21.2:9092 --from-beginning
hello
9、查看消息队列里有多少条消息
[root@hadoop2 config]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.21.2:9092 --topic mydemo -time -1 offsets 1
三、kafka — Java API
public class MyProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.21.2:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
/*
* ACKS = 0 在producer发送数据后,不会等待broker任何响应,无法确保数据正确发送到broker中
* ACKS = 1 只需要得到分区副本中leader确认就可以
* ACKS = -1 producer要等到所有的副本全部确认,响应时间最长,数据最安全,不会丢失数据
* */
properties.put(ProducerConfig.ACKS_CONFIG,"0");
KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
Scanner scanner = new Scanner(System.in);
boolean content = true;
while (content){
System.out.println("请输入内容:");
String txt = scanner.next();
ProducerRecord<String, String> record = new ProducerRecord<>("mydemo", txt);
producer.send(record);
System.out.println("是否退出:(true:退出,false:继续发送)");
content = scanner.nextBoolean();
}
}
}
更多推荐



所有评论(0)