日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

上下文管理、redis发布订阅、RabbitMQ发布订阅、SQLAlchemy

發布時間:2025/3/15 数据库 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 上下文管理、redis发布订阅、RabbitMQ发布订阅、SQLAlchemy 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、上下文管理

?

import contextlib @contextlib.contextmanager def work_state(state_list,worker_thread):state_list.append(worker_thread)try:yieldfinally:state_list.remove(worker_thread) free_list=[] current_thread="alex" with work_state(free_list,current_thread):print(123)print(456)#以下為執行結果: 123 456

?

代碼執行步驟

?

?

上下文用于需要 close()方法的模塊

?

import contextlib import socket@contextlib.contextmanager def context_socket(host,port):sk=socket.socket()sk.bind((host,port))sk.listen(5)try:yield skfinally:sk.close() with context_socket('127.0.0.1',8888) as sock:print(sock)#以下為執行結果: <socket.socket fd=224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8888)>

?

?

?

二、redis 發布訂閱

#redis2.py 主程序import redis class RedisHelper:def __init__(self):self.__conn=redis.Redis(host='192.168.11.87')def public(self,msg,chan):self.__conn.publish(chan,msg)return Truedef subscribe(self,chan):pub=self.__conn.pubsub()pub.subscribe(chan)pub.parse_response()return pub

?

?

訂閱

import redis2obj= redis2.RedisHelper() data=obj.subscribe('fm111.7') print(data.parse_response())#接收到發布信息: [b'message', b'fm111.7', b'aaaaaa']

?

發布

import redis2obj= redis2.RedisHelper() obj.public('alex_db','f111.7')

?

?

三、RabbitMQ

import pika#生產者 發布 connection =pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))channel = connection.channel() channel.queue_declare(queue='hello_wuwenyu') #創建隊列,存在則忽略 channel.basic_publish(exchange='', routing_key='hello_wuwenyu', body='Hello World') print("[x] Sent 'Hello World!'") connection.close


?

?

import pika#消費者 訂閱 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel() channel.queue_declare(queue='hello_wuwenyu') # def callback(ch,method,properties,body):print(" [x] Received %r" % body) channel.basic_consume(callback,queue='hello_wuwenyu',no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()#接收到生產者發來的消息: [*] Waiting for messages. To exit press CTRL+C [x] Received b'Hello World'

?

  2?exchange 綁定多個隊列

# import pika#生產者 發布 import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel()channel.exchange_declare(exchange='logs_fanout',type='fanout')message = '456' channel.basic_publish(exchange='logs_fanout',routing_key='',body=message) print(" [x] Sent %r" % message) connection.close()

?

import pika#訂閱 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel()channel.exchange_declare(exchange='logs_fanout',type='fanout')# 隨機創建隊列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 綁定 channel.queue_bind(exchange='logs_fanout',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()#執行多次消費端,隨機產生多個隊列,每個隊列都接收到消息: [*] Waiting for logs. To exit press CTRL+C[x] b'456'

?

?關鍵字

#生產者 severity = 'info' severity = 'errer' 執行兩次 import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel()channel.exchange_declare(exchange='direct_logs_wuwenyu',type='direct')severity = 'info'
# severity = 'errer' message = '123' channel.basic_publish(exchange='direct_logs_wuwenyu',routing_key=severity,body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()

?

?

#訂閱 消費 客戶端1 import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel()channel.exchange_declare(exchange='direct_logs_wuwenyu',type='direct')result = channel.queue_declare(exclusive=True) queue_name = result.method.queueseverities = ['error','info','warning']for severity in severities:channel.queue_bind(exchange='direct_logs_wuwenyu',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()#接受到的消息:[*] Waiting for logs. To exit press CTRL+C[x] 'error':b'123'[x] 'info':b'123'

?

?

#訂閱 消費 客戶端2 import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel()channel.exchange_declare(exchange='direct_logs_wuwenyu',type='direct')result = channel.queue_declare(exclusive=True) queue_name = result.method.queueseverities = ['error',]for severity in severities:channel.queue_bind(exchange='direct_logs_wuwenyu',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming() #接受到的消息:[*] Waiting for logs. To exit press CTRL+C[x] 'info':b'123'

