日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析

發布時間:2025/3/21 javascript 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 概述
  • Spring-kafka生產者源碼流程
  • Spring-kafka消費者源碼流程(`@EnableKafka`和`@KafkaListener` )
    • Flow
    • KafkaListenerAnnotationBeanPostProcessor


概述

【依賴】

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

【配置】

#kafka spring.kafka.bootstrap-servers=10.11.114.247:9092 spring.kafka.producer.acks=1 spring.kafka.producer.retries=3 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializerspring.kafka.consumer.group-id=zfprocessor_group spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=com.artisan.common.entity.messages spring.kafka.consumer.max-poll-records=500 spring.kafka.consumer.fetch-min-size=10 spring.kafka.consumer.fetch-max-wait=10000msspring.kafka.listener.missing-topics-fatal=false spring.kafka.listener.type=batch spring.kafka.listener.ack-mode=manuallogging.level.org.springframework.kafka=ERROR logging.level.org.apache.kafka=ERROR

Spring-kafka生產者源碼流程

ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPICA.TOPIC, messageMock);

主要的源碼流程如下


Spring-kafka消費者源碼流程(@EnableKafka和@KafkaListener )

消費的話,比較復雜

@KafkaListener(topics = TOPICA.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPICA.TOPIC)public void onMessage(MessageMock messageMock){logger.info("【接受到消息][線程:{} 消息內容:{}]", Thread.currentThread().getName(), messageMock);}

劃重點,主要關注


Flow

作為引子,我們繼續來梳理下源碼

繼續


繼續


KafkaBootstrapConfiguration的主要功能是創建兩個bean

KafkaListenerAnnotationBeanPostProcessor

實現了如下接口

implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton

主要功能就是監聽@KafkaListener注解 。 bean的后置處理器 需要重寫 postProcessAfterInitialization

@Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {if (!this.nonAnnotatedClasses.contains(bean.getClass())) {// 獲取對應的classClass<?> targetClass = AopUtils.getTargetClass(bean);// 查找類是否有@KafkaListener注解Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);final boolean hasClassLevelListeners = classLevelListeners.size() > 0;final List<Method> multiMethods = new ArrayList<>();// 查找類中方法上是否有對應的@KafkaListener注解,Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {Set<KafkaListener> listenerMethods = findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});if (hasClassLevelListeners) {Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,(ReflectionUtils.MethodFilter) method ->AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);multiMethods.addAll(methodsWithHandler);}if (annotatedMethods.isEmpty()) {this.nonAnnotatedClasses.add(bean.getClass());this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());}else {// Non-empty set of methodsfor (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {Method method = entry.getKey();for (KafkaListener listener : entry.getValue()) {// 處理@KafkaListener注解 重點看 processKafkaListener(listener, method, bean, beanName);}}this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"+ beanName + "': " + annotatedMethods);}if (hasClassLevelListeners) {processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);}}return bean;}

重點方法

protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {Method methodToUse = checkProxy(method, bean);MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();endpoint.setMethod(methodToUse);processListener(endpoint, kafkaListener, bean, methodToUse, beanName);}

繼續 processListener

protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,Object bean, Object adminTarget, String beanName) {String beanRef = kafkaListener.beanRef();if (StringUtils.hasText(beanRef)) {this.listenerScope.addListener(beanRef, bean);}// 構建 endpointendpoint.setBean(bean);endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);endpoint.setId(getEndpointId(kafkaListener));endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));endpoint.setTopics(resolveTopics(kafkaListener));endpoint.setTopicPattern(resolvePattern(kafkaListener));endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));String group = kafkaListener.containerGroup();if (StringUtils.hasText(group)) {Object resolvedGroup = resolveExpression(group);if (resolvedGroup instanceof String) {endpoint.setGroup((String) resolvedGroup);}}String concurrency = kafkaListener.concurrency();if (StringUtils.hasText(concurrency)) {endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));}String autoStartup = kafkaListener.autoStartup();if (StringUtils.hasText(autoStartup)) {endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));}resolveKafkaProperties(endpoint, kafkaListener.properties());endpoint.setSplitIterables(kafkaListener.splitIterables());KafkaListenerContainerFactory<?> factory = null;String containerFactoryBeanName = resolve(kafkaListener.containerFactory());if (StringUtils.hasText(containerFactoryBeanName)) {Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");try {factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);}catch (NoSuchBeanDefinitionException ex) {throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);}}endpoint.setBeanFactory(this.beanFactory);String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");if (StringUtils.hasText(errorHandlerBeanName)) {endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));}// 將endpoint注冊到registrarthis.registrar.registerEndpoint(endpoint, factory);if (StringUtils.hasText(beanRef)) {this.listenerScope.removeListener(beanRef);}}

繼續看 registerEndpoint

public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {Assert.notNull(endpoint, "Endpoint must be set");Assert.hasText(endpoint.getId(), "Endpoint id must be set");// Factory may be null, we defer the resolution right before actually creating the container// 把endpoint封裝為KafkaListenerEndpointDescriptorKafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);synchronized (this.endpointDescriptors) {if (this.startImmediately) { // Register and start immediatelythis.endpointRegistry.registerListenerContainer(descriptor.endpoint,resolveContainerFactory(descriptor), true);}else {// 將descriptor添加到endpointDescriptorsthis.endpointDescriptors.add(descriptor);}}}

總的來看: 得到一個含有KafkaListener基本信息的Endpoint,將Endpoint被封裝到KafkaListenerEndpointDescriptor,KafkaListenerEndpointDescriptor被添加到KafkaListenerEndpointRegistrar.endpointDescriptors中,至此這部分的流程結束了,感覺沒有下文呀。


KafkaListenerEndpointRegistrar.endpointDescriptors 這個List中的數據怎么用呢?

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {}

KafkaListenerEndpointRegistrar 實現了 InitializingBean 接口,重寫 afterPropertiesSet,該方法會在bean實例化完成后執行

@Overridepublic void afterPropertiesSet() {registerAllEndpoints();}

繼續 registerAllEndpoints();

protected void registerAllEndpoints() {synchronized (this.endpointDescriptors) {// 遍歷KafkaListenerEndpointDescriptor for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {// 注冊 this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));}this.startImmediately = true; // trigger immediate startup}}

繼續

public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {registerListenerContainer(endpoint, factory, false);}

go

public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,boolean startImmediately) {Assert.notNull(endpoint, "Endpoint must not be null");Assert.notNull(factory, "Factory must not be null");String id = endpoint.getId();Assert.hasText(id, "Endpoint id must not be empty");synchronized (this.listenerContainers) {Assert.state(!this.listenerContainers.containsKey(id),"Another endpoint is already registered with id '" + id + "'"); // 創建Endpoint對應的MessageListenerContainer,將創建好的MessageListenerContainer放入listenerContainersMessageListenerContainer container = createListenerContainer(endpoint, factory);this.listenerContainers.put(id, container);// 如果KafkaListener注解中有對應的group信息,則將container添加到對應的group中if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {List<MessageListenerContainer> containerGroup;if (this.applicationContext.containsBean(endpoint.getGroup())) {containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);}else {containerGroup = new ArrayList<MessageListenerContainer>();this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);}containerGroup.add(container);}if (startImmediately) {startIfNecessary(container);}}}

總結

以上是生活随笔為你收集整理的Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。