Kafka系列之SpringBoot集成Kafka

本文介绍如何在springboot项目中集成kafka收发message。

pom依赖

springboot相关的依赖我们就不提了,和kafka相关的只依赖一个spring-kafka集成包

	<dependency>
		<groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

Kafka相关的yaml配置

spring:
  kafka:
    bootstrap-servers: 30.46.35.29:9092
    producer:
      retries: 3
      acks: -1
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: lz4
      properties:
        linger.ms: 1
        'interceptor.classes': com.tencent.qidian.ma.commontools.trace.kafka.TracingProducerInterceptor
    consumer:
      heartbeat-interval: 3000
      max-poll-records: 100
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 30000
    listener:
      concurrency: 3
      type: batch
      ack-mode: manual_immediate

生产者配置

1)通过@Configuration、@EnableKafka,声明Config并且打开KafkaTemplate能力。

2)生成bean,@Bean

常见配置参考:

package com.somnus.config.kafka;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Resource
    private KafkaProperties kafkaProperties;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducer().getRetries());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProperties.getProducer().getBatchSize());
        props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProperties.getProducer().getProperties().get("linger.ms"));
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProperties.getProducer().getBufferMemory());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer().getKeySerializer());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer().getValueSerializer());
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

消费端配置

1)通过@Configuration、@EnableKafka,声明Config并且打开KafkaTemplate能力。

2)生成bean,@Bean

常见配置参考:

package com.tencent.qidian.ma.maaction.web.config.kafka;

import jakarta.annotation.Resource;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

/**
 * KafkaBeanConfiguration
 */
@Configuration
@EnableKafka
public class KafkaBeanConfiguration {

    @Resource
    private ConsumerFactory consumerFactory;

    @Resource
    private KafkaProperties kafkaProperties;

    @Bean(name = "kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());
        factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        if (kafkaProperties.getListener().getType().equals(Type.BATCH)) {
            factory.setBatchListener(true);
        }
        return factory;
    }
	
	// 此bean为了后续演示使用,参考消费演示中的containerFactory属性配置
	@Bean(name = "tenThreadsKafkaListenerContainerFactory1")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    tenThreadsKafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());
        factory.setConcurrency(10);
        if (kafkaProperties.getListener().getType().equals(Type.BATCH)) {
            factory.setBatchListener(true);
        }
        return factory;
    }

	public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,kafkaProperties.getConsumer().getMaxPollRecords());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getConsumer().getEnableAutoCommit());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaProperties.getConsumer().getProperties().get("session.timeout.ms"));
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getKeyDeserializer());
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getValueDeserializer());
        return propsMap;
    }
  }

SpringBoot 集成 KafkaTemplate 发送Kafka消息


	@Resource
    private ObjectMapper mapper;
    
 	@Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    
	try {
			Order order = new Order();
			String message = mapper.writeValueAsString(order);
            CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("order", message);
            future.thenAccept(result -> {
                if (result.getRecordMetadata() != null) {
                    log.debug("send message:{} with offset:{}", message, result.getRecordMetadata().offset());
                }
            }).exceptionally(exception -> {
                log.error("KafkaProducer send message failure,topic={},data={}", topic, message, exception);
                return null;
            });
        } catch (Exception e) {
            log.error("KafkaProducer send message exception,topic={},message={}", topic, message, e);
        }

SpringBoot 集成 @KafkaListener 消费Kafka消息

max.poll.interval.ms

默认为5分钟
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费,触发rebalance 。
如果你的消费者节点总是在重启完不久就不消费了,可以考虑检查改配置项或者优化你的消费者的消费速度等等。

max.poll.records

max-poll-records是Kafka consumer的一个配置参数,表示consumer一次从Kafka broker中拉取的最大消息数目,默认值为500条。在Kafka中,一个消费者组可以有多个consumer实例,每个consumer实例负责消费一个或多个partition的消息,每个consumer实例一次从broker中可以拉取一个或多个消息。

max-poll-records参数的作用就是控制每次拉取消息的最大数目,以实现消费弱化和控制内存资源的需求。

