日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Python 操作 pymysql 批量 增、删、改、查

發布時間:2024/7/23 数据库 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python 操作 pymysql 批量 增、删、改、查 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

github:https://github.com/PyMySQL/PyMySQL

Python3 MySQL 數據庫連接 - PyMySQL 驅動:Python3 MySQL 數據庫連接 – PyMySQL 驅動 | 菜鳥教程

pymysql 是線程安全的( 搜索 thread,可以看到 thread_safe=1,同時函數 thread_safe() 返回 True?):https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/__init__.py

Mysql? 如果數據存在則更新,不存在則插入

:Mysql:如果數據存在則更新,不存在則插入_飛蛾逐月-CSDN博客_mysql 存在更新不存在寫入

1、PyMySQL 安裝

在使用 PyMySQL 之前,我們需要確保 PyMySQL 已安裝。

PyMySQL 下載地址:GitHub - PyMySQL/PyMySQL: Pure Python MySQL Client

安裝?PyMySQL 的?Python 包:pip3 install PyMySQL

2、數據庫連接

連接數據庫前,請先確認以下事項:

  • 已經創建了數據庫 TESTDB.
  • TESTDB 數據庫中您已經創建了表 EMPLOYEE
  • EMPLOYEE 表字段為 FIRST_NAME, LAST_NAME, AGE, SEXINCOME。
  • 連接數據庫 TESTDB 使用的用戶名為 "testuser" ,密碼為 "test123",你可以可以自己設定或者直接使用 root 用戶名及其密碼,Mysql 數據庫用戶授權請使用 Grant 命令。
  • 已經安裝了 Python MySQLdb 模塊。
  • 如果您對sql語句不熟悉,可以訪問?SQL基礎教程

示? 例:

鏈接 Mysql 的 TESTDB 數據庫:

import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用 cursor() 方法創建一個游標對象 cursor cursor = db.cursor()# 使用 execute() 方法執行 SQL 查詢 cursor.execute("SELECT VERSION()")# 使用 fetchone() 方法獲取單條數據. data = cursor.fetchone()print ("Database version : %s " % data)# 關閉數據庫連接 db.close()

3、使用

創建數據庫表

如果數據庫連接存在我們可以使用execute()方法來為數據庫創建表,如下所示創建表EMPLOYEE:

import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用 cursor() 方法創建一個游標對象 cursor cursor = db.cursor()# 使用 execute() 方法執行 SQL,如果表存在則刪除 cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")# 使用預處理語句創建表 sql = """CREATE TABLE EMPLOYEE (FIRST_NAME CHAR(20) NOT NULL,LAST_NAME CHAR(20),AGE INT, SEX CHAR(1),INCOME FLOAT )"""cursor.execute(sql)# 關閉數據庫連接 db.close()

查詢? 數據

Python 查詢 Mysql 使用 fetchone() 方法獲取單條數據,使用 fetchall() 方法獲取多條數據。

  • fetchone():? 該方法獲取下一個查詢結果集。結果集是一個對象
  • fetchall():??接收全部的返回結果行.
  • rowcount:? 這是一個只讀屬性,并返回執行execute()方法后影響的行數。

查詢 EMPLOYEE 表中 salary(工資)字段大于 1000 的所有數據:

import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 查詢語句 sql = "SELECT * FROM EMPLOYEE \WHERE INCOME > %s" % (1000) try:# 執行SQL語句cursor.execute(sql)# 獲取所有記錄列表results = cursor.fetchall()for row in results:fname = row[0]lname = row[1]age = row[2]sex = row[3]income = row[4]# 打印結果print ("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \(fname, lname, age, sex, income )) except:print ("Error: unable to fetch data")# 關閉數據庫連接 db.close()

示例:

