今天在使用spring cloud整合消息队列时,遇到个问题。本机虚拟机上部署了一套kafka集群  zookeeper集群也部署在同台虚拟机上

在虚拟机内使用kafka-console-producer.sh、kafka-console-consumer.sh 创建广播方、消费方能正常的收发消息。

但是在本机物理机上使用sping cloud 时就始终无法发送消息到kafka上去

pom文件

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

yml配置文件

server:
  port: 8050
spring:
  application:
    name: cloud-kafka-provider
  cloud:
    stream:
      default:
        binder: kafka
        producer:
          useNativeEncoding: true
      default-binder: kafka
      kafka:
        binder:
          brokers: 192.168.126.129:9090,192.168.126.129:9091,192.168.126.129:9092
          zk-nodes: 192.168.126.129:2181,192.168.126.129:2182,192.168.126.129:2183

消息发送代码:

@EnableBinding
public class DefaultProviderService {

    @Autowired
    private BinderAwareChannelResolver resolver;

    public void sendMsg(String msg, String topic) {
        resolver.resolveDestination(topic).send(new GenericMessage<>(msg.getBytes()));
    }
}
 

 

错误信息

Exception thrown when sending a message with key='null' and payload=***** to topic

 

之前一直未能发现问题的所作,后来在查看其他网友博客的时候看到他降低日志级别解决了他的问题。我也将项目日志级别降低到debug模式

发现原来一直在报错  一直提示无法连接到kafka 

java kafka Connection refused: no further information

 

通过这个日志信息 最终找到问题所在点。 原因是我的kafka集群  在server.properties配置文件中的配置有点问题。因为我配置了每个节点kafka的port信息  并没有为节点配置host,最终修改配置如下

port:9090
host.name=192.168.126.129
 

重启服务,消息正常发送接收。

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