参考Kafka中的max-poll-records和listener.concurrency配置


 	@Resource
    private ObjectMapper mapper;
    
	@KafkaListener(
            id = "order_consumer",
            topics = "order",
            groupId = "g_order_consumer_group",
            //可配置containerFactory参数,使用指定的containerFactory,不配置默认使用名称是kafkaListenerContainerFactory的bean
            //containerFactory = "kafkaListenerContainerFactory1",
            properties = {"max.poll.interval.ms:300000", "max.poll.records:1"}
    )
    // 可以只有ConsumerRecords<String, String> records参数。ack参数非必需,ack.acknowledge()是为了防消息丢失
    public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            String msg = record.value();
            log.info("Consume msg:{}", msg);
            try {
                Order order = mapper.readValue(val, Order.class);
                // 处理业务逻辑
            } catch (Exception e) {
                log.error("Consume failed, msg:{}", val, e);
            }
        }
        ack.acknowledge();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/772748.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

TensorFlow安装CPU版本和GPU版本

文章目录 前言一、TensorFlow安装CPU版本1.新建虚拟环境2.激活虚拟环境3.下载tensorflow4.验证是否下载成功 二、TensorFlow安装GPU版本1.新建虚拟环境2.激活虚拟环境3.安装tensorflow-gpu4.验证是否下载成功 前言 下载的Anaconda是Anaconda3-2024.02-1-Windows-x86_64版本 一…

latex 报错解决①aligned ②begin document

1. 是aligned&#xff0c;不是align&#xff01;&#xff01; 网上写的公式大多是这样的 \begin{equation}\label{eq:2} \begin{align} Q\left( {s,t} \right) a{s^2} 2bst c{t^2} 2ds 2et f \end{align} \end{equation}但是报错&#xff1a; ! Package amsmath Erro…

大语言模型测评工具-ChatHub和ChatAll

背景 现在国内外拥有上百个大语言模型&#xff0c;在AI业务中&#xff0c;我们需要在其中选择一个合适业务模型&#xff0c;就需要对这些模型进行测试。手工去测试这么多模型效率一定不高&#xff0c;今天就介绍两个提高测评模型效率的工具 ChatHub和ChatAll。 介绍 ChatHub…

k8s-第十节-Ingress

Ingress 介绍 Ingress 为外部访问集群提供了一个 统一 入口&#xff0c;避免了对外暴露集群端口&#xff1b;功能类似 Nginx&#xff0c;可以根据域名、路径把请求转发到不同的 Service。可以配置 https 跟 LoadBalancer 有什么区别&#xff1f; LoadBalancer 需要对外暴露…

《侃侃而谈 · 为什么动笔》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻一周&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…

Greenplum(一)【MPP 架构 数据类型】

1、Greenplum 入门 Greenplum 是基于 MPP 架构的一款分布式分析型数据库&#xff0c;具备关系型数据库的特点&#xff0c;因为它处理的是结构化的数据&#xff0c;同时具备大数据分布式的特点。 1.1、MPP 架构 MPP&#xff08;Massively Parallel Processing&#xff09;架构是…

同方威视受邀盛装亮相2024长三角快递物流展(杭州)助力行业物畅其流

同方威视技术股份有限公司携安全检测产品和综合解决方案&#xff0c;盛装亮相2024长三角快递物流展&#xff08;杭州&#xff09; 展位号&#xff1a;3C馆A07-1 时间&#xff1a;2024年7月8-10日 地址&#xff1a;杭州国际博览中心&#xff08;浙江省杭州市萧山区奔竞大道35…

实现前端项目自动构建和部署(Gitee Go)

前言 相信所有的前端开发者都希望将自己的代码部署在服务器上让所有人都能访问到&#xff0c;但是却不知道如何进行部署。其实要是实现代码上线非常简单&#xff0c;我们只需要将build之后的代码上传到服务器&#xff0c;然后通过Nginx起一个服务指向我们build后的代码就可以了…

解析MySQL核心技术:视图的实用指南与实践案例

在数据库管理中&#xff0c;MySQL视图&#xff08;View&#xff09;是一种强大的功能&#xff0c;利用它可以简化复杂查询、提高数据安全性以及增强代码的可维护性。本篇文章将详细介绍MySQL视图的相关知识&#xff0c;包括视图的创建、修改、删除、使用场景以及常见的最佳实践…

WAIC热点聚焦|具身智能简介:AI新浪潮的领跑者

WAIC热点聚焦|具身智能简介&#xff1a;AI新浪潮的领跑者 引言 随着"具身智能"&#xff08;Embodied Intelligence&#xff09;的火热讨论&#xff0c;2024年标志着人机交互新时代的开启。在大模型技术的推动下&#xff0c;机器人响应语音指令成为现实&#xff0c;…

