Spring消息之STOMP
一、STOMP 簡(jiǎn)介
直接使用WebSocket(或SockJS)就很類似于使用TCP套接字來編寫Web應(yīng)用。因?yàn)闆]有高層級(jí)的線路協(xié)議(wire protocol),因此就需要我們定義應(yīng)用之間所發(fā)送消息的語義,還需要確保連接的兩端都能遵循這些語義。
就像HTTP在TCP套接字之上添加了請(qǐng)求-響應(yīng)模型層一樣,STOMP在WebSocket之上提供了一個(gè)基于幀的線路格式(frame-based wire format)層,用來定義消息的語義。
與HTTP請(qǐng)求和響應(yīng)類似,STOMP幀由命令、一個(gè)或多個(gè)頭信息以及負(fù)載所組成。例如,如下就是發(fā)送數(shù)據(jù)的一個(gè)STOMP幀:
>>> SEND
transaction:tx-0
destination:/app/marco
content-length:20
{"message":"Marco!"}
在這個(gè)例子中,STOMP命令是send,表明會(huì)發(fā)送一些內(nèi)容。緊接著是三個(gè)頭信息:一個(gè)表示消息的的事務(wù)機(jī)制,一個(gè)用來表示消息要發(fā)送到哪里的目的地,另外一個(gè)則包含了負(fù)載的大小。然后,緊接著是一個(gè)空行,STOMP幀的最后是負(fù)載內(nèi)容。
二、服務(wù)端實(shí)現(xiàn)
1、啟用STOMP功能
STOMP 的消息根據(jù)前綴的不同分為三種。如下,以 /app 開頭的消息都會(huì)被路由到帶有@MessageMapping 或 @SubscribeMapping 注解的方法中;以/topic 或 /queue 開頭的消息都會(huì)發(fā)送到STOMP代理中,根據(jù)你所選擇的STOMP代理不同,目的地的可選前綴也會(huì)有所限制;以/user開頭的消息會(huì)將消息重路由到某個(gè)用戶獨(dú)有的目的地上。
@Configuration
@EnableWebSocketMessageBroker
@PropertySource("classpath:resources.properties")
public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private Integer port;
@Value("${rabbitmq.userName}")
private String userName;
@Value("${rabbitmq.password}")
private String password;
/**
* 將 "/stomp" 注冊(cè)為一個(gè) STOMP 端點(diǎn)。這個(gè)路徑與之前發(fā)送和接收消息的目的地路徑有所
* 不同。這是一個(gè)端點(diǎn),客戶端在訂閱或發(fā)布消息到目的地路徑前,要連接到該端點(diǎn)。
*
* @param registry
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/stomp").withSockJS();
}
/**
* 如果不重載它的話,將會(huì)自動(dòng)配置一個(gè)簡(jiǎn)單的內(nèi)存消息代理,用它來處理以"/topic"為前綴的消息
*
* @param registry
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//基于內(nèi)存的STOMP消息代理
registry.enableSimpleBroker("/queue", "/topic");
//基于RabbitMQ 的STOMP消息代理
/* registry.enableStompBrokerRelay("/queue", "/topic")
.setRelayHost(host)
.setRelayPort(port)
.setClientLogin(userName)
.setClientPasscode(password);*/
registry.setApplicationDestinationPrefixes("/app", "/foo");
registry.setUserDestinationPrefix("/user");
}
}
View Code
2、處理來自客戶端的STOMP消息
服務(wù)端處理客戶端發(fā)來的STOMP消息,主要用的是@MessageMapping 注解。如下:
@MessageMapping("/marco")
@SendTo("/topic/marco")
public Shout stompHandle(Shout shout){
LOGGER.info("接收到消息:" + shout.getMessage());
Shout s = new Shout();
s.setMessage("Polo!");
return s;
}
2.1、@MessageMapping 指定目的地是“/app/marco”(“/app”前綴是隱含的,因?yàn)槲覀儗⑵渑渲脼閼?yīng)用的目的地前綴)。
2.2、方法接收一個(gè)Shout參數(shù),因?yàn)镾pring的某一個(gè)消息轉(zhuǎn)換器會(huì)將STOMP消息的負(fù)載轉(zhuǎn)換為Shout對(duì)象。Spring 4.0提供了幾個(gè)消息轉(zhuǎn)換器,作為其消息API的一部分:
2.3、尤其注意,這個(gè)處理器方法有一個(gè)返回值,這個(gè)返回值并不是返回給客戶端的,而是轉(zhuǎn)發(fā)給消息代理的,如果客戶端想要這個(gè)返回值的話,只能從消息代理訂閱。@SendTo 注解重寫了消息代理的目的地,如果不指定@SendTo,幀所發(fā)往的目的地會(huì)與觸發(fā)處理器方法的目的地相同,只不過會(huì)添加上“/topic”前綴。
2.4、如果客戶端就是想要服務(wù)端直接返回消息呢?聽起來不就是HTTP做的事情!即使這樣,STOMP 仍然為這種一次性的響應(yīng)提供了支持,用的是@SubscribeMapping注解,與HTTP不同的是,這種請(qǐng)求-響應(yīng)模式是異步的...
@SubscribeMapping("/getShout")
public Shout getShout(){
Shout shout = new Shout();
shout.setMessage("Hello STOMP");
return shout;
}
3、發(fā)送消息到客戶端
3.1 在處理消息之后發(fā)送消息
正如前面看到的那樣,使用@MessageMapping 或者@SubscribeMapping 注解可以處理客戶端發(fā)送過來的消息,并選擇方法是否有返回值。
如果 @MessageMapping 注解的控制器方法有返回值的話,返回值會(huì)被發(fā)送到消息代理,只不過會(huì)添加上"/topic"前綴。可以使用@SendTo 重寫消息目的地;
如果 @SubscribeMapping 注解的控制器方法有返回值的話,返回值會(huì)直接發(fā)送到客戶端,不經(jīng)過代理。如果加上@SendTo 注解的話,則要經(jīng)過消息代理。
3.2 在應(yīng)用的任意地方發(fā)送消息
spring-websocket 定義了一個(gè)SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),可以實(shí)現(xiàn)自由的向任意目的地發(fā)送消息,并且訂閱此目的地的所有用戶都能收到消息。
@Autowired
private SimpMessageSendingOperations simpMessageSendingOperations;
/**
* 廣播消息,不指定用戶,所有訂閱此的用戶都能收到消息
* @param shout
*/
@MessageMapping("/broadcastShout")
public void broadcast(Shout shout) {
simpMessageSendingOperations.convertAndSend("/topic/shouts", shout);
}
3.3 為指定用戶發(fā)送消息
3.2介紹了如何廣播消息,訂閱目的地的所有用戶都能收到消息。如果消息只想發(fā)送給特定的用戶呢?spring-websocket 介紹了兩種方式來實(shí)現(xiàn)這種功能,一種是 基于@SendToUser注解和Principal參數(shù),一種是SimpMessageSendingOperations接口的convertAndSendToUser方法。
基于@SendToUser注解和Principal參數(shù)
@SendToUser 表示要將消息發(fā)送給指定的用戶,會(huì)自動(dòng)在消息目的地前補(bǔ)上"/user"前綴。如下,最后消息會(huì)被發(fā)布在 /user/queue/notifications-username。但是問題來了,這個(gè)username是怎么來的呢?就是通過 principal 參數(shù)來獲得的。那么,principal 參數(shù)又是怎么來的呢?需要在spring-websocket 的配置類中重寫 configureClientInboundChannel 方法,添加上用戶的認(rèn)證。
/**
* 1、設(shè)置攔截器
* 2、首次連接的時(shí)候,獲取其Header信息,利用Header里面的信息進(jìn)行權(quán)限認(rèn)證
* 3、通過認(rèn)證的用戶,使用 accessor.setUser(user); 方法,將登陸信息綁定在該 StompHeaderAccessor 上,在Controller方法上可以獲取 StompHeaderAccessor 的相關(guān)信息
* @param registration
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptorAdapter() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
//1、判斷是否首次連接
if (StompCommand.CONNECT.equals(accessor.getCommand())){
//2、判斷用戶名和密碼
String username = accessor.getNativeHeader("username").get(0);
String password = accessor.getNativeHeader("password").get(0);
if ("admin".equals(username) && "admin".equals(password)){
Principal principal = new Principal() {
@Override
public String getName() {
return userName;
}
};
accessor.setUser(principal);
return message;
}else {
return null;
}
}
//不是首次連接,已經(jīng)登陸成功
return message;
}
});
}
spring-websocket 用戶認(rèn)證
@MessageMapping("/shout")
@SendToUser("/queue/notifications")
public Shout userStomp(Principal principal, Shout shout) {
String name = principal.getName();
String message = shout.getMessage();
LOGGER.info("認(rèn)證的名字是:{},收到的消息是:{}", name, message);
return shout;
}
convertAndSendToUser方法
除了convertAndSend()以外,SimpMessageSendingOperations 還提供了convertAndSendToUser()方法。按照名字就可以判斷出來,convertAndSendToUser()方法能夠讓我們給特定用戶發(fā)送消息。
@MessageMapping("/singleShout")
public void singleUser(Shout shout, StompHeaderAccessor stompHeaderAccessor) {
String message = shout.getMessage();
LOGGER.info("接收到消息:" + message);
Principal user = stompHeaderAccessor.getUser();
simpMessageSendingOperations.convertAndSendToUser(user.getName(), "/queue/shouts", shout);
}
如上,這里雖然我還是用了認(rèn)證的信息得到用戶名。但是,其實(shí)大可不必這樣,因?yàn)閏onvertAndSendToUser 方法可以指定要發(fā)送給哪個(gè)用戶。也就是說,完全可以把用戶名的當(dāng)作一個(gè)參數(shù)傳遞給控制器方法,從而繞過身份認(rèn)證!convertAndSendToUser 方法最終會(huì)把消息發(fā)送到 /user/sername/queue/shouts 目的地上。
4、處理消息異常
在處理消息的時(shí)候,有可能會(huì)出錯(cuò)并拋出異常。因?yàn)镾TOMP消息異步的特點(diǎn),發(fā)送者可能永遠(yuǎn)也不會(huì)知道出現(xiàn)了錯(cuò)誤。@MessageExceptionHandler標(biāo)注的方法能夠處理消息方法中所拋出的異常。我們可以把錯(cuò)誤發(fā)送給用戶特定的目的地上,然后用戶從該目的地上訂閱消息,從而用戶就能知道自己出現(xiàn)了什么錯(cuò)誤啦...
@MessageExceptionHandler(Exception.class)
@SendToUser("/queue/errors")
public Exception handleExceptions(Exception t){
t.printStackTrace();
return t;
}
三、客戶端實(shí)現(xiàn)
1、JavaScript 依賴
STOMP 依賴sockjs.js 和stomp.min.js。stomp.min.js的下載鏈接:http://www.bootcdn.cn/stomp.js/
<script type="text/javascript" src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.js"></script>
<script type="text/javascript" src="/js/stomp.min.js"></script>
2、JavaScript 客戶端實(shí)現(xiàn)
/*STOMP*/
var url = 'http://localhost:8080/stomp';
var sock = new SockJS(url);
var stomp = Stomp.over(sock);
var strJson = JSON.stringify({'message': 'Marco!'});
//默認(rèn)的和STOMP端點(diǎn)連接
/*stomp.connect("guest", "guest", function (franme) {
});*/
var headers={
username:'admin',
password:'admin'
};
stomp.connect(headers, function (frame) {
//發(fā)送消息
//第二個(gè)參數(shù)是一個(gè)頭信息的Map,它會(huì)包含在STOMP的幀中
//事務(wù)支持
var tx = stomp.begin();
stomp.send("/app/marco", {transaction: tx.id}, strJson);
tx.commit();
//訂閱服務(wù)端消息 subscribe(destination url, callback[, headers])
stomp.subscribe("/topic/marco", function (message) {
var content = message.body;
var obj = JSON.parse(content);
console.log("訂閱的服務(wù)端消息:" + obj.message);
}, {});
stomp.subscribe("/app/getShout", function (message) {
var content = message.body;
var obj = JSON.parse(content);
console.log("訂閱的服務(wù)端直接返回的消息:" + obj.message);
}, {});
/*以下是針對(duì)特定用戶的訂閱*/
var adminJSON = JSON.stringify({'message': 'ADMIN'});
/*第一種*/
stomp.send("/app/singleShout", {}, adminJSON);
stomp.subscribe("/user/queue/shouts",function (message) {
var content = message.body;
var obj = JSON.parse(content);
console.log("admin用戶特定的消息1:" + obj.message);
});
/*第二種*/
stomp.send("/app/shout", {}, adminJSON);
stomp.subscribe("/user/queue/notifications",function (message) {
var content = message.body;
var obj = JSON.parse(content);
console.log("admin用戶特定的消息2:" + obj.message);
});
/*訂閱異常消息*/
stomp.subscribe("/user/queue/errors", function (message) {
console.log(message.body);
});
//若使用STOMP 1.1 版本,默認(rèn)開啟了心跳檢測(cè)機(jī)制(默認(rèn)值都是10000ms)
stomp.heartbeat.outgoing = 20000;
stomp.heartbeat.incoming = 0; //客戶端不從服務(wù)端接收心跳包
});
View Code
演示源代碼鏈接:https://github.com/JMCuixy/SpringWebSocket
總結(jié)
以上是生活随笔為你收集整理的Spring消息之STOMP的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flink SQL Client实现CD
- 下一篇: 怎么创建具有真实纹理的CG场景岩石?