emitter-Client
生活随笔
收集整理的這篇文章主要介紹了
emitter-Client
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
生成子key: 通配符號 #
訂閱:通配符 +
package main
import (
"encoding/json"
"fmt"
"os"
"time"
emitter "github.com/emitter-io/go"
)
type MyBody struct {
Id uint64
Body string
}
var localID uint64
func main() {
localID = 0
var mykey string
rMsg := &MyBody{}
o := emitter.NewClientOptions()
o.SetOnMessageHandler(func(_ emitter.Emitter, msg emitter.Message) {
fmt.Printf("Received message: %s
", msg.Payload())
err := json.Unmarshal(msg.Payload(), rMsg)
if err != nil {
fmt.Println("parse json data err!")
}
fmt.Printf("Received message: %d
", rMsg.Id)
if (localID + 1) != rMsg.Id {
fmt.Println("seq err!")
os.Exit(1)
}
localID = rMsg.Id
//fmt.Printf("Received message: %d
", rMsg.Id)
})
o.SetOnKeyGenHandler(func(_ emitter.Emitter, key emitter.KeyGenResponse) {
fmt.Printf("get key: %s
", key)
mykey = key.Key
})
o.ClientID = "tttt_123456"
o.AddBroker("tcp://192.168.162.34:49250")
o.AddBroker("tcp://192.168.163.200:8081")
o.AddBroker("tcp://119.3.108.139:8081")
c := emitter.NewClient(o)
sToken := c.Connect()
if sToken.Wait() && sToken.Error() != nil {
panic("Error on Client.Connect(): " + sToken.Error().Error())
}
getKey := emitter.NewKeyGenRequest()
getKey.Channel = "shuguo/#/"
getKey.Key = "m1dBYWPaHcabBXb-p4YJkqKWYO_-TSrt" // 200server
getKey.TTL = 0
getKey.Type = "rwlsp"
sToken = c.GenerateKey(getKey)
if sToken.Wait() && sToken.Error() != nil {
panic("Error on client generateKey(): " + sToken.Error().Error())
}
time.Sleep(2 * time.Second)
//c.Subscribe(mykey, "shuguo/unicast/user@111/")
c.Subscribe(mykey, "shuguo/aaa/unicast/+/")
time.Sleep(1 * time.Second)
c.Publish(mykey, "shuguo/unicast/user@111", "hello")
//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip/ggf", "hello")
//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip", "world")
//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf", "!")
//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff@8795", "2020")
select {}
}
用 emitter 的client庫封裝了 paho.mqtt.golang ,直接用paho.mqtt.golang 庫 也可以對接 emitter 服務端(攜帶subKey)
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/eclipse/paho.mqtt.golang"
)
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s
", msg.Topic())
fmt.Printf("MSG: %s
", msg.Payload())
}
func main() {
mqtt.DEBUG = log.New(os.Stdout, "", 0)
mqtt.ERROR = log.New(os.Stdout, "", 0)
opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.162.34:49250").SetClientID("vm-11111")
opts.SetKeepAlive(2 * time.Second)
opts.SetDefaultPublishHandler(f)
opts.SetPingTimeout(1 * time.Second)
c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
if token := c.Subscribe("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/", 0, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/", 0, false, text)
token.Wait()
}
time.Sleep(3 * time.Second)
if token := c.Unsubscribe("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/"); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
c.Disconnect(250)
time.Sleep(1 * time.Second)
}
總結
以上是生活随笔為你收集整理的emitter-Client的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: oracle 父子排序,父子项排序 求大
- 下一篇: JDBC连接数据库的五个步骤、Mybat