日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python服务端编程_Python WebSocket服务端编程代码完成gtalk机器人

發布時間:2023/12/29 python 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python服务端编程_Python WebSocket服务端编程代码完成gtalk机器人 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文python源碼為實現,Python WebSocket服務端編程代碼完成gtalk機器人的全部代碼段。需要用到python sys、sleekxmpp、reactor等python模塊及方法,在代碼中導入部分可以查看到詳細方法。

python websocket庫,運作機制及客戶端和服務端的 API 實現。Python WebSocket的具體操作方法和工作原理也可以查看相關文章來學習。

import sys

from twisted.internet import reactor

from twisted.python import log

from autobahn.websocket import WebSocketServerFactory, WebSocketServerProtocol

import sleekxmpp

reload(sys)

sys.setdefaultencoding('utf8')

class gtalkBot(sleekxmpp.ClientXMPP):

"""

A basic SleekXMPP bot that will log in, send a message,

and then log out.

"""

def __init__(self, jid, password):

sleekxmpp.ClientXMPP.__init__(self, jid, password)

self.recipient = 'wxg4net@gmail.com'

self.status_text = ''

self.add_event_handler("session_start", self.start)

self.add_event_handler("message", self.muc_message)

self.add_event_handler('presence_available',

self.handle_presence_available)

self.add_event_handler('presence_unavailable',

self.handle_presence_unavailable)

self.add_event_handler('presence_dnd',

self.handle_presence_dnd)

def start(self, event):

self.send_presence()

self.get_roster()

def handle_presence_unavailable(self, presence):

if cmp(presence['from'].bare, self.recipient) == 0:

self.status_text = presence['status']

def handle_presence_available(self, presence):

if cmp(presence['from'].bare, self.recipient) == 0:

self.status_text = presence['status']

def handle_presence_dnd(self, presence):

if cmp(presence['from'].bare, self.recipient) == 0:

self.status_text = presence['status']

def muc_message(self, msg):

fwho = msg['from'].bare

body = msg['body']

if cmp(body,'-qsbk') == 0:

self.send_message(mto=fwho,

mbody='當前服務已關閉',

mtype='chat')

elif cmp(body,'-help') == 0:

self.send_message(mto=fwho,

mbody='當前服務開發中...',

mtype='chat')

elif cmp(fwho, self.recipient) == 0:

factory.broadcast(str(body))

else:

self.send_message(mto=fwho,

mbody='當前服務開發中...',

mtype='chat')

def sendMessageTome(self, message):

self.send_message(mto="wxg4net@gmail.com",

mbody=message,

mtype='chat')

#python websocket

class BroadcastServerProtocol(WebSocketServerProtocol):

def __init__(self):

self.istip = True

self.Heartbeat()

def Heartbeat(self):

reactor.callLater(1, self.Heartbeat)

def onOpen(self):

self.factory.register(self)

def onMessage(self, msg, binary):

xmpp.sendMessageTome(msg)

if self.istip:

self.sendMessage('你好,websocket python的當前狀態是:'+str(xmpp.status_text)+'。---祝你生活愉快!')

self.istip = False

def connectionLost(self, reason):

WebSocketServerProtocol.connectionLost(self, reason)

self.factory.unregister(self)

class BroadcastServerFactory(WebSocketServerFactory):

protocol = BroadcastServerProtocol

def __init__(self):

WebSocketServerFactory.__init__(self)

self.clients = []

def register(self, client):

if not client in self.clients:

self.clients.append(client)

def unregister(self, client):

if client in self.clients:

self.clients.remove(client)

def broadcast(self, message):

if self.clients:

for c in self.clients:

c.sendMessage(message)

#www.iplaypy.com

if __name__ == '__main__':

1a58

log.startLogging(sys.stdout)

xmpp = gtalkBot("phzggzs@gmail.com", "*******")

xmpp.register_plugin('xep_0030') # Service Discovery

xmpp.register_plugin('xep_0199') # XMPP Ping

if xmpp.connect():

xmpp.process(threaded=True)

factory = BroadcastServerFactory()

reactor.listenTCP(9090, factory)

reactor.run()

玩蛇網文章,轉載請注明出處和文章網址:https://www.iplaypy.com/code/c2700.html

相關文章 Recommend

總結

以上是生活随笔為你收集整理的python服务端编程_Python WebSocket服务端编程代码完成gtalk机器人的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。