import pymysqlclass DB():def __init__(self, host='localhost', port=3306, db='', user='root', passwd='root', charset='utf8'):# 建立連接 self.conn = pymysql.connect(host=host, port=port, db=db, user=user, passwd=passwd, charset=charset)# 創建游標,操作設置為字典類型 self.cur = self.conn.cursor(cursor = pymysql.cursors.DictCursor)def __enter__(self):# 返回游標 return self.curdef __exit__(self, exc_type, exc_val, exc_tb):# 提交數據庫并執行 self.conn.commit()# 關閉游標 self.cur.close()# 關閉數據庫連接 self.conn.close()if __name__ == '__main__':with DB(host='192.168.68.129',user='root',passwd='zhumoran',db='text3') as db:db.execute('select * from course')print(db)for i in db:print(i)

插入? 數據

執行 SQL INSERT 語句向表 EMPLOYEE 插入記錄:

import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 插入語句 sql = """INSERT INTO EMPLOYEE(FIRST_NAME,LAST_NAME, AGE, SEX, INCOME)VALUES ('Mac', 'Mohan', 20, 'M', 2000)""" try:# 執行sql語句cursor.execute(sql)# 提交到數據庫執行db.commit() except:# 如果發生錯誤則回滾db.rollback()# 關閉數據庫連接 db.close()

以上例子也可以寫成如下形式:

import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 插入語句 sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \LAST_NAME, AGE, SEX, INCOME) \VALUES ('%s', '%s', %s, '%s', %s)" % \('Mac', 'Mohan', 20, 'M', 2000) try:# 執行sql語句cursor.execute(sql)# 執行sql語句db.commit() except:# 發生錯誤時回滾db.rollback()# 關閉數據庫連接 db.close()

以下代碼使用變量向SQL語句中傳遞參數:

.................................. user_id = "test123" password = "password"con.execute('insert into Login values( %s, %s)' % \(user_id, password)) ..................................

單條插入數據:

#!/usr/bin/python3import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 插入語句 里面的數據類型要對應 sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \LAST_NAME, AGE, SEX, INCOME) \VALUES ('%s', '%s', %s, '%s', %s)" % \('Mac', 'Mohan', 20, 'M', 2000) try:# 執行sql語句cursor.execute(sql)# 執行sql語句db.commit() except:# 發生錯誤時回滾db.rollback()# 關閉數據庫連接 db.close()

批量插入數據:

注意:批量插入數據單條插入數據?的區別:

  • 批量插入:VALUES (%s, %s, %s, %s, %s,) 里面 不用引號
  • 單條插入:VALUES ('%s', '%s',?'%s',?'%s', '%s') 里面 需要引號
#!/usr/bin/env python # -*-encoding:utf-8-*-import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","root","123","testdb")# 使用 cursor() 方法創建一個游標對象 cursor cursor = db.cursor()# SQL 插入語句 sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \LAST_NAME, AGE, SEX, INCOME) \VALUES (%s,%s,%s,%s,%s)" # 區別與單條插入數據,VALUES ('%s', '%s', %s, '%s', %s) 里面不用引號val = (('li', 'si', 16, 'F', 1000),('Bruse', 'Jerry', 30, 'F', 3000),('Lee', 'Tomcat', 40, 'M', 4000),('zhang', 'san', 18, 'M', 1500)) try:# 執行sql語句cursor.executemany(sql,val)# 提交到數據庫執行db.commit() except:# 如果發生錯誤則回滾db.rollback()# 關閉數據庫連接 db.close()

更新? 數據

更新操作用于更新數據表的數據,以下實例將 TESTDB 表中 SEX 為 'M' 的 AGE 字段遞增 1:

import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 更新語句 sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M') try:# 執行SQL語句cursor.execute(sql)# 提交到數據庫執行db.commit() except:# 發生錯誤時回滾db.rollback()# 關閉數據庫連接 db.close()

批量 更新

使用 pymysql 的?course.executemany(sql, update_list)?進行批量更新

  • sql:更新一條的 sql 語句模板;
  • update_list:一個列表套元組的結構;

示? 例:

