步骤一:添加依赖项在 pom.xml 中添加以下依赖项:
<dependency>    <groupid>org.springframework.kafka</groupid>    <artifactid>spring-kafka</artifactid>    <version>2.8.0</version></dependency>
步骤二:配置 kafka在 application.yml 文件中添加以下配置:
sping:  kafka:    bootstrap-servers: localhost:9092    consumer:      group-id: my-group      auto-offset-reset: earliest    producer:      value-serializer: org.apache.kafka.common.serialization.stringserializer      key-serializer: org.apache.kafka.common.serialization.stringserializer
这里我们配置了 kafka 的服务地址为 localhost:9092,配置了一个消费者组 id 为 my-group,并设置了一个最早的偏移量来读取消息。在生产者方面,我们配置了消息序列化程序为 stringserializer。
步骤三:创建一个生产者我们现在要创建一个 kafka 生产者,以便向 kafka 服务器发送消息。我们将在此处创建一个 restful api 端点,以接收 post 请求并将消息发送到 kafka。
首先,我们将创建一个 kafkaproducerconfig 类,用于配置 kafka 生产者:
@configurationpublic class kafkaproducerconfig {    @value("${spring.kafka.bootstrap-servers}")    private string bootstrapservers;    @bean    public map<string, object> producerconfigs() {        map<string, object> props = new hashmap<>();        props.put(producerconfig.bootstrap_servers_config, bootstrapservers);        props.put(producerconfig.key_serializer_class_config, stringserializer.class);        props.put(producerconfig.value_serializer_class_config, stringserializer.class);        return props;    }    @bean    public producerfactory<string, string> producerfactory() {        return new defaultkafkaproducerfactory<>(producerconfigs());    }    @bean    public kafkatemplate<string, string> kafkatemplate() {        return new kafkatemplate<>(producerfactory());    }}
在上面的代码中,我们使用 @configuration 注解将 kafkaproducerconfig 类声明为配置类。然后,我们使用 @value 注解注入配置文件中的 bootstrap-servers 属性。
接下来,我们创建了一个 producerconfigs 方法,用于设置 kafka 生产者的配置。在这里,我们设置了 bootstrap_servers_config、key_serializer_class_config 和 value_serializer_class_config 三个属性。
然后,我们创建了一个 producerfactory 方法,用于创建 kafka 生产者工厂。在这里,我们使用了 defaultkafkaproducerfactory 类,并传递了我们的配置。
最后,我们创建了一个 kafkatemplate 方法,用于创建 kafkatemplate 实例。在这里,我们使用了刚刚创建的生产者工厂作为参数,然后返回 kafkatemplate 实例。
接下来,我们将创建一个 restful 端点,用于接收 post 请求并将消息发送到 kafka。在这里,我们将使用 @restcontroller 注解创建一个 restful 控制器:
@restcontrollerpublic class kafkacontroller {    @autowired    private kafkatemplate<string, string> kafkatemplate;    @postmapping("/send")    public void sendmessage(@requestbody string message) {        kafkatemplate.send("my-topic", message);    }}
在上面的代码中,我们使用 @autowired 注解将 kafkatemplate 实例注入到 kafkacontroller 类中。然后,我们创建了一个 sendmessage 方法,用于发送消息到 kafka。
在这里,我们使用 kafkatemplate.send 方法发送消息到 my-topic 主题。send 方法返回一个 listenablefuture 对象,用于异步处理结果。
步骤四:创建一个消费者现在,我们将创建一个 kafka 消费者,用于从 kafka 服务器接收消息。在这里,我们将创建一个消费者组,并将其配置为从 my-topic 主题读取消息。
首先,我们将创建一个 kafkaconsumerconfig 类,用于配置 kafka 消费者:
@configuration@enablekafkapublic class kafkaconsumerconfig {    @value("${spring.kafka.bootstrap-servers}")    private string bootstrapservers;    @value("${spring.kafka.consumer.group-id}")    private string groupid;    @bean    public map<string, object> consumerconfigs() {        map<string, object> props = new hashmap<>();        props.put(consumerconfig.bootstrap_servers_config, bootstrapservers);        props.put(consumerconfig.group_id_config, groupid);        props.put(consumerconfig.auto_offset_reset_config, "earliest");        props.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);        props.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class);        return props;    }    @bean    public consumerfactory<string, string> consumerfactory() {        return new defaultkafkaconsumerfactory<>(consumerconfigs());    }    @bean    public concurrentkafkalistenercontainerfactory<string, string> kafkalistenercontainerfactory() {        concurrentkafkalistenercontainerfactory<string, string> factory = new concurrentkafkalistenercontainerfactory<>();        factory.setconsumerfactory(consumerfactory());        return factory;    }}
在上面的代码中,我们使用 @configuration 注解将 kafkaconsumerconfig 类声明为配置类,并使用 @enablekafka 注解启用 kafka。
然后,我们使用 @value 注解注入配置文件中的 bootstrap-servers 和 consumer.group-id 属性。
接下来,我们创建了一个 consumerconfigs 方法,用于设置 kafka 消费者的配置。在这里,我们设置了 bootstrap_servers_config、group_id_config、auto_offset_reset_config、key_deserializer_class_config 和 value_deserializer_class_config 五个属性。
然后,我们创建了一个 consumerfactory 方法,用于创建 kafka 消费者工厂。在这里,我们使用了 defaultkafkaconsumerfactory 类,并传递了我们的配置。
最后,我们创建了一个 kafkalistenercontainerfactory 方法,用于创建一个 concurrentkafkalistenercontainerfactory 实例。在这里,我们将消费者工厂注入到 kafkalistenercontainerfactory 实例中。
接下来,我们将创建一个 kafka 消费者类 kafkaconsumer,用于监听 my-topic 主题并接收消息:
@servicepublic class kafkaconsumer {    @kafkalistener(topics = "my-topic", groupid = "my-group-id")    public void consume(string message) {        system.out.println("received message: " + message);    }}
在上面的代码中,我们使用 @kafkalistener 注解声明了一个消费者方法,用于接收从 my-topic 主题中读取的消息。在这里,我们将消费者组 id 设置为 my-group-id。
现在,我们已经完成了 kafka 生产者和消费者的设置。我们可以使用 mvn spring-boot:run 命令启动应用程序,并使用 curl 命令发送 post 请求到 http://localhost:8080/send 端点,以将消息发送到 kafka。然后,我们可以在控制台上查看消费者接收到的消息。这就是使用 spring boot 和 kafka 的基本设置。我们可以根据需要进行更改和扩展,以满足特定的需求。
以上就是spring boot怎么整合kafka的详细内容。
   
 
   