用requests登录微信网页版,并接收发送消息
首先,網頁版微信登錄大致分為以下幾個流程(都是大家可以通過抓包得到):
1、登陸主頁后,會生成一個UUID,這是個用戶標識,在后面請求二維碼會用到
def get_uuid(self):
'''獲取uuid'''
url = 'https://login.wx.qq.com/jslogin?appid=wx782c26e4c19acffb&redirect_uri=https%3A%2F%2Fwx.qq.com%2Fcgi-bin%2Fmmwebwx-bin%2Fwebwxnewloginpage&fun=new&lang=zh_CN&_={}'.format(get_time())
response = self.session.get(url).text
self.uuid = re.findall(r'uuid = "(.*?)"',response)[0] # 文本較少,用正則匹配即可
return self.uuid
2、請求二維碼圖片
def qrcode(self):
url = 'https://login.weixin.qq.com/qrcode/{}'.format(self.uuid)
response = self.session.get(url).content # 請求得到二維碼,由于是圖片,得到字節碼即可
with open ('qrcode.jpg','wb') as f:
f.write(response) # 把二維碼保存到一張圖片里
im = Image.open('qrcode.jpg')
im.show() # 把二維碼圖片展示出來
3、掃描二維碼,得到重定向的鏈接
def get_redirect_uri(self):
while True: # 由于需要不斷請求,讓我們有時間來掃描二維碼
url = 'https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid={}&tip=0&r=-2109595288&_={}'.format(self.uuid,get_time())
result = self.session.get(url,allow_redirects=False)
code = re.findall(r'window.code=(.*?);',result.text)[0]
if code == '200': # 沒有掃描的時候是400,掃描之后就是200
print('已成功掃描二維碼!')
break
self.redirect_uri = re.findall(r'window.redirect_uri="(.*?)"',result.text)[0] # 得到一個鏈接,請求之后會得到一些有用的參數
這個url里面有個參數在前面請求應答中沒有的,在js中可以找到,大概是個13位的時間戳
4、請求上面得到的鏈接,得到一些必要的參數(這些參數在后面的登錄、收發消息都是必需的),注意這里一定要不允許重定向,因為請求這個url的時候,會跳轉到一個初始化的鏈接,這樣我們將不能正確獲得這些參數。
def get_require_data(self):
result = self.session.get(self.redirect_uri,allow_redirects=False).text # 這里注意一定要去掉重定向
self.skey = re.findall(r'<skey>(.*?)</skey>',result)[0] # 這里用Beautifulsoup也能解析,文本是xml格式的,用匹配標簽就可以得到
self.wxsid = re.findall(r'<wxsid>(.*?)</wxsid>',result)[0]
self.wxuin = re.findall(r'<wxuin>(.*?)</wxuin>',result)[0]
self.pass_ticket = re.findall(r'<pass_ticket>(.*?)</pass_ticket>',result)[0]
5、發送登錄請求
def login(self):
url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxinit?r=-2109580211&pass_ticket={}'.format(self.pass_ticket)
params = {
"BaseRequest":{
"Uin":self.wxuin,
"Sid":self.wxsid,
"Skey":self.skey,
"DeviceID":get_DeviceID(), # DeviceID這個參數在前面的應答中沒有找到,它是js生成的,通過讀js代碼,我們可以構造出來
}
}
result = self.session.post(url,data=json.dumps(params,ensure_ascii=False))
result.encoding = 'utf-8'
data = result.json()
user = data['User']
nickname = user['NickName']
username = user['UserName']
self.user_list[nickname] = username # 把自己賬號的昵稱和用戶編號保存起來,方便后面收發消息
self.synckey_list = data['SyncKey'] # synckey 用于后面同步消息
print('已成功登錄!!')
self.synckey = format_synckey(self.synckey_list['List']) # 把 synckey構造成查詢字符串的模式
至此,就完成了基本的登陸認證過程,其實大部分的登錄都差不多,可能有得是通過驗證碼和賬號密碼加密的方式,這些到后面再說
6、得到好友列表并保存起來
def get_userlist(self):
'''把用戶的好友列表保存起來,方便后面收發消息'''
url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxgetcontact?pass_ticket={}&r={}&seq=0&skey={}'.format(self.pass_ticket,get_time(),self.skey)
response = self.session.get(url)
response.encoding = 'utf-8'
result = response.json()
memberlist = result['MemberList']
for member in memberlist:
nickname = member['NickName']
username = member['UserName']
self.user_list[nickname] = username # 按照昵稱-用戶編號的一一對應關系保存
下面就到了比較繁瑣的環節了,收發消息:
我們在登錄的時候就得到一個synckey了,這個東西在每次收消息的請求中都要帶上,而且每次收發消息,這個值都會向服務器請求更新一次。那么我們怎么確定正在收發消息呢,微信網頁版用的是輪詢的方式發送一個請求,這個請求的響應大概是{retcode:”0″,selector:”0″}這樣的,如果selector變得不為0了,說明有消息要收發。所以我們也可以通過不斷地發送請求,然后判斷響應里的selector的值來處理。
1、更新synckey
def get_new_synckey(self): # 得到最新的synckey 發送或者接收消息后都有這個操作
url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsync?sid={}&skey={}&pass_ticket={}'.format(self.wxsid,self.skey,self.pass_ticket)
data = {"BaseRequest":{"Uin":self.wxuin,"Sid":self.wxsid,"Skey":self.skey,"DeviceID":get_DeviceID()},"SyncKey":self.synckey_list,"rr":2112925520}
response = self.session.post(url,data=json.dumps(data,ensure_ascii=False))
response.encoding = 'utf8'
result = response.json()
self.synckey_list = result['SyncKey']
self.synckey = format_synckey(self.synckey_list['List'])
2、檢查selector值是否不為0
def sync_check(self): # 檢查是否有新消息
while True:
# 注意這里的synckey是在url里面,所以要進行urlencode,而且synckey改變之后,url也會變,所以url要放在while True里面
url = 'https://webpush.wx.qq.com/cgi-bin/mmwebwx-bin/synccheck?r={}&skey={}&sid={}&uin={}&deviceid={}&{}'.format(get_time(),self.skey,self.wxsid,self.wxuin,get_DeviceID(),self.synckey)
response = self.session.get(url)
response.encoding = 'utf8'
result = response.text
# print('正在輪詢是否有新消息...')
selector = re.findall(r'selector:"(.*?)"',result)[0]
if selector != '0':
self.get_msg() # 我這里發消息是在另一個線程,收消息在這個線程
self.get_new_synckey() # 每次都需要更新
3、接受消息
def get_msg(self):
'''接收消息'''
url = ' https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsync?sid={}&skey={}&pass_ticket={}'.format(self.wxsid,self.skey,self.pass_ticket)
data = {
"BaseRequest":{
"Uin":self.wxuin,
"Sid":self.wxsid,
"Skey":self.skey,
"DeviceID":get_DeviceID()},
"SyncKey":self.synckey_list,
"rr":-2123282759}
response = self.session.post(url,data=json.dumps(data))
response.encoding = 'utf-8'
result = response.json()
self.synckey_list = result['SyncKey'] # 接受消息的時候,響應里面有這個值,在更新synckey的時候,參數和響應里都有synckey
self.synckey = format_synckey(self.synckey_list['List'])
msglist = result['AddMsgList']
for msg in msglist:
if msg['ToUserName'] == self.user_list['XXXX']: # 這里面填你自己微信的昵稱,不過加上之后不會受到群消息,大家可以試試不加這個判斷
fromName = msg['FromUserName']
for k,v in self.user_list.items():
if v == fromName:
fromNickName = k
content = msg['Content']
print('來自{}的消息:{}'.format(fromNickName,content))
4、發送消息
def send_msg(self):
url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsendmsg?pass_ticket={}'.format(self.pass_ticket)
while True:
msg = input('>>>>>')
if msg == 'q':
break
data = {
"BaseRequest":{
"Uin":self.wxuin,
"Sid":self.wxsid,
"Skey":self.skey,
"DeviceID":get_DeviceID()},
"Msg":{
"Type":1,
"Content":msg,
"FromUserName":self.user_list['xxx'], # 這里填你自己的昵稱
"ToUserName":self.user_list['xxx'], # 這里填你想發送消息的好友的昵稱,你也可以用input鍵盤輸入的方式
"LocalID":get_time(),
"ClientMsgId":get_time()},
"Scene":0
}
self.session.post(url,data=(json.dumps(data,ensure_ascii=False)).encode('utf-8'))
self.get_new_synckey() # 發送消息之后也要更新synckey
5、把發送消息和接受消息寫成多線程的方式
大致流程就是這樣啦!下面展示全部代碼:
1 import time
2 import re
3 import random
4 import requests
5 import urllib3
6 import json
7 from urllib import parse
8 from PIL import Image
9 from threading import Thread
10
11 urllib3.disable_warnings()
12
13
14 def get_time():
15 return str(int(time.time()*1000))
16
17 def get_DeviceID():
18 return 'e'+str(round(random.random(),15))[2:17]
19
20 def format_synckey(synckey_list):
21 '''
22 把列表形式轉成查詢字符串
23 '''
24 tem_synckey = ''
25 for synckey in synckey_list:
26 tem_synckey += str(synckey['Key']) + '_' +str(synckey['Val']) + '|'
27 new_synckey = {'synckey':tem_synckey.rstrip('|')}
28 return parse.urlencode(new_synckey)
29
30 class WeChat():
31 def __init__(self):
32 headers = {
33 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36'
34 } # 請求頭信息
35 proxies = {
36 'http': '192.168.105.71:80',
37 'https': '192.168.105.71:80'
38 } # 使用代理
39 self.user_list = {}
40 self.session = requests.session()
41 self.session.headers = headers
42 self.session.proxies = proxies
43 self.session.verify = False
44
45 def get_uuid(self):
46 '''獲取uuid'''
47 url = 'https://login.wx.qq.com/jslogin?appid=wx782c26e4c19acffb&redirect_uri=https%3A%2F%2Fwx.qq.com%2Fcgi-bin%2Fmmwebwx-bin%2Fwebwxnewloginpage&fun=new&lang=zh_CN&_={}'.format(get_time())
48 response = self.session.get(url).text
49 self.uuid = re.findall(r'uuid = "(.*?)"',response)[0] # 文本較少,用正則匹配即可
50 return self.uuid
51
52 def qrcode(self):
53 url = 'https://login.weixin.qq.com/qrcode/{}'.format(self.uuid)
54 response = self.session.get(url).content # 請求得到二維碼,由于是圖片,得到字節碼即可
55 with open ('qrcode.jpg','wb') as f:
56 f.write(response) # 把二維碼保存到一張圖片里
57 im = Image.open('qrcode.jpg')
58 im.show() # 把二維碼圖片展示出來
59
60 def get_redirect_uri(self):
61 while True: # 由于需要不斷請求,讓我們有時間來掃描二維碼
62 url = 'https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid={}&tip=0&r=-2109595288&_={}'.format(self.uuid,get_time())
63 result = self.session.get(url,allow_redirects=False)
64 code = re.findall(r'window.code=(.*?);',result.text)[0]
65 if code == '200': # 沒有掃描的時候是400,掃描之后就是200
66 print('已成功掃描二維碼!')
67 break
68 self.redirect_uri = re.findall(r'window.redirect_uri="(.*?)"',result.text)[0] # 得到一個鏈接,請求之后會得到一些有用的參數
69
70 def get_require_data(self):
71 result = self.session.get(self.redirect_uri,allow_redirects=False).text # 這里注意一定要去掉重定向
72 self.skey = re.findall(r'<skey>(.*?)</skey>',result)[0] # 這里用Beautifulsoup也能解析,文本是xml格式的,用匹配標簽就可以得到
73 self.wxsid = re.findall(r'<wxsid>(.*?)</wxsid>',result)[0]
74 self.wxuin = re.findall(r'<wxuin>(.*?)</wxuin>',result)[0]
75 self.pass_ticket = re.findall(r'<pass_ticket>(.*?)</pass_ticket>',result)[0]
76
77 def login(self):
78 url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxinit?r=-2109580211&pass_ticket={}'.format(self.pass_ticket)
79 params = {
80 "BaseRequest":{
81 "Uin":self.wxuin,
82 "Sid":self.wxsid,
83 "Skey":self.skey,
84 "DeviceID":get_DeviceID(), # DeviceID這個參數在前面的應答中沒有找到,它是js生成的
85 }
86 }
87 result = self.session.post(url,data=json.dumps(params,ensure_ascii=False))
88 result.encoding = 'utf-8'
89 data = result.json()
90 user = data['User']
91 nickname = user['NickName']
92 username = user['UserName']
93 self.user_list[nickname] = username # 把自己賬號的昵稱和用戶編號保存起來,方便后面收發消息
94 self.synckey_list = data['SyncKey'] # synckey 用于后面同步消息
95 print('已成功登錄!!')
96 self.synckey = format_synckey(self.synckey_list['List']) # 把 synckey構造成查詢字符串的模式
97
98
99
100 def get_userlist(self):
101 '''把用戶的好友列表保存起來,方便后面收發消息'''
102 url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxgetcontact?pass_ticket={}&r={}&seq=0&skey={}'.format(self.pass_ticket,get_time(),self.skey)
103 response = self.session.get(url)
104 response.encoding = 'utf-8'
105 result = response.json()
106 memberlist = result['MemberList']
107 for member in memberlist:
108 nickname = member['NickName']
109 username = member['UserName']
110 self.user_list[nickname] = username # 按照昵稱-用戶編號的一一對應關系保存
111
112 def get_new_synckey(self): # 得到最新的synckey 發送或者接收消息后都有這個操作
113 url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsync?sid={}&skey={}&pass_ticket={}'.format(self.wxsid,self.skey,self.pass_ticket)
114 data = {"BaseRequest":{"Uin":self.wxuin,"Sid":self.wxsid,"Skey":self.skey,"DeviceID":get_DeviceID()},"SyncKey":self.synckey_list,"rr":2112925520}
115 response = self.session.post(url,data=json.dumps(data,ensure_ascii=False))
116 response.encoding = 'utf8'
117 result = response.json()
118 self.synckey_list = result['SyncKey']
119 self.synckey = format_synckey(self.synckey_list['List'])
120
121 def sync_check(self): # 檢查是否有新消息
122 while True:
123 # 注意這里的synckey是在url里面,所以要進行urlencode,而且synckey改變之后,url也會變,所以url要放在while True里面
124 url = 'https://webpush.wx.qq.com/cgi-bin/mmwebwx-bin/synccheck?r={}&skey={}&sid={}&uin={}&deviceid={}&{}'.format(get_time(),self.skey,self.wxsid,self.wxuin,get_DeviceID(),self.synckey)
125 response = self.session.get(url)
126 response.encoding = 'utf8'
127 result = response.text
128 # print('正在輪詢是否有新消息...')
129 selector = re.findall(r'selector:"(.*?)"',result)[0]
130 if selector != '0':
131 self.get_msg() # 我這里發消息是在另一個線程,收消息在這個線程
132 self.get_new_synckey() # 每次都需要更新
133
134
135
136 def send_msg(self):
137 url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsendmsg?pass_ticket={}'.format(self.pass_ticket)
138 while True:
139 msg = input('>>>>>')
140 if msg == 'q':
141 break
142 data = {
143 "BaseRequest":{
144 "Uin":self.wxuin,
145 "Sid":self.wxsid,
146 "Skey":self.skey,
147 "DeviceID":get_DeviceID()},
148 "Msg":{
149 "Type":1,
150 "Content":msg,
151 "FromUserName":self.user_list['xxx'], # 這里填你自己微信昵稱
152 "ToUserName":self.user_list['xxx'], # 這里填你想發送消息的好友的昵稱,你也可以用input鍵盤輸入的方式
153 "LocalID":get_time(),
154 "ClientMsgId":get_time()},
155 "Scene":0
156 }
157
158 self.session.post(url,data=(json.dumps(data,ensure_ascii=False)).encode('utf-8'))
159 self.get_new_synckey() # 發送消息之后也會更新synckey
160
161
162
163 def get_msg(self):
164 '''接收消息'''
165 url = ' https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsync?sid={}&skey={}&pass_ticket={}'.format(self.wxsid,self.skey,self.pass_ticket)
166 data = {
167 "BaseRequest":{
168 "Uin":self.wxuin,
169 "Sid":self.wxsid,
170 "Skey":self.skey,
171 "DeviceID":get_DeviceID()},
172 "SyncKey":self.synckey_list,
173 "rr":-2123282759}
174 response = self.session.post(url,data=json.dumps(data))
175 response.encoding = 'utf-8'
176 result = response.json()
177 self.synckey_list = result['SyncKey'] # 接受消息的時候,響應里面有這個值,在更新synckey的時候,參數和響應里都有synckey
178 self.synckey = format_synckey(self.synckey_list['List'])
179 msglist = result['AddMsgList']
180 for msg in msglist:
181 if msg['ToUserName'] == self.user_list['XXXX']: # 這里面填你自己微信的昵稱,不過加上之后不會受到群消息,大家可以試試不加這個判斷
182 fromName = msg['FromUserName']
183 for k,v in self.user_list.items():
184 if v == fromName:
185 fromNickName = k
186 content = msg['Content']
187 print('來自{}的消息:{}'.format(fromNickName,content))
188
189 def main(self):
190 self.get_uuid()
191 self.qrcode()
192 self.get_redirect_uri()
193 self.get_require_data()
194 self.login()
195 self.get_userlist()
196 send_msg = Thread(target=self.send_msg)
197 recieve_msg = Thread(target=self.sync_check)
198 send_msg.start()
199 recieve_msg.start()
200 send_msg.join()
201 recieve_msg.join()
202
203
204 if __name__ == '__main__':
205 wechat = WeChat()
206 wechat.main()
View Code
在這理,我只實現了文本消息,至于表情以及圖片,還沒有實現,大家可以完善一下。
總結
以上是生活随笔為你收集整理的用requests登录微信网页版,并接收发送消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: poj3069 Saruman's Ar
- 下一篇: xpath定位中starts-with、