db = pymysql.connect(user='root', password='mysql', database='test', host='127.0.0.1', port=3306, charset='utf8mb4')name_list = ["re", "gh", "ds", "D"] # 存儲name的值 age_list = ["10", "20", "30", "40"] # 存儲age的值 id_list = ["1", "2", "3", "4"] # 存儲id的值 val_list = [[name_list[i], age_list[i], id_list[i]] for i in range(len(id_list))] print(val_list) # [['re', '10', '1'], ['gh', '20', '2'], ['ds', '30', '3'], ['D', '40', '4']]with db.cursor() as cursor:try:sql = "UPDATE test SET name=(%s), age=(%s) WHERE id=(%s)"cursor.executemany(sql, val_list)db.commit()except:db.rollback() db.close()

pymysql 批量 --- 增、刪、改、查

注意:插入數字也是 %s

# coding=utf-8import time import pymysql.cursorsconn= pymysql.connect(host='rm-xxx.mysql.rds.aliyuncs.com',port=3306,user='dba',password='xxxxx',db='app',charset='utf8') cursor= conn.cursor() # conn.ping(reconnect=True)count= 0 posts=[] for postin posts:try:sql= 'DELETE FROM user_like WHERE user_id=%s and like_post_id=%s'ret= cursor.executemany(sql, ((1,2), (3,4), (5,6)))conn.commit()except Exception as e:print("batch Exception:", e)count+=1cursor.close() conn.close()# 基本sql語句寫法 # INSERT INTO star(name,gender) VALUES(“XX”, 20) # SELECT * FROM app.user_post WHERE post_id LIKE '%xxxx%'; # UPDATE app.user_post SET post_id=replace(post_id,'\'','’); # UPDATE app.user_post SET province = ‘xxx', city =‘xxx'; # DELETE FROM app.user_post where updated_at = '0000-00-00 00:00:00’;# 帶參數構造語句的基本寫法 # sql = 'select user_id, post_id from user_post where user_id="{user_id}" and post_id="{post_id}"'.format(user_id=user_id, post_id=post_id) # sql = 'SELECT count(*) FROM user_like where like_post_id = "%s"' % ("xxx") # sql = 'update star set gender="{gender}", height="{height}" where star_id="{star_id}"'.format(gender='M', height=180, star_id=123456789)

刪除? 數據

刪除操作用于刪除數據表中的數據,以下實例演示了刪除數據表 EMPLOYEE 中 AGE 大于 20 的所有數據:

import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 刪除語句 sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20) try:# 執行SQL語句cursor.execute(sql)# 提交修改db.commit() except:# 發生錯誤時回滾db.rollback()# 關閉連接 db.close()

執行 事務

事務機制可以確保數據一致性。

對于支持事務的數據庫, 在 Python 數據庫編程中,當游標建立之時,就自動開始了一個隱形的數據庫事務。commit()方法游標的所有更新操作,rollback()方法回滾當前游標的所有操作。每一個方法都開始了一個新的事務。

事務應該具有4個屬性:原子性、一致性、隔離性、持久性。這四個屬性通常稱為ACID特性。

  • 原子性(atomicity)。一個事務是一個不可分割的工作單位,事務中包括的諸操作要么都做,要么都不做。
  • 一致性(consistency)。事務必須是使數據庫從一個一致性狀態變到另一個一致性狀態。一致性與原子性是密切相關的。
  • 隔離性(isolation)。一個事務的執行不能被其他事務干擾。即一個事務內部的操作及使用的數據對并發的其他事務是隔離的,并發執行的各個事務之間不能互相干擾。
  • 持久性(durability)。持續性也稱永久性(permanence),指一個事務一旦提交,它對數據庫中數據的改變就應該是永久性的。接下來的其他操作或故障不應該對其有任何影響。

示例:

# SQL刪除記錄語句 sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20) try:# 執行SQL語句cursor.execute(sql)# 向數據庫提交db.commit() except:# 發生錯誤時回滾db.rollback()

pymysqlpool

線程安全 pymysqlpool

