日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python 并发访问数据库_【数据库】如何实现python3实现并发访问水平切分表

發(fā)布時(shí)間:2024/9/15 python 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 并发访问数据库_【数据库】如何实现python3实现并发访问水平切分表 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

本篇文章給大家?guī)淼膬?nèi)容是關(guān)于如何實(shí)現(xiàn)python3實(shí)現(xiàn)并發(fā)訪問水平切分表,有一定的參考價(jià)值,有需要的朋友可以參考一下,希望對你有所幫助。

場景說明

假設(shè)有一個(gè)mysql表被水平切分,分散到多個(gè)host中,每個(gè)host擁有n個(gè)切分表。

如果需要并發(fā)去訪問這些表,快速得到查詢結(jié)果, 應(yīng)該怎么做呢?

這里提供一種方案,利用python3的asyncio異步io庫及aiomysql異步庫去實(shí)現(xiàn)這個(gè)需求。

代碼演示

import logging

import random

import asynciofrom aiomysql

import create_pool

# 假設(shè)mysql表分散在8個(gè)host, 每個(gè)host有16張子表

TBLES = { "192.168.1.01": "table_000-015",

# 000-015表示該ip下的表明從table_000一直連續(xù)到table_015

"192.168.1.02": "table_016-031",

"192.168.1.03": "table_032-047",

"192.168.1.04": "table_048-063",

"192.168.1.05": "table_064-079",

"192.168.1.06": "table_080-095",

"192.168.1.07": "table_096-0111",

"192.168.1.08": "table_112-0127",

}

USER = "xxx"PASSWD = "xxxx"# wrapper函數(shù),用于捕捉異常def query_wrapper(func):

async def wrapper(*args, **kwargs):

try:

await func(*args, **kwargs) except Exception as e:

print(e) return wrapper

# 實(shí)際的sql訪問處理函數(shù),通過aiomysql實(shí)現(xiàn)異步非阻塞請求@

query_wrapperasync def query_do_something(ip, db, table):

async with create_pool(host=ip, db=db, user=USER, password=PASSWD) as pool:

async with pool.get() as conn:

async with conn.cursor() as cur:

sql = ("select xxx from {} where xxxx")

await cur.execute(sql.format(table))

res = await cur.fetchall()

# then do something...# 生成sql訪問隊(duì)列, 隊(duì)列的每個(gè)元素包含要對某個(gè)表進(jìn)行訪問的函數(shù)及參數(shù)def gen_tasks():

tasks = [] for ip, tbls in TBLES.items():

cols = re.split('_|-', tbls)

tblpre = "_".join(cols[:-2])

min_num = int(cols[-2])

max_num = int(cols[-1])

for num in range(min_num, max_num+1):

tasks.append(

(query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num))

)

random.shuffle(tasks)

return tasks# 按批量運(yùn)行sql訪問請求隊(duì)列def run_tasks(tasks, batch_len):

try:

for idx in range(0, len(tasks), batch_len):

batch_tasks = tasks[idx:idx+batch_len]

logging.info("current batch, start_idx:%s len:%s" % (idx, len(batch_tasks)))

for i in range(0, len(batch_tasks)):

l = batch_tasks[i]

batch_tasks[i] = asyncio.ensure_future(

l[0](*l[1:])

)

loop.run_until_complete(asyncio.gather(*batch_tasks))

except Exception as e:

logging.warn(e)# main方法, 通過asyncio實(shí)現(xiàn)函數(shù)異步調(diào)用def main():

loop = asyncio.get_event_loop()

tasks = gen_tasks()

batch_len = len(TBLES.keys()) * 5 # all up to you

run_tasks(tasks, batch_len)

loop.close()

總結(jié)

以上是生活随笔為你收集整理的python 并发访问数据库_【数据库】如何实现python3实现并发访问水平切分表的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。