rocketmq 初探(三)
生活随笔
收集整理的這篇文章主要介紹了
rocketmq 初探(三)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考。
大家好,我是烤鴨:
上一篇介紹了注冊中心,這一篇看下broker。基於 rocketmq 4.9 版本。
BrokerStartup#BrokerController
按照代碼的先后順序擼源碼:
BrokerController.createBrokerController
/**
 * Builds a {@code BrokerController} from the broker configuration, initializes it and
 * registers a JVM shutdown hook for it.
 *
 * <p>This is an excerpt: every {@code // ...} marks code elided by the article (argument
 * parsing, creation of {@code brokerConfig}, {@code nettyServerConfig},
 * {@code nettyClientConfig}, {@code properties}, ...).
 *
 * @param args command-line arguments (their handling is in the elided part)
 * @return the initialized controller; the trailing {@code return null} is only reached
 *         after the catch branch, which calls {@code System.exit(-1)} first
 */
public static BrokerController createBrokerController(String[] args) {
    // ...
    try {
        // ...
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
        // On a SLAVE broker, lower the "access message in memory" max ratio by 10
        // (article's note: from 40% down to 30% -- the default itself is not visible here).
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }
        // ...
        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                // Masters are forced to MixAll.MASTER_ID (article's note: the value is 0).
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                // A slave must be configured with a positive brokerId; otherwise exit.
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }
                break;
            default:
                break;
        }
        // In DLedger mode the brokerId is set to -1, overriding the switch above.
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }
        // ...
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);
        // Initialize the controller; on failure shut it down and exit the process.
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        // Register a JVM shutdown hook that shuts the controller down.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            // Guards against running controller.shutdown() more than once.
            private volatile boolean hasShutdown = false;
            // Counts how many times the hook has been invoked (only used for logging).
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}
BrokerController.initialize()
/**
 * Loads persisted broker state, creates the message store and the two Netty servers,
 * sets up executors/processors and schedules the broker's periodic housekeeping tasks.
 *
 * <p>This is an excerpt: {@code // ...} marks code elided by the article.
 *
 * @return {@code true} only if every {@code load()} step and the message-store creation
 *         succeeded; when {@code false}, none of the server/scheduler setup is performed
 * @throws CloneNotSupportedException declared because {@code nettyServerConfig.clone()} is called
 */
public boolean initialize() throws CloneNotSupportedException {
    // Load locally persisted config (topics, consumer offsets, subscription groups,
    // consumer filters). Article's note: falls back to the .bak file when the main
    // file cannot be loaded -- that fallback is inside load(), not visible here.
    boolean result = this.topicConfigManager.load();
    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();
    // Create the message store (persistence layer).
    if (result) {
        try {
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                    this.brokerConfig);
            // In DLedger mode, register a handler on the leader elector for role changes.
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
            }
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            //load plugin
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    // Load the message store. Article's note: this loads the commit log, the
    // consumer/topic relations and the index, with recovery if a temp file exists.
    // The && short-circuit skips the call when an earlier step already failed.
    result = result && this.messageStore.load();
    if (result) {
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        // Second Netty server: a clone of the main config listening on (listenPort - 2).
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));
        // Separate thread pools per request type; each is registered with its processor below.
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));
        // ...
        // Bind the executors to their request processors.
        this.registerProcessor();

        final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
        final long period = 1000 * 60 * 60 * 24;
        // First run at the time returned by computeNextMorningTimeMillis(), then every 24 h:
        // BrokerStats.record() (article's note: records yesterday's put/get message counts).
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);

        // After 10 s, then every getFlushConsumerOffsetInterval() ms (article's note: 5 s):
        // persist consumer offsets.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        // After 10 s, then every 10 s: persist consumer filter data.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerFilterManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumer filter error.", e);
                }
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

        // Every 3 minutes: protectBroker() (article's note: when enabled, removes consumers
        // that consume too slowly; disabled by default -- not verifiable from this excerpt).
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.protectBroker();
                } catch (Throwable e) {
                    log.error("protectBroker error.", e);
                }
            }
        }, 3, 3, TimeUnit.MINUTES);

        // After 10 s, then every second: printWaterMark() (article's note: prints the
        // send/pull/query/transaction queue sizes and the slowest processing time).
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printWaterMark();
                } catch (Throwable e) {
                    log.error("printWaterMark error.", e);
                }
            }
        }, 10, 1, TimeUnit.SECONDS);

        // After 10 s, then every 60 s: log how many bytes dispatch is behind the commit log.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                } catch (Throwable e) {
                    log.error("schedule dispatchBehindBytes error.", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

        // Name server addresses: use the configured list if present; otherwise, when
        // fetch-by-address-server is enabled, fetch it after 10 s and then every 2 minutes
        // (article's note: the URL defaults to the rocketmq.namesrv.domain system property,
        // i.e. <domain>:8080/rocketmq -- not visible in this excerpt).
        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                    } catch (Throwable e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        // Only when DLedger is NOT enabled: classic master/slave HA handling.
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                // Slave: a configured HA master address (length >= 6) is used as-is and
                // periodic updating is turned off; otherwise periodic updating is turned on.
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }
            } else {
                // Master: after 10 s, then every 60 s, print the master/slave offset difference.
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        // When TLS is not disabled, watch the certificate files so the SSL context can be reloaded.
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        // Note: only keyChanged has an explicit initializer; certChanged
                        // relies on the field default (false).
                        boolean certChanged, keyChanged = false;

                        @Override
                        public void onChanged(String path) {
                            // A trust-certificate change reloads immediately.
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            // Certificate and private key are reloaded only once both have changed.
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }

                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                            ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }
        // Transaction support initialization.
        initialTransaction();
        // ACL (authorization) initialization.
        initialAcl();
        // RPC hook initialization (article's note: ACL hooks and anonymous ones).
        initialRpcHooks();
    }
    return result;
}
BrokerStartup.start()
/**
 * Starts the broker's components in order (each only if it was created) and schedules
 * the periodic registration with the name server.
 *
 * @throws Exception declared by the signature; which call throws is not visible in this excerpt
 */
