Kafka的坑: 消费者无法消费消息
问题:今天使用Kafka做一个小DEMO,但运行的时候,可以在控制台上看到生产者发送的消息,无法看到消费者在消费消息,但通过命令行可以看到消费者消费的消息。生产者应该是没有问题的,给出消费者的代码:/*** Kafka消费者*/public class KafkaConsumer extends Thread{private String topic;public...
·
问题:今天使用Kafka做一个小DEMO,但运行的时候,可以在IDEA控制台上看到生产者发送的消息,无法在IDEA看到消费者在消费消息,但通过连接Linux在命令行可以看到消费者消费的消息。
生产者应该是没有问题的,给出消费者的代码:
/**
* Kafka消费者
*/
public class KafkaConsumer extends Thread{
private String topic;
public KafkaConsumer(String topic) {
this.topic = topic;
}
private ConsumerConnector createConnector(){
Properties properties = new Properties();
properties.put("zookeeper.connect", KafkaProperties.ZK);
properties.put("group.id",KafkaProperties.GROUP_ID);
properties.put("rebalance.max.retries", "10");
properties.put("rebalance.backoff.ms", "2000");
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
@Override
public void run() {
ConsumerConnector consumer = createConnector();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
//一直就卡在这里,没有办法获取接受到的消息流
Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while (iterator.hasNext()) {
String message = new String(iterator.next().message());
System.out.println("rec: " + message);
}
}
}
通过源码跟踪:
进入上面这个方法里面的最后一行
最后的错误定位在上面这个图里面,大致意思是:“不知道注册broker版本号,仅仅支持1和2”
解决办法:通过上面的定位已经指定是哪里错误了,是kafka注册进zookeeper中的这个路径 /broker/ids/0 里面的版本号错误,我登陆zk的客户端,将里面的version更换成1在重试,发现就成功了。
疑问:zk下面的这个目录是第一次启动kafka时候创建的,不知道为什么,我第一次启动时候的version竟然是4,kafka我用的版本是kafka_2.11_0.11.0.0,是我配置还是哪里的问题呀?
更多推荐
所有评论(0)