如何自动筛选螺丝不良品?

四角螺丝是一种特殊设计的螺丝&#xff0c;其螺纹头部呈四个平行的角状结构&#xff0c;与传统的六角螺丝相比具有独特的外观和功能。这种设计使得四角螺丝在安装和拆卸时更容易使用&#xff0c;并提供了更好的扭矩传递效率。四角螺丝头部呈现四个平行的角&#xff0c;与常见的…

系统级应用锁的实现方法

前言: 应用锁是一种常见的需求&#xff0c; 下面提供一个个人认为还比较完美的解决方法。本篇从两个方面详述应用锁的实现方法。 一. 流程图 二. 实现细节 一.流程图 二. 实现效果及细节

RocketMQ复杂过滤尝试

需求 消息实体&#xff0c;根据实体中的一个字段&#xff0c;决定推给多个业务系统。 例&#xff1a;一个点位信息Bean&#xff0c;这个点位信息&#xff0c;设备、能源、安全都有用&#xff0c;那么点位信息表中有适用模块标识。 点位新增 需要通知所有勾选业务系统 tag -…

摄像机反求跟踪软件/插件 Mocha Pro 2024 v11.0.2 CE Win

AE/PR/OFX/达芬奇/AVX插件 | 摄像机反求跟踪软件Mocha Pro 2024 v11.0.2 CE Win-PR模板网 Mocha Pro 软件(插件)&#xff0c;用于平面运动跟踪、3D跟踪、动态观察、对象移除、图像稳定和PowerMesh有机扭曲跟踪等功能。整合了SynthEyes核心的3D跟踪算法&#xff0c;能够快速自动…

Pluck-CMS-Pluck-4.7.16 远程代码执行漏洞(CVE-2022-26965)

前言 CVE-2022-26965 是一个影响 Pluck CMS 4.7.16 版本的远程代码执行&#xff08;RCE&#xff09;漏洞。该漏洞允许经过身份验证的用户通过 /admin.php?actionthemeinstall 的主题上传功能执行任意代码。 漏洞细节 在 Pluck CMS 的管理界面中&#xff0c;管理员可以上传主…

【数据结构】(C语言):堆(二叉树的应用)

堆&#xff1a; 此处堆为二叉树的应用&#xff0c;不是计算机中用于管理动态内存的堆。形状是完全二叉树。堆分两种&#xff1a;最大堆&#xff0c;最小堆。最大堆&#xff1a;每个节点比子树所有节点的数值都大&#xff0c;根节点为最大值。最小堆&#xff1a;每个节点比子树…

千万不要用国产BI,不然你会发现它性价比奇高——以奥威BI软件为例

在信息技术日新月异的今天&#xff0c;企业对于商业智能&#xff08;BI&#xff09;软件的选择往往陷入了一个误区&#xff1a;盲目追求国际品牌&#xff0c;却忽视了身边那些性价比极高的国产精品。如果你不慎踏入了“千万不要用国产BI”的陷阱&#xff0c;那么奥威BI软件将是…

PHP家政服务预约单开版微信小程序系统源码

&#x1f3e0; —— 便捷生活&#xff0c;从指尖开始&#x1f4aa; &#x1f308;【开篇&#xff1a;家政新风尚&#xff0c;一键触达】 在忙碌的生活节奏中&#xff0c;你是否渴望拥有一个温馨、整洁的家&#xff0c;却又苦于找不到合适的家政服务&#xff1f;现在&#xff…

C++_03

1、构造函数 1.1 什么是构造函数 类的构造函数是类的一种特殊的成员函数&#xff0c;它会在每次创建类的新对象时执行。 每次构造的是构造成员变量的初始化值&#xff0c;内存空间等。 构造函数的名称与类的名称是完全相同的&#xff0c;并且不会返回任何类型&#xff0c;也不…

对标GPT-4o!不锁区、支持手机、免费使用,Moshi来啦!

7月4日凌晨&#xff0c;法国知名开源AI研究实验室Kyutai在官网发布了&#xff0c;具备看、听、说多模态大模型——Moshi。 Moshi功能与OpenAI在5月14日展示的最新模型GPT-4o差不多&#xff0c;可以听取人的语音提问后进行实时推理回答内容。但GPT-4o的语音模式要在秋天才能全面…