# -*-coding: utf-8-*- # Author : Christopher Lee # License: Apache License # File : test_example.py # Date : 2017-06-18 01-23 # Version: 0.0.1 # Description: simple test.import logging import string import threadingimport pandas as pd import randomfrom pymysqlpool import ConnectionPoolconfig = {'pool_name': 'test','host': 'localhost','port': 3306,'user': 'root','password': 'chris','database': 'test','pool_resize_boundary': 50,'enable_auto_resize': True,# 'max_pool_size': 10 }logging.basicConfig(format='[%(asctime)s][%(name)s][%(module)s.%(lineno)d][%(levelname)s] %(message)s',datefmt='%Y-%m-%d %H:%M:%S',level=logging.DEBUG)def connection_pool():# Return a connection pool instancepool = ConnectionPool(**config)# pool.connect()return pooldef test_pool_cursor(cursor_obj=None):cursor_obj = cursor_obj or connection_pool().cursor()with cursor_obj as cursor:print('Truncate table user')cursor.execute('TRUNCATE user')print('Insert one record')result = cursor.execute('INSERT INTO user (name, age) VALUES (%s, %s)', ('Jerry', 20))print(result, cursor.lastrowid)print('Insert multiple records')users = [(name, age) for name in ['Jacky', 'Mary', 'Micheal'] for age in range(10, 15)]result = cursor.executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)print(result)print('View items in table user')cursor.execute('SELECT * FROM user')for user in cursor:print(user)print('Update the name of one user in the table')cursor.execute('UPDATE user SET name="Chris", age=29 WHERE id = 16')cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 1')print(cursor.fetchone())print('Delete the last record')cursor.execute('DELETE FROM user WHERE id = 16')def test_pool_connection():with connection_pool().connection(autocommit=True) as conn:test_pool_cursor(conn.cursor())def test_with_pandas():with connection_pool().connection() as conn:df = pd.read_sql('SELECT * FROM user', conn)print(df)def delete_users():with connection_pool().cursor() as cursor:cursor.execute('TRUNCATE user')def add_users(users, conn):def execute(c):c.cursor().executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)c.commit()if conn:execute(conn)returnwith connection_pool().connection() as conn:execute(conn)def add_user(user, conn=None):def execute(c):c.cursor().execute('INSERT INTO user (name, age) VALUES (%s, %s)', user)c.commit()if conn:execute(conn)returnwith connection_pool().connection() as conn:execute(conn)def list_users():with connection_pool().cursor() as cursor:cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 5')print('...')for x in sorted(cursor, key=lambda d: d['id']):print(x)def random_user():name = "".join(random.sample(string.ascii_lowercase, random.randint(4, 10))).capitalize()age = random.randint(10, 40)return name, agedef worker(id_, batch_size=1, explicit_conn=True):print('[{}] Worker started...'.format(id_))def do(conn=None):for _ in range(batch_size):add_user(random_user(), conn)if not explicit_conn:do()returnwith connection_pool().connection() as c:do(c)print('[{}] Worker finished...'.format(id_))def bulk_worker(id_, batch_size=1, explicit_conn=True):print('[{}] Bulk worker started...'.format(id_))def do(conn=None):add_users([random_user() for _ in range(batch_size)], conn)time.sleep(3)if not explicit_conn:do()returnwith connection_pool().connection() as c:do(c)print('[{}] Worker finished...'.format(id_))def test_with_single_thread(batch_number, batch_size, explicit_conn=False, bulk_insert=False):delete_users()wk = worker if not bulk_insert else bulk_workerfor i in range(batch_number):wk(i, batch_size, explicit_conn)list_users()def test_with_multi_threads(batch_number=1, batch_size=1000, explicit_conn=False, bulk_insert=False):delete_users()wk = worker if not bulk_insert else bulk_workerthreads = []for i in range(batch_number):t = threading.Thread(target=wk, args=(i, batch_size, explicit_conn))threads.append(t)t.start()[t.join() for t in threads]list_users()if __name__ == '__main__':import timestart = time.perf_counter()test_pool_cursor()test_pool_connection()test_with_pandas()test_with_multi_threads(20, 10, True, bulk_insert=True)test_with_single_thread(1, 10, True, bulk_insert=True)elapsed = time.perf_counter() - startprint('Elapsed time is: "{}"'.format(elapsed))

總結

以上是生活随笔為你收集整理的Python 操作 pymysql 批量 增、删、改、查的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。