问题:今天使用Kafka做一个小DEMO,但运行的时候,可以在IDEA控制台上看到生产者发送的消息,无法在IDEA看到消费者在消费消息,但通过连接Linux在命令行可以看到消费者消费的消息。

生产者应该是没有问题的,给出消费者的代码:

/**
 * Kafka消费者
 */
public class KafkaConsumer extends Thread{
    private String topic;
    public KafkaConsumer(String topic) {
        this.topic = topic;
    }
    private ConsumerConnector createConnector(){
        Properties properties = new Properties();
        properties.put("zookeeper.connect", KafkaProperties.ZK);
        properties.put("group.id",KafkaProperties.GROUP_ID);
        properties.put("rebalance.max.retries", "10");
        properties.put("rebalance.backoff.ms", "2000");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }
    @Override
    public void run() {
        ConsumerConnector consumer = createConnector();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        //一直就卡在这里,没有办法获取接受到的消息流
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream =  consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0);   
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("rec: " + message);
        }
    }
}

通过源码跟踪:
在这里插入图片描述
在这里插入图片描述
进入上面这个方法里面的最后一行在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

最后的错误定位在上面这个图里面,大致意思是:“不知道注册broker版本号,仅仅支持1和2”
解决办法:通过上面的定位已经指定是哪里错误了,是kafka注册进zookeeper中的这个路径 /broker/ids/0 里面的版本号错误,我登陆zk的客户端,将里面的version更换成1在重试,发现就成功了。
疑问:zk下面的这个目录是第一次启动kafka时候创建的,不知道为什么,我第一次启动时候的version竟然是4,kafka我用的版本是kafka_2.11_0.11.0.0,是我配置还是哪里的问题呀?

在这里插入图片描述

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