日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

kiwi包源码解析

發(fā)布時(shí)間:2023/12/8 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kiwi包源码解析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

公司kiwi-util包源碼分析

代碼是監(jiān)聽消息中間件的工具包。最開始招我進(jìn)來(lái)的主管寫的,現(xiàn)在已經(jīng)是總監(jiān)級(jí)別,不再寫代碼。記得16年夏天入職時(shí)候還是我還是小白一枚,感謝主管給了我機(jī)會(huì)。來(lái)了之后才發(fā)現(xiàn)真的是大神一枚,在此收下我的膝蓋。

使用java監(jiān)聽activeMQ的相關(guān)鏈接為:

https://blog.csdn.net/zbw18297786698/article/details/52994746

話不多說(shuō),開始解析。

一 初始化加載配置信息

代碼監(jiān)聽mq消息時(shí)配置的消息隊(duì)列如下,啟動(dòng)時(shí)候,加載該配置:

@Component("BootSpringListener") public class BootSpringListener implements ApplicationListener {@Overridepublic void onApplicationEvent(ApplicationEvent event) {if (event instanceof ContextRefreshedEvent) {final String key = UUID.randomUUID().toString();try {Resource[] resource = new PathMatchingResourcePatternResolver().getResources("classpath*:bussiness/" + bussinessSide + "/message*.xml");logger.info("[key={}][init messageService resource size={}]", key, resource.length);for (Resource res : resource) {messageService.configure(res.getInputStream());}} catch (Exception e) {logger.error("[key={}][exception={}]", key, e.getMessage(), e);}logger.info("init messageService");}} <?xml version="1.0" encoding="UTF-8" standalone="yes"?> <MessageConfigs><client name="cartoonactivemq"><factory>com.*.kiwi.message.impl.activemq.ActiveMQMessageClientFactory</factory><properties><property key="messageSwitch" value="${cartoon.message.switch}"/><property key="maxConnections" value="5"/><property key="maximumActive" value="100"/><property key="username" value="${cartoon.activemq.username}"/><property key="password" value="${cartoon.activemq.password}"/><property key="url" value="${cartoon.activemq.url}"/></properties></client><destination><name>${activemq.reading.message.prefix}picturebook_property_modify</name><type>queue</type><client>readingactivemq</client><processor>com.*.search.api.core.service.search.message.mongo.PartialMongoUpdateProcessor</processor><mode>listen</mode><properties><property key="threadPoolSize" value="100"/></properties></destination> </MessageConfigs>

二 加載配置步驟:

(1)解析xml: MessageConfigs configs = jaxbBinder.fromXML(is);

這里的類 MessageConfigs 為使用XmlRootElement將類映射到XML元素,具體參考:

http://desert3.iteye.com/blog/1570092

(2)從client中初始化客戶端連接池【詳細(xì)見代碼注釋A處】,并將每個(gè)destination中都加入在xml中對(duì)應(yīng)的客戶端,并通過(guò)destination的initialize方法真正開始監(jiān)聽消息【詳細(xì)見代碼注釋B處】

(3)destination中含有消息隊(duì)列的基本配置,消息隊(duì)列名稱、消息處理器、線程池size大小,以及封裝的consumer。而consumer則是根據(jù)destination的信息來(lái)構(gòu)造,并初始化。Consumer consumer = client.createConsumer(this);這里this即為destination。

(3)destination.initialize 會(huì)調(diào)用consumer.start

public void configure(InputStream is) {try {JaxbBinder jaxbBinder = new JaxbBinder(MessageConfigs.class);MessageConfigs configs = jaxbBinder.fromXML(is);//初始化客戶端、連接池for (ClientConfig clientConfig : configs.getClientConfig()) {//使用類加載器創(chuàng)建對(duì)象實(shí)例MessageClientFactory messageClientFactory = (MessageClientFactory) Class.forName(clientConfig.getFactory()).newInstance();//A.此處保存mq連接的基本信息MessageClient client = messageClientFactory.createMessageClient(clientConfig.getProperties());client.setProperties(clientConfig.getProperties());client.setName(clientConfig.getName());client.initialize();clientMap.put(client.getName(), client);}//初始化每一個(gè)消息目的地(Queue或topic,及其生產(chǎn)者和消費(fèi)者)for (DestinationConfig destinationConfig : configs.getDestinationConfigs()) {MessageClient client = this.clientMap.get(destinationConfig.getClient());DestinationImpl destination = new DestinationImpl(destinationConfig.getName(), destinationConfig.getType(), client);destination.setProcessor(destinationConfig.getProcessor());destination.setMode(destinationConfig.getMode());destination.setProperties(destinationConfig.getProperties());destination.setClient(client);destination.setTransacted(destinationConfig.isTransacted());destination.setEnableRecover(destinationConfig.isEnableRecover());//B.真正開始監(jiān)聽消息destination.initialize();this.destinationMap.put(destination.getName(), destination);}} catch (Exception e) {throw new RuntimeException("message configure error: ", e);} finally {if (is != null) {try {is.close();} catch (IOException e) {throw new RuntimeException("message configure error: ", e);}}}}

###三 創(chuàng)建Consumer以及session

Client根據(jù)destination構(gòu)造Consumer類,注意這里創(chuàng)建session的方式

session = connection.createSession(destination.isTransacted(), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);

即為逐條消息確認(rèn),每消費(fèi)一條確認(rèn)一條(INDIVIDUAL_ACKNOWLEDGE)),且不開啟事務(wù)

public Consumer createConsumer(Destination destination) {try {connection = connectionFactory.getConnectionFactory().createConnection();//注意這里sessisession = connection.createSession(destination.isTransacted(), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);javax.jms.Destination dest = null;if(Destination.TYPE_QUEUE.equals(destination.getType())) {dest = session.createQueue(destination.getName());} else if(Destination.TYPE_TOPIC.equals(destination.getType())) {dest = session.createTopic(destination.getName());}MessageConsumer messageConsumer = null;if(isDurable && dest instanceof Topic) {messageConsumer = session.createDurableSubscriber((Topic)dest, clientId +"-"+ destination.getName());} else {messageConsumer = session.createConsumer(dest);}ActiveMQConsumer consumer = new ActiveMQConsumer();consumer.setConsumer(messageConsumer);consumer.setSession(session);consumer.setDestination(destination);consumer.setConnection(connection);String threadPoolSizeString = destination.getProperties().get("threadPoolSize");if(threadPoolSizeString != null) {consumer.setThreadPoolSize(Integer.parseInt(threadPoolSizeString));}return consumer;} catch (JMSException e) {logger.error("init producer for {} error", destination.getName(), e);}return null;}

###四 consumer類詳解

consumer基礎(chǔ):

consumer類已經(jīng)與destination綁定,一個(gè)queue對(duì)應(yīng)了一個(gè)consumer類對(duì)象的實(shí)例。

consumer類實(shí)現(xiàn)了callback【各種processor 調(diào)用finished時(shí)候回調(diào) 】,監(jiān)聽消息隊(duì)列MessageListener 【onMessage接口】,以及conumser【start、stop】

當(dāng)消息抵達(dá)時(shí),consumer的onMessage接口被調(diào)用,觸發(fā)處理邏輯:

(1)設(shè)置回調(diào)為當(dāng)前類,finished時(shí)候觸發(fā),如果finished為false則session回滾或recover【根據(jù)是否配置事務(wù)來(lái)設(shè)置】

(2)實(shí)例化該處理器,并提交至線程池

@Overridepublic void onMessage(Message message) {this.processingMessageCount.incrementAndGet();try {logger.info("[module:message] [action:start] [id:{}]", message.getJMSMessageID());Object msg = null;try {messageText = JsonBinder.buildNormalBinder().toJson(msg);} catch (Exception e) {}logger.info("[module:message] [action:parse] [id:{}] [content:{}]", message.getJMSMessageID(), messageText);if (destination.getProcessor() != null) {Class compileClazz = Class.forName(destination.getProcessor());Processor processorObject = (Processor) compileClazz.newInstance();Map<String, Object> context = new HashMap<String, Object>();context.put("original", message);//設(shè)置回調(diào)為當(dāng)前consumer類 processorObject.setCallback(this);processorObject.setContext(context);processorObject.setMessage(msg);//threadPoolExecutor.execute(processorObject);logger.debug("[module:message] [action:execute] [id:{}] [content:{}]", message.getJMSMessageID(), messageText);logger.info("[module:message] [action:end] [id:{}] [content:{}]", message.getJMSMessageID(), messageText);} catch (JMSException e) {logger.error("consume message {} error!", message.toString(), e);} finally {while (this.processingMessageCount.get() >= maxProcessingMessageCount) {try {Thread.sleep(1000);} catch (InterruptedException e) {logger.error("consume message {} error!", message.toString(), e);}}}}

總結(jié)

以上是生活随笔為你收集整理的kiwi包源码解析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。