public void start() throws Exception {
    // Message store (article's note: covers the commit log and, in cluster mode, message replication).
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    // Main Netty server (article's note: covered in detail in part 2 of the series).
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    // Second Netty server, same as above.
    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }
    // TLS certificate watcher (article's note: checks the 3 certificate files every 500 ms
    // and reloads on change -- the interval is not visible here).
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
    // Outbound API client (article's note: this starts the NettyRemotingClient).
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    // Service for held pull requests (article's note: checks every 5 s or 1 s whether
    // messages have arrived).
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }
    // Client housekeeping (article's note: scans producer/consumer/broker liveness every 10 s).
    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }
    // Filter server manager (article's note: runs filter processes on the broker host
    // for consumer-side message filtering).
    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }
    // Only when DLedger is NOT enabled: start the HA-role-dependent processor, set up
    // slave synchronization (article's note: a slave syncs config every 10 s) and
    // register this broker with the name server once, immediately.
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        this.registerBrokerAll(true, false, true);
    }
    // Periodic registration with the name server: first run after 10 s, then at a period of
    // getRegisterNameServerPeriod() clamped to the range [10 s, 60 s].
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }
    // Fast-failure service (article's note: when flushing to disk is too slow it answers
    // "system busy" and cleans the send/pull/heartbeat/request queues).
    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }
}
小結
這篇主要是介紹了broker的init和start。
DLegerCommit開啟模式(替代原有的commitLog,可以直接讀取CommitLog的API)
初始化:
-
消息持久化 messageStore
-
commitlog、consumer和topic關係、索引恢復(臨時文件存在的話)
-
netty配置(remotingServer 和 fastRemotingServer)
-
processor(PullMessage、SendMessage 等)
-
定時器(記錄消費總量、更新consumer和offset數據 等)
-
acl鑒權、事務、rpchook
啟動:
- 消息持久化 messageStore
- netty server啟動(dòng)(remotingServer 和 fastRemotingServer)
- tls模式檢測證書變化
- pullRequestHold模式下,啟動監聽消息
- 每10s掃描 producer、consumer、broker 的在線情況
- 未啟用DLedger時,slave 每隔10s更新配置,broker注冊到nameserver
- 每隔 10-60s(由 registerNameServerPeriod 決定,限制在 10s 到 60s 之間),定時 broker注冊到nameserver
- 快速失敗,如果是刷盤過慢,返回 system busy,清理隊列(發送、拉取、心跳、請求)
總結
以上是生活随笔為你收集整理的rocketmq 初探(三)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 支付宝新版SDK-PC扫码支付-手机浏览
- 下一篇: 求数组中的最小子数组,时间复杂度o(n)