kiwi包源码解析
公司kiwi-util包源碼分析
代碼是監(jiān)聽消息中間件的工具包。最開始招我進(jìn)來(lái)的主管寫的,現(xiàn)在已經(jīng)是總監(jiān)級(jí)別,不再寫代碼。記得16年夏天入職時(shí)候還是我還是小白一枚,感謝主管給了我機(jī)會(huì)。來(lái)了之后才發(fā)現(xiàn)真的是大神一枚,在此收下我的膝蓋。
使用java監(jiān)聽activeMQ的相關(guān)鏈接為:
https://blog.csdn.net/zbw18297786698/article/details/52994746
話不多說(shuō),開始解析。
一 初始化加載配置信息
代碼監(jiān)聽mq消息時(shí)配置的消息隊(duì)列如下,啟動(dòng)時(shí)候,加載該配置:
@Component("BootSpringListener") public class BootSpringListener implements ApplicationListener {@Overridepublic void onApplicationEvent(ApplicationEvent event) {if (event instanceof ContextRefreshedEvent) {final String key = UUID.randomUUID().toString();try {Resource[] resource = new PathMatchingResourcePatternResolver().getResources("classpath*:bussiness/" + bussinessSide + "/message*.xml");logger.info("[key={}][init messageService resource size={}]", key, resource.length);for (Resource res : resource) {messageService.configure(res.getInputStream());}} catch (Exception e) {logger.error("[key={}][exception={}]", key, e.getMessage(), e);}logger.info("init messageService");}} <?xml version="1.0" encoding="UTF-8" standalone="yes"?> <MessageConfigs><client name="cartoonactivemq"><factory>com.*.kiwi.message.impl.activemq.ActiveMQMessageClientFactory</factory><properties><property key="messageSwitch" value="${cartoon.message.switch}"/><property key="maxConnections" value="5"/><property key="maximumActive" value="100"/><property key="username" value="${cartoon.activemq.username}"/><property key="password" value="${cartoon.activemq.password}"/><property key="url" value="${cartoon.activemq.url}"/></properties></client><destination><name>${activemq.reading.message.prefix}picturebook_property_modify</name><type>queue</type><client>readingactivemq</client><processor>com.*.search.api.core.service.search.message.mongo.PartialMongoUpdateProcessor</processor><mode>listen</mode><properties><property key="threadPoolSize" value="100"/></properties></destination> </MessageConfigs>二 加載配置步驟:
(1)解析xml: MessageConfigs configs = jaxbBinder.fromXML(is);
這里的類 MessageConfigs 為使用XmlRootElement將類映射到XML元素,具體參考:
http://desert3.iteye.com/blog/1570092
(2)從client中初始化客戶端連接池【詳細(xì)見代碼注釋A處】,并將每個(gè)destination中都加入在xml中對(duì)應(yīng)的客戶端,并通過(guò)destination的initialize方法真正開始監(jiān)聽消息【詳細(xì)見代碼注釋B處】
(3)destination中含有消息隊(duì)列的基本配置,消息隊(duì)列名稱、消息處理器、線程池size大小,以及封裝的consumer。而consumer則是根據(jù)destination的信息來(lái)構(gòu)造,并初始化。Consumer consumer = client.createConsumer(this);這里this即為destination。
(3)destination.initialize 會(huì)調(diào)用consumer.start
public void configure(InputStream is) {try {JaxbBinder jaxbBinder = new JaxbBinder(MessageConfigs.class);MessageConfigs configs = jaxbBinder.fromXML(is);//初始化客戶端、連接池for (ClientConfig clientConfig : configs.getClientConfig()) {//使用類加載器創(chuàng)建對(duì)象實(shí)例MessageClientFactory messageClientFactory = (MessageClientFactory) Class.forName(clientConfig.getFactory()).newInstance();//A.此處保存mq連接的基本信息MessageClient client = messageClientFactory.createMessageClient(clientConfig.getProperties());client.setProperties(clientConfig.getProperties());client.setName(clientConfig.getName());client.initialize();clientMap.put(client.getName(), client);}//初始化每一個(gè)消息目的地(Queue或topic,及其生產(chǎn)者和消費(fèi)者)for (DestinationConfig destinationConfig : configs.getDestinationConfigs()) {MessageClient client = this.clientMap.get(destinationConfig.getClient());DestinationImpl destination = new DestinationImpl(destinationConfig.getName(), destinationConfig.getType(), client);destination.setProcessor(destinationConfig.getProcessor());destination.setMode(destinationConfig.getMode());destination.setProperties(destinationConfig.getProperties());destination.setClient(client);destination.setTransacted(destinationConfig.isTransacted());destination.setEnableRecover(destinationConfig.isEnableRecover());//B.真正開始監(jiān)聽消息destination.initialize();this.destinationMap.put(destination.getName(), destination);}} catch (Exception e) {throw new RuntimeException("message configure error: ", e);} finally {if (is != null) {try {is.close();} catch (IOException e) {throw new RuntimeException("message configure error: ", e);}}}}###三 創(chuàng)建Consumer以及session
Client根據(jù)destination構(gòu)造Consumer類,注意這里創(chuàng)建session的方式:
session = connection.createSession(destination.isTransacted(), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);即為逐條消息確認(rèn),每消費(fèi)一條確認(rèn)一條(INDIVIDUAL_ACKNOWLEDGE)),且不開啟事務(wù)。
public Consumer createConsumer(Destination destination) {try {connection = connectionFactory.getConnectionFactory().createConnection();//注意這里sessisession = connection.createSession(destination.isTransacted(), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);javax.jms.Destination dest = null;if(Destination.TYPE_QUEUE.equals(destination.getType())) {dest = session.createQueue(destination.getName());} else if(Destination.TYPE_TOPIC.equals(destination.getType())) {dest = session.createTopic(destination.getName());}MessageConsumer messageConsumer = null;if(isDurable && dest instanceof Topic) {messageConsumer = session.createDurableSubscriber((Topic)dest, clientId +"-"+ destination.getName());} else {messageConsumer = session.createConsumer(dest);}ActiveMQConsumer consumer = new ActiveMQConsumer();consumer.setConsumer(messageConsumer);consumer.setSession(session);consumer.setDestination(destination);consumer.setConnection(connection);String threadPoolSizeString = destination.getProperties().get("threadPoolSize");if(threadPoolSizeString != null) {consumer.setThreadPoolSize(Integer.parseInt(threadPoolSizeString));}return consumer;} catch (JMSException e) {logger.error("init producer for {} error", destination.getName(), e);}return null;}###四 consumer類詳解
consumer基礎(chǔ):
consumer類已經(jīng)與destination綁定,一個(gè)queue對(duì)應(yīng)了一個(gè)consumer類對(duì)象的實(shí)例。
consumer類實(shí)現(xiàn)了callback【各種processor 調(diào)用finished時(shí)候回調(diào) 】,監(jiān)聽消息隊(duì)列MessageListener 【onMessage接口】,以及conumser【start、stop】
當(dāng)消息抵達(dá)時(shí),consumer的onMessage接口被調(diào)用,觸發(fā)處理邏輯:
(1)設(shè)置回調(diào)為當(dāng)前類,finished時(shí)候觸發(fā),如果finished為false則session回滾或recover【根據(jù)是否配置事務(wù)來(lái)設(shè)置】
(2)實(shí)例化該處理器,并提交至線程池
@Overridepublic void onMessage(Message message) {this.processingMessageCount.incrementAndGet();try {logger.info("[module:message] [action:start] [id:{}]", message.getJMSMessageID());Object msg = null;try {messageText = JsonBinder.buildNormalBinder().toJson(msg);} catch (Exception e) {}logger.info("[module:message] [action:parse] [id:{}] [content:{}]", message.getJMSMessageID(), messageText);if (destination.getProcessor() != null) {Class compileClazz = Class.forName(destination.getProcessor());Processor processorObject = (Processor) compileClazz.newInstance();Map<String, Object> context = new HashMap<String, Object>();context.put("original", message);//設(shè)置回調(diào)為當(dāng)前consumer類 processorObject.setCallback(this);processorObject.setContext(context);processorObject.setMessage(msg);//threadPoolExecutor.execute(processorObject);logger.debug("[module:message] [action:execute] [id:{}] [content:{}]", message.getJMSMessageID(), messageText);logger.info("[module:message] [action:end] [id:{}] [content:{}]", message.getJMSMessageID(), messageText);} catch (JMSException e) {logger.error("consume message {} error!", message.toString(), e);} finally {while (this.processingMessageCount.get() >= maxProcessingMessageCount) {try {Thread.sleep(1000);} catch (InterruptedException e) {logger.error("consume message {} error!", message.toString(), e);}}}}總結(jié)
- 上一篇: lisp角度转换弪度_角度和弧度换算(角
- 下一篇: 蜘蛛大战之 站点LOGO(SEO)