python写数据库校验_python 验证 sqlite数据库隔离级别
sqlite3支持三種數據庫鎖級別,也叫隔離級別。下面代碼中我們對數據庫中task表進行更新,然后由另一個線程讀取這個表的數據
import
logging
import sqlite3
import
sys
import
threading
import
time
logging.
basicConfig
(
level
=
logging.
DEBUG
,
format
=
'%(asctime)s (%(threadName)-10s) %(message)s'
,
)
db_filename
=
'todo.db'
isolation_level
=
sys.
argv
[
1
]
def writer
(
):
my_name
=
threading.
currentThread
(
).
name
with sqlite3.
connect
(db_filename
,isolation_level
=isolation_level
)
as conn:
cursor
=conn.
cursor
(
)
cursor.
execute
(
'update task set priority=priority+1'
)
logging.
debug
(
'waiting to synchronize'
)
ready.
wait
(
)
# synchronize threads
time.
sleep
(
1
)
conn.
commit
(
)
logging.
debug
(
'CHANGES COMMITTED'
)
return
def reader
(
):
my_name
=
threading.
currentThread
(
).
name
with sqlite3.
connect
(db_filename
,isolation_level
=isolation_level
)
as conn:
cursor
=conn.
cursor
(
)
logging.
debug
(
'waiting to synchronize'
)
ready.
wait
(
)
#synchronize threads
cursor.
execute
(
'select * from task'
)
logging.
debug
(
'SELECT EXECUTED'
)
results
=cursor.
fetchall
(
)
logging.
debug
(
'result fetched'
)
return
if __name__
==
'__main__':
ready
=
threading.
Event
(
)
threads
=
[
threading.
Thread
(name
=
'Reader 1'
,target
=reader
)
,
threading.
Thread
(name
=
'Reader 2'
,target
=reader
)
,
threading.
Thread
(name
=
'Writer 1'
,target
=writer
)
,
threading.
Thread
(name
=
'Writer 2'
,target
=writer
)
,
]
[t.
start
(
)
for t
in threads
]
time.
sleep
(
1
)
logging.
debug
(
'setting ready'
)
ready.
set
(
)
[t.
join
(
)
for t
in threads
]
以下程序演示了,分別使用不同的鎖級別操作數據庫的結果。
1.使用延遲鎖:這種模式是sqlite3的默認模式,也就是只在發生改變的時候才會鎖上被更新的記錄。
$ python sqlite3_isolation_levels.
py DEFERRED
其輸出結果為:
2011-12-13 11:19:38,183
(Reader 1
) waiting to synchronize
2011-12-13 11:19:38,183
(Reader 2
) waiting to synchronize
2011-12-13 11:19:38,187
(Writer 1
) waiting to synchronize
2011-12-13 11:19:39,184
(MainThread
) setting ready
2011-12-13 11:19:39,186
(Reader 1
) SELECT EXECUTED
2011-12-13 11:19:39,186
(Reader 2
) SELECT EXECUTED
2011-12-13 11:19:39,187
(Reader 1
) result fetched
2011-12-13 11:19:39,187
(Reader 2
) result fetched
2011-12-13 11:19:40,243
(Writer 1
) CHANGES COMMITTED
2011-12-13 11:19:40,316
(Writer 2
) waiting to synchronize
2011-12-13 11:19:41,368
(Writer 2
) CHANGES COMMITTED
2.使用“立即鎖”:這種模式下一但要更新數據庫,會立即鎖上這條記錄,直到事務提交才會打開鎖。
$ python sqlite3_isolation_levels.
py IMMEDIATE
其輸出結果為:
2011-12-13 11:27:04,053
(Reader 1
) waiting to synchronize
2011-12-13 11:27:04,053
(Reader 2
) waiting to synchronize
2011-12-13 11:27:04,058
(Writer 1
) waiting to synchronize
2011-12-13 11:27:05,055
(MainThread
) setting ready
2011-12-13 11:27:05,056
(Reader 1
) SELECT EXECUTED
2011-12-13 11:27:05,058
(Reader 1
) result fetched
2011-12-13 11:27:05,058
(Reader 2
) SELECT EXECUTED
2011-12-13 11:27:05,058
(Reader 2
) result fetched
2011-12-13 11:27:06,111
(Writer 1
) CHANGES COMMITTED
2011-12-13 11:27:06,188
(Writer 2
) waiting to synchronize
2011-12-13 11:27:07,244
(Writer 2
) CHANGES COMMITTED
3.使用“排他鎖”:這種鎖會對所有的讀寫操作都上鎖。這種鎖一般用于對數據庫性能要求較高的情況,因為一旦上鎖,這個數據庫連接就只能為一個使用者使用。
$ python sqlite3_isolation_levels.
py EXCLUSIVE
其輸出結果為:
2011-12-13 11:32:37,276
(Reader 1
) waiting to synchronize
2011-12-13 11:32:37,276
(Reader 2
) waiting to synchronize
2011-12-13 11:32:37,278
(Writer 1
) waiting to synchronize
2011-12-13 11:32:38,279
(MainThread
) setting ready
2011-12-13 11:32:39,336
(Writer 1
) CHANGES COMMITTED
2011-12-13 11:32:39,367
(Reader 2
) SELECT EXECUTED
2011-12-13 11:32:39,368
(Reader 1
) SELECT EXECUTED
2011-12-13 11:32:39,368
(Reader 2
) result fetched
2011-12-13 11:32:39,371
(Reader 1
) result fetched
2011-12-13 11:32:39,398
(Writer 2
) waiting to synchronize
2011-12-13 11:32:40,453
(Writer 2
) CHANGES COMMITTED
當然我們也可以把鎖級別設置為None,這樣就是所謂的自動提交模式。我們對上面的代碼修改一下,把isolation_level的值設置為None,然后去掉conn.commit()。這樣每次的數據庫修改會自動提交到數據庫。代碼如下:
import
logging
import sqlite3
import
sys
import
threading
import
time
logging.
basicConfig
(
level
=
logging.
DEBUG
,
format
=
'%(asctime)s (%(threadName)-10s) %(message)s'
,
)
db_filename
=
'todo.db'
def writer
(
):
my_name
=
threading.
currentThread
(
).
name
with sqlite3.
connect
(db_filename
,isolation_level
=
None
)
as conn:
cursor
=conn.
cursor
(
)
cursor.
execute
(
'update task set priority=priority+1'
)
logging.
debug
(
'waiting to synchronize'
)
ready.
wait
(
)
# synchronize threads
time.
sleep
(
1
)
logging.
debug
(
'CHANGES COMMITTED'
)
return
def reader
(
):
my_name
=
threading.
currentThread
(
).
name
with sqlite3.
connect
(db_filename
,isolation_level
=
None
)
as conn:
cursor
=conn.
cursor
(
)
logging.
debug
(
'waiting to synchronize'
)
ready.
wait
(
)
#synchronize threads
cursor.
execute
(
'select * from task'
)
logging.
debug
(
'SELECT EXECUTED'
)
results
=cursor.
fetchall
(
)
logging.
debug
(
'result fetched'
)
return
if __name__
==
'__main__':
ready
=
threading.
Event
(
)
threads
=
[
threading.
Thread
(name
=
'Reader 1'
,target
=reader
)
,
threading.
Thread
(name
=
'Reader 2'
,target
=reader
)
,
threading.
Thread
(name
=
'Writer 1'
,target
=writer
)
,
threading.
Thread
(name
=
'Writer 2'
,target
=writer
)
,
]
[t.
start
(
)
for t
in threads
]
time.
sleep
(
1
)
logging.
debug
(
'setting ready'
)
ready.
set
(
)
[t.
join
(
)
for t
in threads
]
執行后的輸出結果為:
2011-12-13 11:35:00,753
(Reader 1
) waiting to synchronize
2011-12-13 11:35:00,753
(Reader 2
) waiting to synchronize
2011-12-13 11:35:00,825
(Writer 1
) waiting to synchronize
2011-12-13 11:35:00,953
(Writer 2
) waiting to synchronize
2011-12-13 11:35:01,755
(MainThread
) setting ready
2011-12-13 11:35:01,756
(Reader 2
) SELECT EXECUTED
2011-12-13 11:35:01,756
(Reader 1
) SELECT EXECUTED
2011-12-13 11:35:01,757
(Reader 2
) result fetched
2011-12-13 11:35:01,757
(Reader 1
) result fetched
2011-12-13 11:35:02,755
(Writer 2
) CHANGES COMMITTED
2011-12-13 11:35:02,755
(Writer 1
) CHANGES COMMITTED
總結
以上是生活随笔為你收集整理的python写数据库校验_python 验证 sqlite数据库隔离级别的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年碳中和概念龙头股 关注这些上
- 下一篇: python方差选择样本_两个样本方差比