10
from bson import ObjectId
from flask import Blueprint, request, jsonify
from setting import MDB, RET
devices_bp = Blueprint("devices_bp", name)
@devices_bp.route("/scan_qr", methods=["POST"])
def scan_qr():
# 掃碼綁定
# 1.掃描成功,沒有綁定 開啟綁定
# 2.掃碼失敗,未授權
# 3.掃碼成功,已經綁定 ?添加好友
device_key = request.form.to_dict()
print(device_key)
device = MDB.Devices.find(device_key)
print(device)
if device:
# 1.掃描成功,沒有綁定 開啟綁定
RET["CODE"] = 0
RET["MSG"] = "識別玩具成功"
RET["DATA"] = device_key
@devices_bp.route("/bind_toy", methods=["POST"])
def bind_toy():
toy_info = request.form.to_dict()
user_id = toy_info.pop("user_id")
@devices_bp.route("/toy_list", methods=["POST"])
def toy_list():
bind_user = request.form.get("_id")
toyl = list(MDB.Toys.find({"bind_user": bind_user}))
@devices_bp.route("/open_toy", methods=["POST"])
def open_toy():
device_key = request.form.to_dict()
# 1.用 device_key 查詢? Devices or Toys
# 先查詢已綁定Toy 可以有效減少數據庫查詢次數(減少IO操作)
toy = MDB.Toys.find_one(device_key)
# 2.設備處于綁定狀態,正常啟動
# 設備未綁定
# 設備未授權 0.5%
if toy: # 開機成功
ret = {
"code": 0,
"music": "Success.mp3",
"toy_id": str(toy.get("_id")),
"name": toy.get("toy_name")
}
else:
if MDB.Devices.find_one(device_key): # 設備未綁定
ret = {
"code": 1,
"music": "Nobind.mp3"
}
else: # 設備未授權
ret = {
"code": 2,
"music": "Nolic.mp3"
}
import json
from flask import Flask, request, render_template
from geventwebsocket.handler import WebSocketHandler
from geventwebsocket.server import WSGIServer
from geventwebsocket.websocket import WebSocket
ws_app = Flask(name)
user_socket_dict = {}
"789":"as",
"456":"ts"
@ws_app.route("/app/")
def app(user_id): # app 連接位置
print(user_id)
app_socket = request.environ.get("wsgi.websocket") # type:WebSocket
if app_socket:
user_socket_dict[user_id] = app_socket
while True:
app_data = app_socket.receive()
"""
JSON:
{
to_user: 456, // 接收音樂的方ID
music: music_name // 發送的音樂名稱
}
"""
app_data_dict = json.loads(app_data)
to_user = app_data_dict.get("to_user")
print(to_user)
usocket = user_socket_dict.get(to_user)
print(app_socket,usocket)
usocket.send(app_data)
@ws_app.route("/toy/")
def toy(toy_id): # Toy 連接的位置
print(toy_id)
toy_socket = request.environ.get("wsgi.websocket") # type:WebSocket
if toy_socket:
user_socket_dict[toy_id] = toy_socket
while True:
toy_data = toy_socket.receive()
toy_data_dict = json.loads(toy_data)
to_user = toy_data_dict.get("to_user")
usocket = user_socket_dict.get(to_user)
usocket.send(toy_data)
@ws_app.route("/get_toy")
def get_toy():
return render_template("WebToy.html")
if name == 'main':
http_serv = WSGIServer(("0.0.0.0", 9528), ws_app, handler_class=WebSocketHandler)
http_serv.serve_forever()
轉載于:https://www.cnblogs.com/Doner/p/11231141.html
總結
- 上一篇: jmeter使用问题——将接口返回变量存
- 下一篇: Flask项目常见面试问题