?

四、SQLAlchemy

SQLAlchemy是Python編程語言下的一款ORM框架,該框架建立在數據庫API之上,使用關系對象映射進行數據庫操作,簡言之便是:將對象轉換成SQL,然后使用數據API執行SQL并獲取執行結果。

Dialect用于和數據API進行交流,根據配置文件的不同調用不同的數據庫API,從而實現對數據庫的操作,如:

MySQL-Python mysql+mysqldb://:@[:]/pymysql mysql+pymysql://:@/[?]MySQL-Connector mysql+mysqlconnector://:@[:]/cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]更多詳見:http://docs.sqlalchemy.org/en/latest/dialects/index.html

?


步驟一:

使用 Engine/ConnectionPooling/Dialect 進行數據庫操作,Engine使用ConnectionPooling連接數據庫,然后再通過Dialect執行SQL語句。

#!/usr/bin/env python # -*- coding:utf-8 -*-fromsqlalchemy importcreate_engineengine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)engine.execute( "INSERT INTO ts_test (a, b) VALUES ('2', 'v1')" )engine.execute( "INSERT INTO ts_test (a, b) VALUES (%s, %s)", ((555, "v1"),(666, "v1"),) ) engine.execute( "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)", id=999, name="v1" )result =engine.execute('select * from ts_test') result.fetchall()

?


事務操作

注:查看數據庫連接:show status like 'Threads%';

步驟二:

使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 進行數據庫操作。Engine使用Schema Type創建一個特定的結構對象,之后通過SQL Expression Language將該對象轉換成SQL語句,然后通過?ConnectionPooling 連接數據庫,再然后通過?Dialect 執行SQL,并獲取結果。

#!/usr/bin/env python # -*- coding:utf-8 -*-fromsqlalchemy importcreate_engine, Table, Column, Integer, String, MetaData, ForeignKeymetadata =MetaData()user =Table('user', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), )color =Table('color', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) engine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)metadata.create_all(engine) # metadata.clear() # metadata.remove()

?


增刪改查

更多內容詳見:

? ??http://www.jianshu.com/p/e6bba189fcbd

? ? http://docs.sqlalchemy.org/en/latest/core/expression_api.html

注:SQLAlchemy無法修改表結構,如果需要可以使用SQLAlchemy開發者開源的另外一個軟件Alembic來完成。

步驟三:

使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有組件對數據進行操作。根據類創建對象,對象轉換成SQL,執行SQL。

#!/usr/bin/env python # -*- coding:utf-8 -*-fromsqlalchemy.ext.declarative importdeclarative_base fromsqlalchemy importColumn, Integer, String fromsqlalchemy.orm importsessionmaker fromsqlalchemy importcreate_engineengine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)Base =declarative_base()classUser(Base): __tablename__ ='users' id=Column(Integer, primary_key=True) name =Column(String(50))# 尋找Base的所有子類,按照子類的結構在數據庫中生成對應的數據表信息 # Base.metadata.create_all(engine)Session =sessionmaker(bind=engine) session =Session()# ########## 增 ########## # u = User(id=2, name='sb') # session.add(u) # session.add_all([ # User(id=3, name='sb'), # User(id=4, name='sb') # ]) # session.commit()# ########## 刪除 ########## # session.query(User).filter(User.id > 2).delete() # session.commit()# ########## 修改 ########## # session.query(User).filter(User.id > 2).update({'cluster_id' : 0}) # session.commit() # ########## 查 ########## # ret = session.query(User).filter_by(name='sb').first()# ret = session.query(User).filter_by(name='sb').all() # print ret# ret = session.query(User).filter(User.name.in_(['sb','bb'])).all() # print ret# ret = session.query(User.name.label('name_label')).all() # print ret,type(ret)# ret = session.query(User).order_by(User.id).all() # print ret# ret = session.query(User).order_by(User.id)[1:3] # print ret # session.commit()

?

?

  

  

  

轉載于:https://www.cnblogs.com/wudalang/p/5700242.html

總結

以上是生活随笔為你收集整理的上下文管理、redis发布订阅、RabbitMQ发布订阅、SQLAlchemy的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。