1. 簡介
項目中本身具有非常多的數據庫表隨著項目的開發,數據庫表和數據量也都會增多
1.1 復制集
- 復制集(Replication)
- 數據庫中數據相同,起到備份作用
- 高可用 High Available HA
1.2 分布式
- 分布式(Distribution)
- 數據庫中數據不同,共同組成完整的數據集合
- 通常每個節點被稱為一個分片(shard)
- 高吞吐 High Throughput
- 復制集與分布式可以單獨使用,也可以組合使用(即每個分片都組建一個復制集)
1.3 主從
- 關于主(Master)從(Slave)
- 這個概念是從使用的角度來闡述問題的
- 主節點 -> 表示程序在這個節點上最先更新數據
- 從節點 -> 表示這個節點的數據是要通過復制主節點而來
- 復制集 可選 主從、主主、主主從從
- 分布式 每個分片都是主,組合使用復制集的時候,復制集的是從
2. 復制
1.1 簡介
1. 定義
也叫主從同步,數據備份,是一個異步的復制過程
在兩臺數據庫服務器的基礎上實現了 讀寫分離,把兩臺數據庫服務器分為一臺主服務器(master)和一臺從服務器(slave),一臺主服務器對應一臺從服務器。master只負責寫入(write)數據,從服務器只負責 同步 主服務器的數據,并讓外部程序讀取(read)數據也可以讓外部程序讀取數據(master也可以讓外部程序讀取數據)
2. 本質
slave從master獲取Binary log,然后再在自己身上完全順序的執行日志中所記錄的各種操作
MySQL服務器之間的主從同步是基于二進制日志機制,主服務器使用二進制日志來記錄數據庫的變動情況,從服務器通過讀取和執行該日志文件來保持和主服務器的數據一致。
3. 原理
當master一有數據寫入,slave的I/O thread連接上master,并請求讀取指定日志文件(Binary log)的指定位置之后的日志內容master接收來自slave的IO thread的請求后,讓負責復制的I/O thread通過,根據請求信息讀取日志信息(Binary log),返回給slaveI/O threadslave的IO thread接收到信息后,將接收到的日志內容(數據寫入的操作)依次寫入slave的Relay logslave的SQL thread檢測到Relaylog新增加內容后,會馬上解析該文件的內容,并在自身執行原始SQL語句(數據寫入的操作)
復制分成三步:
master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events);slave將master的binary log events拷貝到它的中繼日志(relay log);slave重做中繼日志中的事件,將改變反映它自己的數據。
下圖描述了這一過程:
-
該過程的第一部分就是master記錄二進制日志。在每個事務更新數據完成之前,master在二日志記錄這些改變。MySQL將事務串行的寫入二進制日志,即使事務中的語句都是交叉執行的。在事件寫入二進制日志完成后,master通知存儲引擎提交事務。
-
下一步就是slave將master的binary log拷貝到它自己的中繼日志。首先,slave開始一個工作線程——I/O線程。I/O線程在master上打開一個普通的連接,然后開始binlog dump process。Binlog dump process從master的二進制日志中讀取事件,如果已經跟上master,它會睡眠并等待master產生新的事件。I/O線程將這些事件寫入中繼日志。
-
SQL slave thread處理該過程的最后一步。SQL線程從中繼日志讀取事件,更新slave的數據,使其與master中的數據一致。只要該線程與I/O線程保持一致,中繼日志通常會位于OS的緩存中,所以中繼日志的開銷很小。
-
此外,在master中也有一個工作線程:和其它MySQL的連接一樣,slave在master中打開一個連接也會使得master開始一個線程。
-
利用主從在達到高可用的同時,也可以通過讀寫分離提供吞吐量。
-
讀寫分離對事務是否有影響
對于寫操作包括開啟事務和提交或回滾要在一臺機器上執行,分散到多臺master執行后數據庫原生的單機事務就失效了。
對于事務中同時包含讀寫操作,與事務隔離級別設置有關,如果事務隔離級別為read-uncommitted 或者 read-committed,讀寫分離沒影響,如果隔離級別為repeatable-read、serializable,讀寫分離就有影響,因為在slave上會看到新數據,而正在事務中的master看不到新數據。
3. 作用
對數據進行備份,也就是主從同步后, 當主服務器宕機后,可以從從服務器中選一臺當主服務器,提高可用性;當從服務器宕機后,不會有任何影響,體現了 高可用,數據安全
可以增加從服務器來提高數據庫的讀取性能
讀寫分離實現后,有兩臺服務器,分攤了讀取數據庫服務器的壓力,提高了吞吐量,實現了高性能
5. 常用架構
5.1 主從架構
1. 簡介
在多加幾臺數據庫服務器的基礎上實現了讀寫分離,把多態數據庫服務器分為一臺主服務器(master)和多臺從服務器(slave),master負責write操作,slave負責read操作,一臺主服務器對應多臺從服務器。
2. 原理
2. 優缺點
優點:
一主多從,從庫高可用HA,數據安全讀寫分離,提高了吞吐量,實現了高性能
缺點:主庫單點,沒有實現高可用HA一旦掛了,無法寫入
3. 應用場景
微博:微博寫微博和讀微博的人比例大概是1:10
5.2 主備架構
1. 簡介
實質就是開多個數據庫服務器,都是master,都可以writer和read,一旦主庫掛了,就啟用備庫
2. 原理
3. 優缺點
優點:
4. 應用場景
阿里云,美團大企業,性能可以通過多個服務器來解決
5. 問題
既然主備互為備份,為什么不采用雙主方案,提供兩臺Master進行負載均衡
- 因為有延遲,會出現臟數據,數據不一致
- 雖然兩邊執行的修改有先后順序,但由于 Replication 是異步的實現機制,同樣可能導致晚做的修改被做的修改所覆蓋
- 不僅B庫數據錯誤,且A&B庫數據不一致
- 主備架構搭建除了配置雙主同步,還需要配置第三故障轉移/高可用方案
5.3 高可用復合架構
1. 簡介
在主從架構的基礎上,進行主庫的備份:主從架構+主備架構
2. 原理
3. 優缺點
- 讀寫分離,提高吞吐量
- 主從庫實現了高可用HA:主庫宕機后,去找從庫,同理,從庫宕機,去找主庫
- 提高了吞吐量
A庫宕機的情況:
2. 讀寫分離
2.1 Django實現MySQL讀寫分離
1. Docker安裝運行MySQL從機
提示:
- 搭建一主一從的主從同步。
- 主服務器:ubuntu操作系統中的MySQL。
- 從服務器:Docker容器中的MySQL。
1.獲取MySQL鏡像
$ sudo docker image pull mysql:5.7.22
或
$ sudo docker load -i 文件路徑/mysql_docker_5722.tar
2.指定MySQL從機配置文件
- 在使用Docker安裝運行MySQL從機之前,需要準備好從機的配置文件。
- 為了快速準備從機的配置文件,直接把主機的配置文件拷貝到從機中。
$ cd ~
$ mkdir mysql_slave
$ cd mysql_slave
$ mkdir data
$ cp -r /etc/mysql/mysql.conf.d ./
3.修改MySQL從機配置文件
- 編輯 ~/mysql_slave/mysql.conf.d/mysqld.cnf文件。
- 由于主從機都在同一個電腦中,所以選擇使用不同的端口號區分主從機,從機端口號是8306。
# 從機端口號
port = 8306# 關閉日志
general_log = 0# 從機唯一編號
server-id = 2
4.Docker 安裝運行 MySQL 從機
- MYSQL_ROOT_PASSWORD:創建 root 用戶的密碼為 123456。
$ sudo docker run --name mysql-slave -e MYSQL_ROOT_PASSWORD=123456 -d --network=host -v /home/ubuntu/mysql_slave/data:/var/lib/mysql -v /home/ubuntu/mysql_slave/mysql.conf.d:/etc/mysql/mysql.conf.d mysql:5.7.22
5.測試從機是否創建成功
$ mysql -uroot -p123456 -h127.0.0.1 --port=8306
2. 主從同步實現
1.配置主機( ubuntu 中 MySQL)
- 配置文件如有修改,需要重啟主機。
sudo service mysql restart
首先, 進入主機的配置文件所在地:
cd /etc/mysql/mysql.conf.d/
進入后找到 mysqld.cnf 文件, 對其進行修改:
sudo vim mysqld.cnf
修改內容如下所示:
# 開啟日志: 把下面的代碼注釋去掉
general_log_file = /var/log/mysql/mysql.log
general_log = 1# 主機唯一編號
server-id = 1# 二進制日志文件
log_bin = /var/log/mysql/mysql-bin.log
2.從機備份主機原有數據
- 在做主從同步時,如果從機需要主機上原有數據,就要先復制一份到從機。
# 1. 收集主機原有數據
$ mysqldump -uroot -pmysql --all-databases --lock-all-tables > ~/master_db.sql
# 2. 從機復制主機原有數據
$ mysql -uroot -p123456 -h127.0.0.1 --port=8306 < ~/master_db.sql
3.主從同步實現
1.創建用于從服務器同步數據的帳號
# 登錄到主機
$ mysql –uroot –pmysql# 創建從機賬號
$ GRANT REPLICATION SLAVE ON *.* TO 'slave'@'%' identified by 'slave';# 刷新權限
$ FLUSH PRIVILEGES;
2.展示 ubuntu 中 MySQL 主機的二進制日志信息
$ SHOW MASTER STATUS;
3.Docker 中 MySQL 從機連接 ubuntu 中 MySQL 主機
# 登錄到從機
$ mysql -uroot -p123456 -h 127.0.0.1 --port=8306# 從機連接到主機
$ change master to master_host='127.0.0.1', master_user='slave', master_password='slave',master_log_file='mysql-bin.000250', master_log_pos=990250;# 開啟從機服務
$ start slave;# 展示從機服務狀態
$ show slave status \G;
測試:
在主機中新建一個數據庫后,直接在從機查看是否存在。
增加slave數據庫的配置
DATABASES = {'default': { # 寫(主機)'ENGINE': 'django.db.backends.mysql', # 數據庫引擎'HOST': '172.16.238.128', # 數據庫主機'PORT': 3306, # 數據庫端口'USER': 'root', # 數據庫用戶名'PASSWORD': 'mysql', # 數據庫用戶密碼'NAME': 'project' # 數據庫名字},'slave': { # 讀(從機)'ENGINE': 'django.db.backends.mysql','HOST': '172.16.238.128','PORT': 8306,'USER': 'root','PASSWORD': '123456','NAME': 'project'}
}
創建和配置數據庫讀寫路由
創建數據庫讀寫路由
- 在mall.utils.db_router.py中實現讀寫路由
class MasterSlaveDBRouter(object):"""數據庫讀寫路由"""def db_for_read(self, model, **hints):"""讀所使用的服務器:"""return "slave"def db_for_write(self, model, **hints):"""寫所使用的服務器:"""return "default"def allow_relation(self, obj1, obj2, **hints):"""是否運行關聯操作"""return True
配置數據庫讀寫路由
DATABASE_ROUTERS = ['meiduo_mall.utils.db_router.MasterSlaveDBRouter']
2.2 falsk實現讀寫分離
- 需求分析:
sqlchemy并沒有像django-orm一樣內置完善的讀寫分離方案,但是提供了可以自定義的接口:我們可以借此對flask-sqlchemy進行二次開發,實現讀寫分離 - 思路分析:
- 實現自定義的session類(SignallingSession),繼承SignllingSession類
- 重寫`get_bind方法,根據讀寫需求選擇對應的數據庫地址
- 實現自定義的SQLAlchemy類,繼承與SQLAlchemy類
- 重寫create_session方法,在內部實現自定義的Session類
- 虛擬機搭建好Mysql主從,可以直接用于測試使用
2.2 項目集成
- 將工具包routting_db導入common/models中,其中的`routing_sqlchemy.py文件實現了讀寫分離
import random
from flask
import Flask
from flask_sqlalchemy
import SQLAlchemy
, SignallingSession
, get_state
import pymysql
from sqlalchemy
import ormpymysql
.install_as_MySQLdb
()app
= Flask
(__name__
)
app
.config
["SQLALCHEMY_DATABASE_URI"] = "mysql://root:mysql@192.168.243.151:3306/test30"
app
.config
["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app
.config
["SQLALCHEMY_BINDS"] = {"master": "mysql://root:mysql@192.168.243.151:3306/test30","slave1": "mysql://root:mysql@192.168.243.151:8306/test30","slave2": "mysql://root:mysql@192.168.243.151:3306/test30",
}
class RoutingSession(SignallingSession
):def __init__(self
, *args
, **kwargs
):super(RoutingSession
, self
).__init__
(*args
, **kwargs
)def get_bind(self
, mapper
=None, clause
=None):"""每次數據庫操作(增刪改查及事務操作)都會調用該方法, 來獲取對應的數據庫引擎(訪問的數據庫)"""state
= get_state
(self
.app
)if mapper
is not None:try:persist_selectable
= mapper
.persist_selectable
except AttributeError
:persist_selectable
= mapper
.mapped_tableinfo
= getattr(persist_selectable
, 'info', {})bind_key
= info
.get
('bind_key')if bind_key
is not None:return state
.db
.get_engine
(self
.app
, bind
=bind_key
)from sqlalchemy
.sql
.dml
import UpdateBase
if self
._flushing
or isinstance(clause
, UpdateBase
):print("寫操作--主數據庫")return state
.db
.get_engine
(self
.app
, bind
="master")else:slave_key
= random
.choice
(["slave1", "slave2"])print("讀操作--從數據庫: ", slave_key
)return state
.db
.get_engine
(self
.app
, bind
=slave_key
)
class RoutingSQLAlchemy(SQLAlchemy
):def create_session(self
, options
):return orm
.sessionmaker
(class_
=RoutingSession
, db
=self
, **options
)
db
= RoutingSQLAlchemy
(app
)
class User(db
.Model
):__tablename__
= 't_user'id = db
.Column
(db
.Integer
, primary_key
=True)name
= db
.Column
(db
.String
(20), unique
=True)age
= db
.Column
(db
.Integer
, default
=0, index
=True)@app
.route
('/')
def index():"""增加數據"""return "index"def read():print('---讀-----------')users
= User
.query
.all()print(users
)for user
in users
:print(user
.id, user
.name
, user
.age
)def write():print('---寫-----------')user1
= User
(name
='james', age
=20)db
.session
.add
(user1
)db
.session
.commit
()def update():print("---更新寫---")User
.query
.filter(User
.name
== 'xiaoming').update
({"name": "Uzi"})db
.session
.commit
()if __name__
== '__main__':app
.run
(debug
=True, port
=8888)
- 在app/settings/config.py文件中設計值主從數據庫的URL地址
class DefaultConfig:"""默認配置"""SQLALCHEMY_BINDS
= { "master": 'mysql://root:mysql@192.168.105.140:3306/hm_topnews',"slave1": 'mysql://root:mysql@192.168.105.140:3306/hm_topnews',"slave2": 'mysql://root:mysql@192.168.105.140:8306/hm_topnews'}
- 在app/__init__.py文件中使用自定義SQLAchemy類
from models
.routing_db
.routing_sqlalchemy
import RoutingSQLAlchemy
db
= RoutingSQLAlchemy
()
2. 分片(sharding)
2.1 簡介
1. 分庫分表前的問題
任何問題都是太大或者太小的問題,這里面對的數據量太大的問題。
-
用戶請求量太大
因為單服務器TPS,內存,IO都是有限的。 解決方法:分散請求到多個服務器上; 其實用戶請求和執行一個sql查詢是本質是一樣的,都是請求一個資源,只是用戶請求還會經過網關,路由,http服務器等。
-
單庫太大
單個數據庫處理能力有限;單庫所在服務器上磁盤空間不足;單庫上操作的IO瓶頸 解決方法:切分成更多更小的庫
-
單表太大
CRUD都成問題;索引膨脹,查詢超時 解決方法:切分成多個數據集更小的表。
2. 分庫分表的方式方法
-
一般就是垂直切分和水平切分,這是一種結果集描述的切分方式,是物理空間上的切分。 從面臨的問題,開始解決,闡述: 首先是用戶請求量太大,就堆機器搞定
-
然后是單個庫太大,這時要看是因為表多而導致數據多,還是因為單張表里面的數據多。 如果是因為表多而數據多,使用垂直切分,根據業務切分成不同的庫。
-
如果是因為單張表的數據量太大,這時要用水平切分,即把表的數據按某種規則切分成多張表,甚至多個庫上的多張表。 分庫分表的順序應該是先垂直分,后水平分。 因為垂直分更簡單,更符合處理現實世界問題的方式。
3. 分片簡介
-
需求分析:
- 用戶請求量太大,會導致web應用無法及時響應->分布式服務器(分散請求到多個服務器上)
- 表單太大,會導致CRUD都成問題,索引膨脹,查詢超時->拆分表
- 單庫太大,會導致單庫磁盤空間不足:處理能力有限,出現IO瓶頸->拆分庫
-
作用
- 分片也成為數據拆分(Shareding),其主要工作就是對單庫單表進行拆分,多苦多表共同組成完整的數據集合
- 分片可以提高吞吐量,同一時間數據的讀寫完成量更多,擴充單機存儲量的容量/讀寫速度上限
-
分類
- 垂直拆分:字段太多
- 水平拆分
- 使用頻率(常用字段/不常用字段)
- 垂直分庫,分表–blind–key–來實現,修改數據庫
- HASH取模 離散化
去用戶id,然后hash取模,飛陪到不同的數據庫上,遮掩
-
注意點
-
不要輕易分庫分表,因為分片會帶來 諸多分布式問題, 讓應用的復雜度大量增加
-
應避免"過度設計"和"過早優化", 先盡力去做其他優化,例如:升級硬件、升級網絡、讀寫分離、索引優化、緩存設計等等。
-
當數據量達到單表瓶頸時候(參考值: 單表記錄1000W+/硬盤100G+),再考慮分庫分表
-
如果需要進行分庫分表, 優先考慮垂直拆分
* 地理區域
分布式問題* 分布式事務
* 跨Join/排序/分頁方案一:* 不需要分方案二:* 二階段事務session_options = {“twoparse}* begin:xl* prepare:二階段預提交* commit:真正提交方案三:* ebay* 狀態字段* 1. 表中定義狀態字段2. 兩個系統定義 ***消息接口***3. -
分庫訪問
-
水平拆分:記錄太多
3. 垂直拆分
3.1 垂直分表
1. 簡介
- 也就是“大表拆小表”,基于列字段進行的。一般是表中的字段較多,將不常用的, 數據較大,長度較長(比如text類型字段)的拆分到“擴展表“。 一般是針對那種幾百列的大表,也避免查詢時,數據量太大造成的“跨頁”問題。
- 按 字段 將一張表拆分成多張表
- 對于字段較多的表, 每條記錄占用的空間也會較多, 導致每次從硬盤中讀取的記錄以及查詢緩存可緩存的記錄數量較少, 影響查詢查詢效率
- 針對字段多的表就可以采用垂直分表來進行拆分, 這樣可以減少表體積, 提高查詢效率
2. 拆分規則
- 相關性
- 可以將字段根據 業務邏輯 和 使用的相關性 進行分表劃分
- 如: 用戶名和密碼經常配合使用, 將其分到用戶認證表, 生日和郵箱等個人信息經常一起訪問, 將其分到用戶信息表
- 使用頻率
- 可以將字段根據 常用 和 不常用 進行劃分, 并進行分表處理
- 如: 原始用戶表中包含了多個字段, 其中有常用的昵稱、手機號等字段, 也包含不常用的郵箱、生日等字段, 可以根據使用頻率將其分為兩張表: 用戶基礎信息表 和 用戶其他信息表
- 項目中的應用
- 用戶數據垂直分表 user_basic& user_profile
- 文章數據垂直分表 article_basic & article_content (文章內容較長且只在詳情頁才需要)
3.2 垂直分庫
1. 簡介
-
垂直分庫針對的是一個系統中的不同業務進行拆分,比如用戶User一個庫,商品Producet一個庫,訂單Order一個庫。 切分后,要放在多個服務器上,而不是一個服務器上。為什么? 我們想象一下,一個購物網站對外提供服務,會有用戶,商品,訂單等的CRUD。沒拆分之前, 全部都是落到單一的庫上的,這會讓數據庫的單庫處理能力成為瓶頸。按垂直分庫后,如果還是放在一個數據庫服務器上, 隨著用戶量增大,這會讓單個數據庫的處理能力成為瓶頸,還有單個服務器的磁盤空間,內存,tps等非常吃緊。 所以我們要拆分到多個服務器上,這樣上面的問題都解決了,以后也不會面對單機資源問題。
-
數據庫業務層面的拆分,和服務的“治理”,“降級”機制類似,也能對不同業務的數據分別的進行管理,維護,監控,擴展等。 數據庫往往最容易成為應用系統的瓶頸,而數據庫本身屬于“有狀態”的,相對于Web和應用服務器來講,是比較難實現“橫向擴展”的。 數據庫的連接資源比較寶貴且單機處理能力也有限,在高并發場景下,垂直分庫一定程度上能夠突破IO、連接數及單機硬件資源的瓶頸。
- 將一個數據庫中的多張表拆分到多個數據庫(服務器節點)中
- 注意點:
- 由于 本地事務不支持跨庫操作, 所以應該將 有相關聯性的表放在同一個庫中
- 如: 如果后續項目垂直分庫, 將用戶相關的放在數據庫1, 文章相關的放在數據庫2
# 默認
數據庫 t_user t_article # 垂直分表
數據庫 t_user_basic t_user_detail t_article_basic t_article_detail # 垂直分庫
數據庫1 t_user_basic t_user_detail
數據庫2 t_article_detail t_article_basic
3.3 分庫訪問
-
flask-sqlalchemy 通過配置 SQLALCHEMY_BINDS允許設置多個數據庫URI, 并且每個模型類可以 __bind_key__屬性 設置自己對應訪問的數據庫
-
示例場景如下: 項目進行了分庫處理, 包含兩個庫 db1 和 db2, 用戶表t_user存儲在db1中, 而地址表t_adr存儲在db2中
from flask import Flask
from flask_sqlalchemy import SQLAlchemyapp = Flask(__name__)# 設置多個數據庫地址 (用于數據操作)
app.config['SQLALCHEMY_BINDS'] = {'db1': 'mysql://root:mysql@192.168.105.140:3306/db1','db2': 'mysql://root:mysql@192.168.105.140:3306/db2'
}# 其他配置
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SQLALCHEMY_ECHO'] = True# 創建組件對象
db = SQLAlchemy(app)# 用戶表 存儲在db1中
class User(db.Model):__tablename__ = 't_user'__bind_key__ = 'db1' # 設置表所在的數據庫URIid = db.Column(db.Integer, primary_key=True)name = db.Column(db.String(20))# 地址表 存儲在db2中
class Address(db.Model):__tablename__ = 't_adr'__bind_key__ = 'db2' # 設置表所在的數據庫URIid = db.Column(db.Integer, primary_key=True)detail = db.Column(db.String(20), unique=True)user_id = db.Column(db.Integer)@app.route('/')
def index():"""添加數據"""user1 = User(name='張三')db.session.add(user1)db.session.flush()adr1 = Address(detail='中關村3號', user_id=user1.id)adr2 = Address(detail='華強北5號', user_id=user1.id)db.session.add_all([adr1, adr2])db.session.commit() # 雖然只調用一次commit, 但由于需要到兩個數據庫進行操作, 其實是兩個數據庫分別創建一個事務并提交return "index"@app.route('/demo1')
def demo1():"""查詢多表數據"""user1 = User.query.filter_by(name='張三').first()adrs = Address.query.filter_by(user_id=user1.id).all()for adr in adrs:print(adr.detail)return 'demo1'if __name__ == '__main__':# 重置所有繼承自db.Model的表db.drop_all()db.create_all()app.run(debug=True)
4. 水平拆分
4.1 水平分表
- 將 一張表的記錄 拆分到多張表中
- 對于記錄較多的表, 會出現 索引膨脹, 查詢超時 等問題, 影響用戶體驗
- 針對數據量巨大的單張表(比如訂單表),按照某種規則(RANGE,HASH取模等),切分到多張表里面去。 但是這些表還是在同一個庫中,所以庫級別的數據庫操作還是有IO瓶頸。不建議采用。
4.2 水平分庫分表
- 將單張表的數據切分到多個服務器上去,每個服務器具有相應的庫與表,只是表中數據集合不同。 水平分庫分表能夠有效的緩解單機和單庫的性能瓶頸和壓力,突破IO、連接數、硬件資源等的瓶頸。
4.3 水平分庫分表拆分規則
4.4 數據庫定向查詢
- 如果進行了水平拆分, 在沒有精確過濾條件的情況下, 可能需要到多個數據庫中依次查詢目標數據
*可以對 RoutingSession 進行二次開發, 提供方法進行 數據庫定向查詢 - 應用場景如下: 對用戶表進行水平分庫分表, 用戶數據分別保存在 db1.t_user 和 db2.t_user 中, 項目的其他數據保存在數據庫 test 中
import random
from flask import Flask
from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state
from sqlalchemy import ormapp = Flask(__name__)# 設置單個數據庫URI (用于建表并添加測試數據)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.105.140:3306/db1'# 設置多個數據庫的URI (用于數據操作)
app.config['SQLALCHEMY_BINDS'] = {'db1': 'mysql://root:mysql@192.168.105.140:3306/db1','db2': 'mysql://root:mysql@192.168.105.140:3306/db2','master': 'mysql://root:mysql@192.168.105.140:3306/test','slave': 'mysql://root:mysql@192.168.105.140:3306/test'
}# 其他配置
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True# 1. 自定義Session類, 繼承SignallingSession, 并重寫get_bind方法
class RoutingSession(SignallingSession):def get_bind(self, mapper=None, clause=None):"""每次數據庫操作(增刪改查及事務操作)都會調用該方法, 來獲取對應的數據庫引擎(訪問的數據庫)"""state = get_state(self.app)if self._bind: # 如果查詢指定了訪問的數據庫, 則使用指定的數據庫print('查詢數據庫:', self._bind)return state.db.get_engine(self.app, bind=self._bind)elif mapper is not None: # 如果模型類已指定數據庫, 使用指定的數據庫info = getattr(mapper.mapped_table, 'info', {})bind_key = info.get('bind_key')if bind_key is not None:return state.db.get_engine(self.app, bind=bind_key)if self._flushing: # 如果模型類未指定數據庫, 判斷是否為寫操作print('寫操作')return state.db.get_engine(self.app, bind='master')else:print('讀操作')return state.db.get_engine(self.app, bind='slave')_bind = None # 定義類屬性記錄要訪問的數據庫def using_bind(self, bind):"""指定要訪問的數據庫"""self._bind = bindreturn self# 2. 自定義SQLALchemy類, 重寫create_session方法
class RoutingSQLAlchemy(SQLAlchemy):def create_session(self, options):return orm.sessionmaker(class_=RoutingSession, db=self, **options)# 創建組件對象
db = RoutingSQLAlchemy(app)# 構建模型類
class User(db.Model):__tablename__ = 't_user'id = db.Column(db.Integer, primary_key=True)name = db.Column('username', db.String(20), unique=True)age = db.Column(db.Integer, default=0, index=True)@app.route('/')
def index():for db_bind in ['db1', 'db2']: # 遍歷各數據庫節點, 查詢用戶數據user = db.session().using_bind(db_bind).query(User).filter(User.name == 'zs').first()print(user)if user:print(user.id, user.name, user.age)return "index"if __name__ == '__main__':# 重置所有繼承自db.Model的表db.drop_all()db.create_all()# 添加測試數據 需要分別往db1和db2中添加一條數據user1 = User(name='zs', age=20)db.session.add(user1)db.session.commit()app.run(debug=True)
5. 分布式問題
5.1 分布式失去問題
- 事務支持
- 分庫分表后,就成了分布式事務了。如果依賴數據庫本身的分布式事務管理功能去執行事務,將付出高昂的性能代價;
- 如果由應用程序去協助控制,形成程序邏輯上的事務,又會造成編程方面的負擔。
- 本地事務不支持跨庫操作
- 解決辦法從簡單到復雜有三種
1. 方案一
- 將有關聯的表放在一個數據庫中
- 同庫操作可以使用一個事務
- 如用戶表&用戶頻道表, 文章基本信息表&文章內容表放在一起
2. 方案二
-
Mysql從5.6開始支持分布式事務
-
核心是二階段提交協議(簡稱 2PC協議 / XA協議)
-
分布式事務會提供一個 事務管理器 來對 各數據庫的本地事務進行統一管理, 只有各本地事務都向管理器 預提交 成功后, 事務管理器才會統一執行提交處理, 否則統一進行回滾處理
-
sqlalchemy 也支持分布式事務
- 只需要在創建 SQLAlchemy對象時, 設置參數 session_options={'twophase': True}即可
-
設置后, 整個session的所有操作會被放入到一個分布式事務中, 并在整個分布式事務范圍內保證原子性
from flask import Flask
from flask_sqlalchemy import SQLAlchemyapp = Flask(__name__)# 設置多個數據庫地址
app.config['SQLALCHEMY_BINDS'] = {'db1': 'mysql://root:mysql@192.168.105.140:3306/db1','db2': 'mysql://root:mysql@192.168.105.140:3306/db2'}app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SQLALCHEMY_ECHO'] = True# 創建組件對象 設置二階段提交
db = SQLAlchemy(app, session_options={'twophase': True})# 用戶表
class User(db.Model):__tablename__ = 't_user'__bind_key__ = 'db1' # 設置數據庫db1id = db.Column(db.Integer, primary_key=True)name = db.Column(db.String(20))# 地址表
class Address(db.Model):__tablename__ = 't_adr'__bind_key__ = 'db2' # 設置數據庫db2id = db.Column(db.Integer, primary_key=True)detail = db.Column(db.String(20), unique=True)user_id = db.Column(db.Integer)@app.route('/')
def index():"""添加數據"""user1 = User(name='張三')db.session.add(user1)db.session.flush()adr1 = Address(detail='中關村3號', user_id=user1.id)adr2 = Address(detail='華強北5號', user_id=user1.id)db.session.add_all([adr1, adr2])db.session.flush()db.session.commit() # 由于采用了分布式事務, 整個session的操作會被放入到一個分布式事務中, 并實現事務的原子性return "index"@app.route('/demo1')
def demo1():"""查詢多表數據 需求: 查詢姓名為"張三"的所有地址信息"""# 先根據姓名查找用戶主鍵user1 = User.query.filter_by(name='張三').first()# 再根據主鍵到從表查詢關聯地址adrs = Address.query.filter_by(user_id=user1.id).all()for adr in adrs:print(adr.detail)return 'demo1'if __name__ == '__main__':# 刪除所有繼承自db.Model的表db.drop_all()# 創建所有繼承自db.Model的表db.create_all()app.run(debug=True)
- 注意點:
- 分布式事務要在所有事務都"提交成功"的情況下才會正式提交, 如果參與的部分節點卡頓, 會影響整個事務的性能
3. 方案三
- 基于狀態/消息的最終一致性方案
- 對于 包含多個子系統的大型項目, 需要保證子系統之間的數據一致性
- 單個子系統往往不會操作所有數據庫, 但是 每個子系統可以通過定義字段來記錄操作的狀態, 每完成一個階段則更新相應的狀態
- 如下單-付款流程中, 應用A的下單事務完成后更新訂單狀態為 已下單, 應用B付款事務完成后, 再通過 支付回調接口 通知應用A 更新訂單狀態
- 應用B還需要提供一個 支付查詢接口, 以便在用戶查詢或者訂單超時的情況下, 讓應用A可以查詢訂單的支付情況
- ebay 提出的方案, 理論叫做 BASE
5.2 跨節點 Join/排序/分頁
- 跨庫join
- 不支持的跨庫操作包括join/分組/聚合/排序
1. 方案一
2. 方案二
- 使用一些第三方方案(數據庫中間件)
- 開源框架除了Mycat, 功能較少
- 需要一定學習成本, 二次開發需要公司具有一定技術實力
- 以下為推薦的開源框架:
MySQL Router:https://github.com/mysql/mysql-router
Atlas:https://github.com/Qihoo360/Atlas
Mycat:https://github.com/MyCATApache/Mycat-Server - 付費框架推薦: 阿里DRDS
- 功能: 分庫分表、分布式JOIN/聚合/排序、分布式事務、平滑擴容、讀寫分離, 全局唯一ID
- 基礎版: 14000+/年
- 一個字: 強!
分庫分表后表之間的關聯操作將受到限制,我們無法join位于不同分庫的表,也無法join分表粒度不同的表, 結果原本一次查詢能夠完成的業務,可能需要多次查詢才能完成。 粗略的解決方法: 全局表:基礎數據,所有庫都拷貝一份。 字段冗余:這樣有些字段就不用join去查詢了。 系統層組裝:分別查詢出所有,然后組裝起來,較復雜。
3. 方案三
5.3 多庫結果集合并(group by,order by)
6. 項目應用
CREATE TABLE `user_basic` (`user_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '用戶ID',`account` varchar(20) COMMENT '賬號',`email` varchar(20) COMMENT '郵箱',`status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '狀態,是否可用,0-不可用,1-可用',`mobile` char(11) NOT NULL COMMENT '手機號',`password` varchar(93) NULL COMMENT '密碼',`user_name` varchar(32) NOT NULL COMMENT '昵稱',`profile_photo` varchar(128) NULL COMMENT '頭像',`last_login` datetime NULL COMMENT '最后登錄時間',`is_media` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否是自媒體,0-不是,1-是',`is_verified` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否實名認證,0-不是,1-是',`introduction` varchar(50) NULL COMMENT '簡介',`certificate` varchar(30) NULL COMMENT '認證',`article_count` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '發文章數',`following_count` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '關注的人數',`fans_count` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '被關注的人數',`like_count` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '累計點贊人數',`read_count` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '累計閱讀人數',PRIMARY KEY (`user_id`),UNIQUE KEY `mobile` (`mobile`),UNIQUE KEY `user_name` (`user_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用戶基本信息表';CREATE TABLE `user_profile` (`user_id` bigint(20) unsigned NOT NULL COMMENT '用戶ID',`gender` tinyint(1) NOT NULL DEFAULT '0' COMMENT '性別,0-男,1-女',`birthday` date NULL COMMENT '生日',`real_name` varchar(32) NULL COMMENT '真實姓名',`id_number` varchar(20) NULL COMMENT '身份證號',`id_card_front` varchar(128) NULL COMMENT '身份證正面',`id_card_back` varchar(128) NULL COMMENT '身份證背面',`id_card_handheld` varchar(128) NULL COMMENT '手持身份證',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',`register_media_time` datetime NULL COMMENT '注冊自媒體時間',`area` varchar(20) COMMENT '地區',`company` varchar(20) COMMENT '公司',`career` varchar(20) COMMENT '職業',PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用戶資料表';
CREATE TABLE `news_article_basic` (`article_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '文章ID',`user_id` bigint(20) unsigned NOT NULL COMMENT '用戶ID',`channel_id` int(11) unsigned NOT NULL COMMENT '頻道ID',`title` varchar(128) NOT NULL COMMENT '標題',`cover` json NOT NULL COMMENT '封面',`is_advertising` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否投放廣告,0-不投放,1-投放',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',`status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '貼文狀態,0-草稿,1-待審核,2-審核通過,3-審核失敗,4-已刪除',`reviewer_id` int(11) NULL COMMENT '審核人員ID',`review_time` datetime NULL COMMENT '審核時間',`delete_time` datetime NULL COMMENT '刪除時間',`reject_reason` varchar(200) COMMENT '駁回原因',`comment_count` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '累計評論數',`allow_comment` tinyint(1) NOT NULL DEFAULT '1' COMMENT '是否允許評論,0-不允許,1-允許',PRIMARY KEY (`article_id`),KEY `user_id` (`user_id`),KEY `article_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='文章基本信息表';CREATE TABLE `news_article_content` (`article_id` bigint(20) unsigned NOT NULL COMMENT '文章ID',`content` longtext NOT NULL COMMENT '文章內容',PRIMARY KEY (`article_id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8 COMMENT='文章內容表';
總結
以上是生活随笔為你收集整理的MySQL分布式设计的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。