日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java activemq 断线_java - 防止ActiveMQ重新连接失败时自动退出 - SO中文参考 - www.soinside.com...

發布時間:2023/11/30 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java activemq 断线_java - 防止ActiveMQ重新连接失败时自动退出 - SO中文参考 - www.soinside.com... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我有一個小型的spring-boot應用程序,該應用程序連接到ActiveMQ上的一個或多個主題,這些主題在啟動時在應用程序的application.properties文件中設置-然后將這些消息發送到數據庫。

這一切都很好,但是在嘗試實施故障轉移時遇到一些問題-基本上,應用程序將嘗試重新連接,但是在重試一定次數后,應用程序進程將自動退出,從而防止重試(理想情況下,我希望該應用程序能夠永久重試,直到被手動終止或ActiveMQ再次可用為止。我嘗試將連接URL(使用application.properties中的maxReconnectAttempts)中的連接選項(例如url.options)顯式設置為-1/0/99999,但這些都似乎不正確,因為行為是相同的每一次。通過查看有關Apache's own reference page的建議,我也希望該行為也可以作為默認行為。

[如果有人有任何建議強制不要退出該應用程序,我將非常感謝!我認為很重要的代碼如下:@Configuration

public class AmqConfig {

private static final Logger LOG = LogManager.getLogger(AmqConfig.class);

private static final String LOG_PREFIX = "[AmqConfig] ";

private String clientId;

private static ArrayList amqUrls = new ArrayList<>();

private static String amqConnectionUrl;

private static Integer numSubs;

private static ArrayList destinations = new ArrayList<>();

@Autowired

DatabaseService databaseService;

public AmqConfig (@Value("${amq.urls}") String[] amqUrl,

@Value("${amq.options}") String amqOptions,

@Value("${tocCodes}") String[] tocCodes,

@Value("${amq.numSubscribers}") Integer numSubs,

@Value("${clientId}") String clientId) throws UnknownHostException {

Arrays.asList(amqUrl).forEach(url -> {

amqUrls.add("tcp://" + url);

});

String amqServerAddress = "failover:(" + String.join(",", amqUrls) + ")";

String options = Strings.isNullOrEmpty(amqOptions) ? "" : "?" + amqOptions;

this.amqConnectionUrl = amqServerAddress + options;

this.numSubs = Optional.ofNullable(numSubs).orElse(4);

this.clientId = Strings.isNullOrEmpty(clientId) ? InetAddress.getLocalHost().getHostName() : clientId;

String topic = "Consumer." + this.clientId + ".VirtualTopic.Feed";

if (tocCodes.length > 0){

Arrays.asList(tocCodes).forEach(s -> destinations.add(topic + "_" + s));

} else { // no TOC codes = connecting to default feed

destinations.add(topic);

}

}

@Bean

public ActiveMQConnectionFactory connectionFactory() throws JMSException {

LOG.info("{}Connecting to AMQ at {}", LOG_PREFIX, amqConnectionUrl);

LOG.info("{}Using client id {}", LOG_PREFIX, clientId);

ActiveMQConnectionFactory connectionFactory =

new ActiveMQConnectionFactory(amqConnectionUrl);

Connection conn = connectionFactory.createConnection();

conn.setClientID(clientId);

conn.setExceptionListener(new AmqExceptionListener());

conn.start();

destinations.forEach(destinationName -> {

try {

for (int i = 0; i < numSubs; i++) {

Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destination = session.createQueue(destinationName);

MessageConsumer messageConsumer = session.createConsumer(destination);

messageConsumer.setMessageListener(new MessageReceiver(databaseService, destinationName));

}

} catch (JMSException e) {

LOG.error("{}Error setting up queue @ {}", LOG_PREFIX, destinationName);

LOG.error(e.getMessage());

}

});

return connectionFactory;

}

}

public class MessageReceiver implements MessageListener, ExceptionListener {

public static final Logger LOG = LogManager.getLogger(MessageReceiver.class);

private static final String LOG_PREFIX = "[Message Receiver] ";

private DatabaseService databaseService;

public MessageReceiver(DatabaseService databaseService, String destinationName){

this.databaseService = databaseService;

LOG.info("{}Creating MessageReceiver for queue with destination: {}", LOG_PREFIX, destinationName);

}

@Override

public void onMessage(Message message) {

String messageText = null;

if (message instanceof TextMessage) {

TextMessage tm = (TextMessage) message;

try {

messageText = tm.getText();

} catch (JMSException e) {

LOG.error("{} Error getting message from AMQ", e);

}

} else if (message instanceof ActiveMQMessage) {

messageText = message.toString();

} else {

LOG.warn("{}Unrecognised message type, cannot process", LOG_PREFIX);

LOG.warn(message.toString());

}

try {

databaseService.sendMessageNoResponse(messageText);

} catch (Exception e) {

LOG.error("{}Unable to acknowledge message from AMQ. Message: {}", LOG_PREFIX, messageText, e);

}

}

}

public class AmqExceptionListener implements ExceptionListener {

public static final Logger LOG = LogManager.getLogger(AmqExceptionListener.class);

private static final String LOG_PREFIX = "[AmqExceptionListener ] ";

@Override

public void onException(JMSException e){

LOG.error("{}Exception thrown by ActiveMQ", LOG_PREFIX, e);

}

}

我從應用程序中獲得的控制臺輸出僅是以下內容(很抱歉,由于沒有太大用處,]][2019-12-12 14:43:30.292] [WARN ] Transport (tcp://[address]:61616) failed , attempting to automatically reconnect: java.io.EOFException

[2019-12-12 14:43:51.098] [WARN ] Failed to connect to [tcp://[address]:61616] after: 10 attempt(s) continuing to retry.

Process finished with exit code 0

我有一個小型的spring-boot應用程序設置,該應用程序連接到ActiveMQ上的一個或多個主題,這些主題在啟動時在應用程序的application.properties文件中設置-然后將這些消息發送到...

總結

以上是生活随笔為你收集整理的java activemq 断线_java - 防止ActiveMQ重新连接失败时自动退出 - SO中文参考 - www.soinside.com...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。