Redis 技术整理
認識Redis
Redis官網:https://redis.io/
Redis誕生于2009年全稱是Remote Dictionary Server 遠程詞典服務器,是一個基于內存的鍵值型NoSQL數據庫
特征:
- 鍵值(key-value)型,value支持多種不同數據結構,功能豐富
- 單線程,每個命令具備原子性
- 低延遲,速度快(基于內存.IO多路復用.良好的編碼)
- 支持數據持久化
- 支持主從集群、分片集群
NoSQL可以翻譯做Not Only SQL(不僅僅是SQL),或者是No SQL(非SQL的)數據庫。是相對于傳統關系型數據庫而言,有很大差異的一種特殊的數據庫,因此也稱之為非關系型數據庫
關系型數據是結構化的,即有嚴格要求,而NoSQL則對數據庫格式沒有嚴格約束,往往形式松散,*
可以是鍵值型:
也可以是文檔型:
甚至可以是圖格式:
在事務方面:
- 傳統關系型數據庫能滿足事務ACID的原則
- 非關系型數據庫往往不支持事務,或者不能嚴格保證ACID的特性,只能實現基本的一致性
除了上面說的,在存儲方式.擴展性.查詢性能上關系型與非關系型也都有著顯著差異,總結如下:
- 存儲方式
- 關系型數據庫基于磁盤進行存儲,會有大量的磁盤IO,對性能有一定影響
- 非關系型數據庫,他們的操作更多的是依賴于內存來操作,內存的讀寫速度會非常快,性能自然會好一些
- 擴展性
- 關系型數據庫集群模式一般是主從,主從數據一致,起到數據備份的作用,稱為垂直擴展。
- 非關系型數據庫可以將數據拆分,存儲在不同機器上,可以保存海量數據,解決內存大小有限的問題。稱為水平擴展。
- 關系型數據庫因為表之間存在關聯關系,如果做水平擴展會給數據查詢帶來很多麻煩
安裝Redis
企業都是基于Linux服務器來部署項目,而且Redis官方也沒有提供Windows版本的安裝包
本文選擇的Linux版本為CentOS 7
單機安裝
- 安裝需要的依賴
yum install -y gcc tcl
- 上傳壓縮包并解壓
tar -zxf redis-7.0.12.tar.gz
- 進入解壓的redis目錄
cd redis-7.0.12
- 編譯并安裝
make && make install
默認的安裝路徑是在 /usr/local/bin目錄下
該目錄已經默認配置到環境變量,因此可以在任意目錄下運行這些命令。其中:
- redis-cli:是redis提供的命令行客戶端
- redis-server:是redis的服務端啟動腳本
- redis-sentinel:是redis的哨兵啟動腳本
啟動Redis
redis的啟動方式有很多種,例如:
- 默認啟動
- 指定配置啟動
- 開機自啟
默認啟動
安裝完成后,在任意目錄輸入redis-server命令即可啟動Redis:
redis-server
這種啟動屬于“前臺啟動”,會阻塞整個會話窗口,窗口關閉或者按下CTRL + C則Redis停止
指定配置啟動
如果要讓Redis以“后臺”方式啟動,則必須修改Redis配置文件,就在之前解壓的redis安裝包下(/usr/local/src/redis-6.2.6),名字叫redis.conf
修改redis.conf文件中的一些配置:可以先拷貝一份再修改
# 允許訪問的地址,默認是127.0.0.1,會導致只能在本地訪問。修改為0.0.0.0則可以在任意IP訪問,生產環境不要設置為0.0.0.0
bind 0.0.0.0
# 守護進程,修改為yes后即可后臺運行
daemonize yes
# 密碼,設置后訪問Redis必須輸入密碼
requirepass 072413
Redis的其它常見配置:
# 監聽的端口
port 6379
# 工作目錄,默認是當前目錄,也就是運行redis-server時的命令,日志、持久化等文件會保存在這個目錄
dir .
# 數據庫數量,設置為1,代表只使用1個庫,默認有16個庫,編號0~15
databases 1
# 設置redis能夠使用的最大內存
maxmemory 512mb
# 日志文件,默認為空,不記錄日志,可以指定日志文件名
logfile "redis.log"
啟動Redis:
# 進入redis安裝目錄
cd /opt/redis-6.2.13
# 啟動
redis-server redis.conf
停止服務:
# 利用redis-cli來執行 shutdown 命令,即可停止 Redis 服務,
# 因為之前配置了密碼,因此需要通過 -u 來指定密碼
redis-cli -u password shutdown
開機自啟
可以通過配置來實現開機自啟。
首先,新建一個系統服務文件:
vim /etc/systemd/system/redis.service
內容如下:
[Unit]
Description=redis-server
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/bin/redis-server /opt/redis-7.0.12/redis.conf
PrivateTmp=true
[Install]
WantedBy=multi-user.target
然后重載系統服務:
systemctl daemon-reload
現在,我們可以用下面這組命令來操作redis了:
# 啟動
systemctl start redis
# 停止
systemctl stop redis
# 重啟
systemctl restart redis
# 查看狀態
systemctl status redis
執行下面的命令,可以讓redis開機自啟:
systemctl enable redis
主從集群安裝
主:具有讀寫操作
從:只有讀操作
- 修改redis.conf文件
# 開啟RDB
# save ""
save 3600 1
save 300 100
save 60 10000
# 關閉AOF
appendonly no
- 將上面的redis.conf文件拷貝到不同地方
# 方式一:逐個拷貝
cp /usr/local/bin/redis-7.0.12/redis.conf /tmp/redis-7001
cp /usr/local/bin/redis-7.0.12/redis.conf /tmp/redis-7002
cp /usr/local/bin/redis-7.0.12/redis.conf /tmp/redis-7003
# 方式二:管道組合命令,一鍵拷貝
echo redis-7001 redis-7002 redis-7003 | xargs -t -n 1 cp /usr/local/bin/redis-7.0.12/redis.conf
- 修改各自的端口、rdb目錄改為自己的目錄
sed -i -e 's/6379/7001/g' -e 's/dir .\//dir \/tmp\/redis-7001\//g' redis-7001/redis.conf
sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/tmp\/redis-7002\//g' redis-7002/redis.conf
sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/tmp\/redis-7003\//g' redis-7003/redis.conf
- 修改每個redis節點的IP聲明。虛擬機本身有多個IP,為了避免將來混亂,需要在redis.conf文件中指定每一個實例的綁定ip信息,格式如下:
# redis實例的聲明 IP
replica-announce-ip IP地址
# 逐一執行
sed -i '1a replica-announce-ip 192.168.150.101' redis-7001/redis.conf
sed -i '1a replica-announce-ip 192.168.150.101' redis-7002/redis.conf
sed -i '1a replica-announce-ip 192.168.150.101' redis-7003/redis.conf
# 或者一鍵修改
printf '%s\n' redis-7001 redis-7002 redis-7003 | xargs -I{} -t sed -i '1a replica-announce-ip 192.168.150.101' {}/redis.conf
- 啟動
# 第1個
redis-server redis-7001/redis.conf
# 第2個
redis-server redis-7002/redis.conf
# 第3個
redis-server redis-7003/redis.conf
# 一鍵停止
printf '%s\n' redis-7001 redis-7002 redis-7003 | xargs -I{} -t redis-cli -p {} shutdown
- 開啟主從關系:配置主從可以使用replicaof 或者slaveof(5.0以前)命令
永久配置:在redis.conf中添加一行配置
slaveof <masterip> <masterport>
臨時配置:使用redis-cli客戶端連接到redis服務,執行slaveof命令(重啟后失效)
# 5.0以后新增命令replicaof,與salveof效果一致
slaveof <masterip> <masterport>
卸載Redis
- 查看redis是否啟動
ps aux | grep redis
- 若啟動,則殺死進程
kill -9 PID
- 停止服務
redis-cli shutdown
- 查看
/usr/local/lib目錄中是否有與Redis相關的文件
ll /usr/local/bin/redis-*
# 有的話就刪掉
rm -rf /usr/local/bin/redis-*
Redis客戶端工具
命令行客戶端
Redis安裝完成后就自帶了命令行客戶端:redis-cli,使用方式如下:
redis-cli [options] [commonds]
其中常見的options有:
-
-h 127.0.0.1:指定要連接的redis節點的IP地址,默認是127.0.0.1 -
-p 6379:指定要連接的redis節點的端口,默認是6379 -
-a 072413:指定redis的訪問密碼
其中的commonds就是Redis的操作命令,例如:
-
ping:與redis服務端做心跳測試,服務端正常會返回pong
不指定commond時,會進入redis-cli的交互控制臺:
圖形化客戶端
地址:https://github.com/uglide/RedisDesktopManager
不過該倉庫提供的是RedisDesktopManager的源碼,并未提供windows安裝包。
在下面這個倉庫可以找到安裝包:https://github.com/lework/RedisDesktopManager-Windows/releases
下載之后,解壓、安裝
Redis默認有16個倉庫,編號從0至15. 通過配置文件可以設置倉庫數量,但是不超過16,并且不能自定義倉庫名稱。
如果是基于redis-cli連接Redis服務,可以通過select命令來選擇數據庫
# 選擇 0號庫
select 0
Redis常見命令 / 對象
Redis是一個key-value的數據庫,key一般是String類型,不過value的類型多種多樣:
查命令的官網: https://redis.io/commands
在交互界面使用 help 命令查詢:
help [command]
通用命令
通用指令是部分數據類型都可以使用的指令,常見的有:
- KEYS:查看符合模板的所有key。在生產環境下,不推薦使用keys 命令,因為這個命令在key過多的情況下,效率不高
- DEL:刪除一個指定的key
- EXISTS:判斷key是否存在
- EXPIRE:給一個key設置有效期,有效期到期時該key會被自動刪除。內存非常寶貴,對于一些數據,我們應當給他一些過期時間,當過期時間到了之后,他就會自動被刪除
- 當使用EXPIRE給key設置的有效期過期了,那么此時查詢出來的TTL結果就是-2
- 如果沒有設置過期時間,那么TTL返回值就是-1
- TTL:查看一個KEY的剩余有效期
String命令
使用場景:
- 驗證碼保存
- 不易變動的對象保存
- 簡單鎖的保存
String類型,也就是字符串類型,是Redis中最簡單的存儲類型
其value是字符串,不過根據字符串的格式不同,又可以分為3類:
- string:普通字符串
- int:整數類型,可以做自增.自減操作
- float:浮點類型,可以做自增.自減操作
String的常見命令有:
-
SET:添加或者修改已經存在的一個String類型的鍵值對,對于SET,若key不存在則為添加,存在則為修改
-
GET:根據key獲取String類型的value
-
MSET:批量添加多個String類型的鍵值對
-
MGET:根據多個key獲取多個String類型的value
-
INCR:讓一個整型的key自增1
-
INCRBY:讓一個整型的key自增并指定步長
- incrby num 2 讓num值自增2
- 也可以使用負數,是為減法,如:incrby num -2 讓num值-2。此種類似 DECR 命令,而DECR是每次-1
-
INCRBYFLOAT:讓一個浮點類型的數字自增并指定步長
-
SETNX:添加一個String類型的鍵值對(key不存在為添加,存在則不執行)
-
SETEX:添加一個String類型的鍵值對,并且指定有效期
注:以上命令除了INCRBYFLOAT 都是常用命令
key問題
key的設計
Redis沒有類似MySQL中的Table的概念,我們該如何區分不同類型的key?
可以通過給key添加前綴加以區分,不過這個前綴不是隨便加的,有一定的規范
Redis的key允許有多個單詞形成層級結構,多個單詞之間用:隔開,格式如下:
這個格式并非固定,也可以根據自己的需求來刪除或添加詞條
如項目名稱叫 automation,有user和product兩種不同類型的數據,我們可以這樣定義key:
-
user相關的key:automation:user:1
-
product相關的key:automation:product:1
同時還需要滿足:
- key的長度最好別超過44字節(3.0版本是39字節)
- key中別包含特殊字符
BigKey問題
BigKey通?!耙訩ey的大小和Key中成員的數量來綜合判定”,例如:
- Key本身的數據量過大:一個String類型的Key,它的值為5 MB
- Key中的成員數過多:一個ZSET類型的Key,它的成員數量為10,000個
- Key中成員的數據量過大:一個Hash類型的Key,它的成員數量雖然只有1,000個但這些成員的Value(值)總大小為100 MB
判定元素大小的方式:
MEMORY USAGE key # 查看某個key的內存大小,不建議使用:因為此命令對CPU使用率較高
# 衡量值 或 值的個數
STRLEN key # string結構 某key的長度
LLEN key # list集合 某key的值的個數
.............
推薦值:
- 單個key的value小于10KB
- 對于集合類型的key,建議元素數量小于1000
BigKey的危害
- 網絡阻塞:對BigKey執行讀請求時,少量的QPS就可能導致帶寬使用率被占滿,導致Redis實例,乃至所在物理機變慢
- 數據傾斜:BigKey所在的Redis實例內存使用率遠超其他實例,無法使數據分片的內存資源達到均衡
- Redis阻塞:對元素較多的hash、list、zset等做運算會耗時較舊,使主線程被阻塞
- CPU壓力:對BigKey的數據序列化和反序列化會導致CPU的使用率飆升,影響Redis實例和本機其它應用
如何發現BigKey
-
redis-cli --bigkeys 命令:
redis-cli -a 密碼 --bigkeys
此命令可以遍歷分析所有key,并返回Key的整體統計信息與每個數據的Top1的key
不足:返回的是內存大小是TOP1的key,而此key不一定是BigKey,同時TOP2、3.......的key也不一定就不是BigKey
- scan命令掃描:]每次會返回2個元素,第一個是下一次迭代的光標(cursor),第一次光標會設置為0,當最后一次scan 返回的光標等于0時,表示整個scan遍歷結束了,第二個返回的是List,一個匹配的key的數組
127.0.0.1:7001> help SCAN
SCAN cursor [MATCH pattern] [COUNT count] [TYPE type]
summary: Incrementally iterate the keys space
since: 2.8.0
group: generic
自定義代碼來判定是否為BigKey
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanResult;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class JedisTest {
private final static int STR_MAX_LEN = 10 * 1024;
private final static int HASH_MAX_LEN = 500;
private Jedis jedis;
@BeforeEach
void setUp() {
// 1.建立連接
jedis = new Jedis("192.168.146.100", 6379);
// 2.設置密碼
jedis.auth("072413");
// 3.選擇庫
jedis.select(0);
}
@Test
void testScan() {
int maxLen = 0;
long len = 0;
String cursor = "0";
do {
// 掃描并獲取一部分key
ScanResult<String> result = jedis.scan(cursor);
// 記錄cursor
cursor = result.getCursor();
List<String> list = result.getResult();
if (list == null || list.isEmpty()) {
break;
}
// 遍歷
for (String key : list) {
// 判斷key的類型
String type = jedis.type(key);
switch (type) {
case "string":
len = jedis.strlen(key);
maxLen = STR_MAX_LEN;
break;
case "hash":
len = jedis.hlen(key);
maxLen = HASH_MAX_LEN;
break;
case "list":
len = jedis.llen(key);
maxLen = HASH_MAX_LEN;
break;
case "set":
len = jedis.scard(key);
maxLen = HASH_MAX_LEN;
break;
case "zset":
len = jedis.zcard(key);
maxLen = HASH_MAX_LEN;
break;
default:
break;
}
if (len >= maxLen) {
System.out.printf("Found big key : %s, type: %s, length or size: %d %n", key, type, len);
}
}
} while (!cursor.equals("0"));
}
@AfterEach
void tearDown() {
if (jedis != null) {
jedis.close();
}
}
}
- 第三方工具
- 利用第三方工具,如 Redis-Rdb-Tools 分析RDB快照文件,全面分析內存使用情況
- 網絡監控
- 自定義工具,監控進出Redis的網絡數據,超出預警值時主動告警
- 一般阿里云搭建的云服務器就有相關監控頁面
如何刪除BigKey
BigKey內存占用較多,即便是刪除這樣的key也需要耗費很長時間,導致Redis主線程阻塞,引發一系列問題
- redis 3.0 及以下版本:如果是集合類型,則遍歷BigKey的元素,先逐個刪除子元素,最后刪除BigKey
- Redis 4.0以后:使用異步刪除的命令 unlink
127.0.0.1:7001> help UNLINK
UNLINK key [key ...]
summary: Delete a key asynchronously in another thread. Otherwise it is just as DEL, but non blocking.
since: 4.0.0
group: generic
解決BigKey問題
上一節是刪除BigKey,但是數據最終還是未解決
要解決BigKey:
- 選擇合適的數據結構(String、Hash、List、Set、ZSet、Stream、GEO、HyperLogLog、BitMap)
- 將大數據拆為小數據,具體根據業務來
如:一個對象放在hash中,hash底層會使用ZipList壓縮,但entry數量超過500時(看具體redis版本),會使用哈希表而不是ZipList
# 查看redis的entry數量
[root@zixq ~]# redis-cli
127.0.0.1:7001> config get hash-max-ziplist-entries
# 修改redis的entry數量 別太離譜即可
config set hash-max-ziplist-entries
因此:一個hash的key中若是field-value約束在一定的entry以內即可,超出的就用另一個hash的key來存儲,具體實現以業務來做
Hash命令
使用場景:
- 易改變對象的保存
- 分布式鎖的保存(Redisson分布式鎖的實現原理)
這個在工作中使用頻率很高
Hash類型,也叫散列,其value是一個無序字典,類似于Java中的HashMap結構。
String結構是將對象序列化為JSON字符串后存儲,當需要修改對象某個字段時很不方便:
Hash結構可以將對象中的每個字段獨立存儲,可以針對單個字段做CRUD:
Hash類型的常見命令
-
HSET key field value:添加或者修改hash類型key的field的值。同理:操作不存在數據是為新增,存在則為修改
-
HGET key field:獲取一個hash類型key的field的值
-
HMSET:批量添加多個hash類型key的field的值
-
HMGET:批量獲取多個hash類型key的field的值
-
HGETALL:獲取一個hash類型的key中的所有的field和value
-
HKEYS:獲取一個hash類型的key中的所有的field
-
HINCRBY:讓一個hash類型key的field的value值自增并指定步長
-
HSETNX:添加一個hash類型的key的field值,前提是這個field不存在,否則不執行
List命令 - 命令規律開始變化
Redis中的List類型與Java中的LinkedList類似,可以看做是一個雙向鏈表結構。既可以支持正向檢索,也可以支持反向檢索。
特征也與LinkedList類似:
- 有序
- 元素可以重復
- 插入和刪除快
- 查詢速度一般
使用場景:
- 朋友圈點贊列表
- 評論列表
List的常見命令有:
- LPUSH key element ... :向列表左側插入一個或多個元素
- LPOP key:移除并返回列表左側的第一個元素,沒有則返回nil
- RPUSH key element ... :向列表右側插入一個或多個元素
- RPOP key:移除并返回列表右側的第一個元素
- LRANGE key star end:返回一段角標范圍內的所有元素
- BLPOP和BRPOP:與LPOP和RPOP類似,只不過在沒有元素時等待指定時間,而不是直接返回nil
Set命令
Redis的Set結構與Java中的HashSet類似,可以看做是一個value為null的HashMap。因為也是一個hash表,因此具備與HashSet類似的特征:
- 無序
- 元素不可重復
- 查找快
- 支持交集.并集.差集等功能
使用場景:
- 一人一次的業務。如:某商品一個用戶只能買一次
- 共同擁有的業務,如:關注、取關與共同關注
Set類型的常見命令
- SADD key member ... :向set中添加一個或多個元素
- SREM key member ... :移除set中的指定元素
- SCARD key:返回set中元素的個數
- SISMEMBER key member:判斷一個元素是否存在于set中
- SMEMBERS:獲取set中的所有元素
- SINTER key1 key2 ... :求key1與key2的交集
- SDIFF key1 key2 ... :求key1與key2的差集
- SUNION key1 key2 ..:求key1和key2的并集
SortedSet / ZSet 命令
Redis的SortedSet是一個可排序的set集合,與Java中的TreeSet有些類似,但底層數據結構卻差別很大。SortedSet中的每一個元素都帶有一個score屬性,可以基于score屬性對元素排序,底層的實現是一個跳表(SkipList)加 hash表。
SortedSet具備下列特性:
- 可排序
- 元素不重復
- 查詢速度快
使用場景:
- 排行榜
SortedSet的常見命令有:
- ZADD key score member:添加一個或多個元素到sorted set ,如果已經存在則更新其score值
- ZREM key member:刪除sorted set中的一個指定元素
- ZSCORE key member : 獲取sorted set中的指定元素的score值
- ZRANK key member:獲取sorted set 中的指定元素的排名
- ZCARD key:獲取sorted set中的元素個數
- ZCOUNT key min max:統計score值在給定范圍內的所有元素的個數
- ZINCRBY key increment member:讓sorted set中的指定元素自增,步長為指定的increment值
- ZRANGE key min max:按照score排序后,獲取指定排名范圍內的元素
- ZRANGEBYSCORE key min max:按照score排序后,獲取指定score范圍內的元素
- ZDIFF.ZINTER.ZUNION:求差集.交集.并集
注意:所有的排名默認都是升序,如果要降序則在命令的Z后面添加REV即可,例如:
- 升序獲取sorted set 中的指定元素的排名:ZRANK key member
- 降序獲取sorted set 中的指定元素的排名:ZREVRANK key memeber
Stream命令
基于Stream的消息隊列-消費者組
消費者組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:
- 創建消費者組
XGROUP CREATE key groupName ID [MKSTREAM]
- key:隊列名稱
- groupName:消費者組名稱
- ID:起始ID標示,$代表隊列中最后一個消息,0則代表隊列中第一個消息
- MKSTREAM:隊列不存在時自動創建隊列
- 刪除指定的消費者組
XGROUP DESTORY key groupName
- 給指定的消費者組添加消費者
XGROUP CREATECONSUMER key groupname consumername
- 刪除消費者組中的指定消費者
XGROUP DELCONSUMER key groupname consumername
- 從消費者組讀取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
-
group:消費組名稱
-
consumer:消費者名稱,如果消費者不存在,會自動創建一個消費者
-
count:本次查詢的最大數量
-
BLOCK milliseconds:當沒有消息時最長等待時間
-
NOACK:無需手動ACK,獲取到消息后自動確認
-
STREAMS key:指定隊列名稱
-
ID:獲取消息的起始ID:
- ">":從下一個未消費的消息開始
- 其它:根據指定id從pending-list中獲取已消費但未確認的消息,例如0,是從pending-list中的第一個消息開始
STREAM類型消息隊列的XREADGROUP命令特點:
- 消息可回溯
- 可以多消費者爭搶消息,加快消費速度
- 可以阻塞讀取
- 沒有消息漏讀的風險
- 有消息確認機制,保證消息至少被消費一次
消費者監聽消息的基本思路:
GEO命令
GEO就是Geolocation的簡寫形式,代表地理坐標。Redis在3.2版本中加入了對GEO的支持,允許存儲地理坐標信息,幫助我們根據經緯度來檢索數據
常見的命令有:
- GEOADD:添加一個地理空間信息,包含:經度(longitude)、緯度(latitude)、值(member)
- GEODIST:計算指定的兩個點之間的距離并返回
- GEOHASH:將指定member的坐標轉為hash字符串形式并返回
- GEOPOS:返回指定member的坐標
- GEORADIUS:指定圓心、半徑,找到該圓內包含的所有member,并按照與圓心之間的距離排序后返回。6.以后已廢棄
- GEOSEARCH:在指定范圍內搜索member,并按照與指定點之間的距離排序后返回。范圍可以是圓形或矩形。6.2.新功能
- GEOSEARCHSTORE:與GEOSEARCH功能一致,不過可以把結果存儲到一個指定的key。 6.2.新功能
BitMap命令
bit:指的就是bite,二進制,里面的內容就是非0即1咯
map:就是說將適合使用0或1的業務進行關聯。如:1為簽到、0為未簽到,這樣就直接使用某bite就可表示出一個用戶一個月的簽到情況,減少內存花銷了
BitMap底層是基于String實現的,因此:在Java中BitMap相關的操作封裝到了redis的String操作中
BitMap的操作命令有:
- SETBIT:向指定位置(offset)存入一個0或1
- GETBIT :獲取指定位置(offset)的bit值
- BITCOUNT :統計BitMap中值為1的bit位的數量
- BITFIELD :操作(查詢、修改、自增)BitMap中bit數組中的指定位置(offset)的值
- BITFIELD_RO :獲取BitMap中bit數組,并以十進制形式返回
- BITOP :將多個BitMap的結果做位運算(與 、或、異或)
- BITPOS :查找bit數組中指定范圍內第一個0或1出現的位置
HyperLogLog 命令
UV:全稱Unique Visitor,也叫獨立訪客量,是指通過互聯網訪問、瀏覽這個網頁的自然人。1天內同一個用戶多次訪問該網站,只記錄1次
PV:全稱Page View,也叫頁面訪問量或點擊量,用戶每訪問網站的一個頁面,記錄1次PV,用戶多次打開頁面,則記錄多次PV。往往用來衡量網站的流量
Hyperloglog(HLL)是從Loglog算法派生的概率算法,用于確定非常大的集合的基數,而不需要存儲其所有值
相關算法原理大家可以參考:https://juejin.cn/post/6844903785744056333#heading-0
Redis中的HLL是基于string結構實現的,單個HLL的內存永遠小于16kb。作為代價,其測量結果是概率性的,有小于0.81%的誤差,同時此結構自帶去重
常用命令如下:目前也只有這些命令
PubSub 發布訂閱
PubSub(發布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個channel,生產者向對應channel發送消息后,所有訂閱者都能收到相關消息
- SUBSCRIBE channel [channel] :訂閱一個或多個頻道
- PUBLISH channel msg :向一個頻道發送消息
- PSUBSCRIBE pattern[pattern] :訂閱與pattern格式匹配的所有頻道。pattern支持的通配符如下:
? 表示 一個 字符 如:h?llo 則可以為 hallo、hxllo
* 表示 0個或N個 字符 如:h*llo 則可以為 hllo、heeeello.........
[ae] 表示 是a或e都行 如:h[ae]llo 則可以為 hello、hallo
優點:
- 采用發布訂閱模型,支持多生產、多消費
缺點:
- 不支持數據持久化
- 無法避免消息丟失
- 消息堆積有上限,超出時數據丟失
Java操作:Jedis
官網:https://redis.io/docs/clients/
其中Java客戶端也包含很多:
標記為?的就是推薦使用的Java客戶端,包括:
- Jedis和Lettuce:這兩個主要是提供了“Redis命令對應的API”,方便我們操作Redis,而SpringDataRedis又對這兩種做了抽象和封裝
- Redisson:是在Redis基礎上實現了分布式的可伸縮的Java數據結構,例如Map.Queue等,而且支持跨進程的同步機制:Lock.Semaphore等待,比較適合用來實現特殊的功能需求
入門Jedis
創建Maven項目
- 依賴
<!--jedis-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
- 測試:其他類型如Hash、Set、List、SortedSet和下面String是一樣的用法
package com.zixieqing.redis;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
/**
* jedis操作redis:redis的命令就是jedis對應的API
*
* @author : ZiXieqing
*/
public class QuickStartTest {
private Jedis jedis;
@Before
public void setUp() throws Exception {
jedis = new Jedis("host", 6379);
// 設置密碼
jedis.auth("072413");
// 設置庫
jedis.select(0);
}
@After
public void tearDown() throws Exception {
if (null != jedis) jedis.close();
}
/**
* String類型
*/
@Test
public void stringTest() {
// 添加key-value
String result = jedis.set("name", "zixieqing");
System.out.println("result = " + result);
// 通過key獲取value
String value = jedis.get("name");
System.out.println("value = " + value);
// 批量添加或修改
String mset = jedis.mset("age", "18", "sex", "girl");
System.out.println("mset = " + mset);
System.out.println("jedis.keys() = " + jedis.keys("*"));
// 給key自增并指定步長
long incrBy = jedis.incrBy("age", 5L);
System.out.println("incrBy = " + incrBy);
// 若key不存在,則添加,存在則不執行
long setnx = jedis.setnx("city", "hangzhou");
System.out.println("setnx = " + setnx);
// 添加key-value,并指定有效期
String setex = jedis.setex("job", 10L, "Java");
System.out.println("setex = " + setex);
// 獲取key的有效期
long ttl = jedis.ttl("job");
System.out.println("ttl = " + ttl);
}
}
連接池
Jedis本身是線程不安全的,并且頻繁的創建和銷毀連接會有性能損耗,推薦使用Jedis連接池代替Jedis的直連方式
package com.zixieqing.redis.util;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.time.Duration;
/**
* Jedis連接池
*
* @author : ZiXieqing
*/
public class JedisConnectionFactory {
private static JedisPool jedisPool;
static {
// 設置連接池
JedisPoolConfig PoolConfig = new JedisPoolConfig();
PoolConfig.setMaxTotal(30);
PoolConfig.setMaxIdle(30);
PoolConfig.setMinIdle(0);
PoolConfig.setMaxWait(Duration.ofSeconds(1));
/*
設置鏈接對象
JedisPool(GenericObjectPoolConfig<Jedis> poolConfig, String host, int port, int timeout, String password)
*/
jedisPool = new JedisPool(PoolConfig, "192.168.46.128", 6379, 1000, "072413");
}
public static Jedis getJedis() {
return jedisPool.getResource();
}
}
Java操作:SpringDataRedis
SpringData是Spring中數據操作的模塊,包含對各種數據庫的集成,其中對Redis的集成模塊就叫做SpringDataRedis
官網:https://spring.io/projects/spring-data-redis
- 提供了對不同Redis客戶端的整合(Lettuce和Jedis)
- 提供了RedisTemplate統一API來操作Redis
- 支持Redis的發布訂閱模型
- 支持Redis哨兵和Redis集群
- 支持基于JDK.JSON、字符串、Spring對象的數據序列化及反序列化
- 支持基于Redis的JDKCollection實現
SpringDataRedis中提供了RedisTemplate工具類,其中封裝了各種對Redis的操作。并且將不同數據類型的操作API封裝到了不同的類型中:
入門SpringDataRedis
創建SpringBoot項目
- pom.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.zixieqing</groupId>
<artifactId>02-spring-data-redis</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>02-spring-data-redis</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>8</java.version>
</properties>
<dependencies>
<!--redis依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--common-pool-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--Jackson依賴-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
- YAML文件配置
spring:
redis:
host: 192.168.46.128
port: 6379
password: "072413"
jedis:
pool:
max-active: 100 # 最大連接數
max-idle: 100 # 最大空閑數
min-idle: 0 # 最小空閑數
max-wait: 5 # 最大鏈接等待時間 單位:ms
- 測試:其他如Hash、List、Set、SortedSet的方法和下面String差不多
package com.zixieqing.springdataredis;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@SpringBootTest(classes = App.class)
class ApplicationTests {
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* SpringDataRedis操作redis:String類型 其他類型都是同理操作
*
* String:opsForValue
* Hash:opsForHash
* List:opsForList
* Set:opsForSet
* SortedSet:opsForZSet
*/
@Test
void stringTest() {
// 添加key-value
redisTemplate.opsForValue().set("name", "紫邪情");
// 根據key獲取value
String getName = Objects.requireNonNull(redisTemplate.opsForValue().get("name")).toString();
System.out.println("getName = " + getName);
// 添加key-value 并 指定有效期
redisTemplate.opsForValue().set("job", "Java", 10L, TimeUnit.SECONDS);
String getJob = Objects.requireNonNull(redisTemplate.opsForValue().get("job")).toString();
System.out.println("getJob = " + getJob);
// 就是 setnx 命令,key不存在則添加,存在則不執行
redisTemplate.opsForValue().setIfAbsent("city", "杭州");
redisTemplate.opsForValue().setIfAbsent("info", "臉皮厚,欠揍", 10L, TimeUnit.SECONDS);
ArrayList<String> keys = new ArrayList<>();
keys.add("name");
keys.add("job");
keys.add("city");
keys.add("info");
redisTemplate.delete(keys);
}
}
數據序列化
RedisTemplate可以接收Object類型作為值寫入Redis:
只不過寫入前會把Object序列化為字節形式,默認是采用JDK序列化,得到的結果是這樣的:
缺點:
- 可讀性差
- 內存占用較大
Jackson序列化
我們可以自定義RedisTemplate的序列化方式,代碼如下:
package com.zixieqing.springdataredis.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
/**
* redis自定義序列化方式
*
* @author : ZiXieqing
*/
@Configuration
public class RedisSerializeConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){
// 創建RedisTemplate對象
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 設置連接工廠
template.setConnectionFactory(connectionFactory);
// 創建JSON序列化工具
GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
// 設置Key的序列化
template.setKeySerializer(RedisSerializer.string());
template.setHashKeySerializer(RedisSerializer.string());
// 設置Value的序列化
template.setValueSerializer(jsonRedisSerializer);
template.setHashValueSerializer(jsonRedisSerializer);
// 返回
return template;
}
}
這里采用了JSON序列化來代替默認的JDK序列化方式。最終結果如圖:
整體可讀性有了很大提升,并且能將Java對象自動的序列化為JSON字符串,并且查詢時能自動把JSON反序列化為Java對象
不過,其中記錄了序列化時對應的class名稱,目的是為了查詢時實現自動反序列化。這會帶來額外的內存開銷。
StringRedisTemplate
盡管JSON的序列化方式可以滿足我們的需求,但依然存在一些問題
為了在反序列化時知道對象的類型,JSON序列化器會將類的class類型寫入json結果中,存入Redis,會帶來額外的內存開銷。
為了減少內存的消耗,我們可以采用手動序列化的方式,換句話說,就是不借助默認的序列化器,而是我們自己來控制序列化的動作,同時,我們只采用String的序列化器,這樣,在存儲value時,我們就不需要在內存中多存儲數據,從而節約我們的內存空間
這種用法比較普遍,因此SpringDataRedis就提供了RedisTemplate的子類:StringRedisTemplate,它的key和value的序列化方式默認就是String方式
使用示例:
package com.zixieqing.springdataredis;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zixieqing.springdataredis.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Slf4j
@SpringBootTest(classes = App.class)
class ApplicationTests {
@Autowired
private StringRedisTemplate stringRedisTemplate;
// 是jackson中的
private final ObjectMapper mapper = new ObjectMapper();
/**
* 使用StringRedisTemplate操作Redis 和 序列化與反序列化
*
* 操作redis和String類型一樣的
*/
@Test
void serializeTest() throws JsonProcessingException {
User user = new User();
user.setName("zixieqing")
.setJob("Java");
// 序列化
String userStr = mapper.writeValueAsString(user);
stringRedisTemplate.opsForValue().set("com:zixieqing:springdataredis:user", userStr);
// 反序列化
String userStr2 = stringRedisTemplate.opsForValue().get("com:zixieqing:springdataredis:user");
User user2 = mapper.readValue(userStr2, User.class);
log.info("反序列化結果:{}", user2);
}
}
緩存更新策略
緩存更新是redis為了節約內存而設計出來的一個東西,主要是因為內存數據寶貴,當我們向redis插入太多數據,此時就可能會導致緩存中的數據過多,所以redis會對部分數據進行淘汰
內存淘汰:redis自動進行,當redis內存達到咱們設定的max-memery的時候,會自動觸發淘汰機制,淘汰掉一些不重要的數據(可以自己設置策略方式)
超時剔除:當我們給redis設置了過期時間ttl之后,redis會將超時的數據進行刪除,方便咱們繼續使用緩存
主動更新:我們可以手動調用方法把緩存刪掉,通常用于解決緩存和數據庫不一致問題
業務場景:先說結論,后面分析這些結論是怎么來的
- 低一致性需求:使用Redis自帶的內存淘汰機制
- 高一致性需求:主動更新,并以超時剔除作為兜底方案讀操作:
- 讀操作:
- 緩存命中則直接返回
- 緩存未命中則查詢數據庫,并寫入緩存,設定超時時間寫操作
- 寫操作:
- 先寫數據庫,然后再刪除緩存
- 要確保數據庫與緩存操作的原子性(單體系統寫庫操作和刪除緩存操作放入一個事務;分布式系統使用分布式事務管理這二者)
- 讀操作:
主動更新策略:數據庫與緩存不一致問題
由于我們的緩存的數據源來自于數據庫,而數據庫的數據是會發生變化的。因此,如果當數據庫中數據發生變化,而緩存卻沒有同步,此時就會有一致性問題存在,其后果是:
用戶使用緩存中的過時數據,就會產生類似多線程數據安全問題,從而影響業務,產品口碑等;怎么解決呢?有如下幾種方案
Cache Aside Pattern 人工編碼方式:緩存調用者在更新完數據庫后再去更新緩存,也稱之為雙寫方案。這種由我們自己編寫,所以可控,因此此種方式勝出
Read/Write Through Pattern : 由系統本身完成,數據庫與緩存的問題交由系統本身去處理
Write Behind Caching Pattern :調用者只操作緩存,其他線程去異步處理數據庫,實現最終一致
Cache Aside 人工編碼 解決數據庫與緩存不一致
由上一節知道數據庫與緩存不一致的解決方案是 Cache Aside 人工編碼,但是這個玩意兒需要考慮幾個問題:
-
刪除緩存還是更新緩存?
-
更新緩存:每次更新數據庫都更新緩存,無效寫操作較多
-
刪除緩存:更新數據庫時讓緩存失效,查詢時再更新緩存(勝出)
-
-
如何保證緩存與數據庫的操作的同時成功或失???
- 單體系統,將緩存與數據庫操作放在一個事務
- 分布式系統,利用TCC等分布式事務方案
-
先操作緩存還是先操作數據庫?
-
先刪除緩存,再操作數據庫
-
先操作數據庫,再刪除緩存(勝出)
-
為什么是先操作數據庫,再刪除緩存?
操作數據庫和操作緩存在“串行”情況下沒什么太大區別,問題不大,但是:在“并發”情況下,二者就有區別,就會產生數據庫與緩存數據不一致的問題
先看“先刪除緩存,再操作數據庫”:
再看“先操作數據庫,再刪除緩存”:redis操作幾乎是微秒級,所以下圖線程1會很快完成,然后線程2業務一般都慢一點,所以緩存中能極快地更新成數據庫中的最新數據,因此這種方式雖也會發生數據不一致,但幾率很小(數據庫操作一般不會在微秒級別內完成)
因此:勝出的是“先操作數據庫,再刪除緩存”
緩存穿透及解決方式
緩存穿透:指客戶端請求的數據在緩存中和數據庫中都不存在。這樣緩存永遠不會生效,這些請求都會打到數據庫
場景:如別人模仿id,然后發起大量請求,而這些id對應的數據redis中沒有,然后全跑去查庫,數據庫壓力就會增大,導致數據庫扛不住而崩掉
解決方式:
-
緩存空對象:就是緩存和數據庫中都沒有時,直接放個空對象到緩存中,并設置有效期即可
-
優點:實現簡單,維護方便
-
缺點:
- 額外的內存消耗
- 可能造成短期的不一致。一開始redis和數據庫都沒有,后面新增了數據,而此數據的id可能恰好對上,這樣redis中存的這id的數據還是空對象
-
-
布隆過濾:采用的是哈希思想來解決這個問題,通過一個龐大的二進制數組,用哈希思想去判斷當前這個要查詢的數據是否存在,如果布隆過濾器判斷存在,則放行,這個請求會去訪問redis,哪怕此時redis中的數據過期了,但是數據庫中一定存在這個數據,在數據庫中查詢出來這個數據后,再將其放入到redis中,假設布隆過濾器判斷這個數據不存在,則直接返回
-
優點:內存占用較少,沒有多余key
-
缺點:
-
實現復雜
-
存在誤判可能。布隆過濾器判斷存在,可數據庫中不一定真存在,因它采用的是哈希算法,就會產生哈希沖突
-
-
-
增加主鍵id的復雜度,從而提前做好基礎數據校驗
-
做用戶權限認證
-
做熱點參數限流
空對象和布隆過濾的架構如下:左為空對象,右為布隆過濾
緩存空對象示例:
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result findShopById(Long id) {
String cacheKey = CACHE_SHOP_KEY + id;
// 查 redis
Map<Object, Object> shopMap = stringRedisTemplate.opsForHash().entries(cacheKey);
// 有則返回 同時需要看是否命中的是:空對象
if (!shopMap.isEmpty()) {
return Result.ok(JSONUtil.toJsonStr(shopMap));
}
// 無則查庫
Shop shop = getById(id);
// 庫中無
if (null == shop) {
// 向 redis 中放入 空對象,且設置有效期
Map<String, String> hashMap = new HashMap<>(16);
hashMap.put("", "");
stringRedisTemplate.opsForHash().putAll(cacheKey, hashMap);
// CACHE_NULL_TTL = 2L
stringRedisTemplate.expire(cacheKey, CACHE_NULL_TTL, TimeUnit.MINUTES);
return Result.fail("商鋪不存在");
}
// 庫中有 BeanUtil 使用的是hutool工具
// 這步意思:因為Shop實例類中字段類型不是均為String,因此需要將字段值轉成String,否則存入Redis時會發生 造型異常
Map<String, Object> shopMapData = BeanUtil.beanToMap(shop, new HashMap<>(16),
CopyOptions.create()
.ignoreNullValue()
.setIgnoreError(false)
.setFieldValueEditor((filedKey, filedValue) -> filedValue = filedValue + "")
);
// 寫入 redis
stringRedisTemplate.opsForHash().putAll(cacheKey, shopMapData);
// 設置有效期 CACHE_SHOP_TTL = 30L
stringRedisTemplate.expire(cacheKey, CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 返回客戶端
return Result.ok(JSONUtil.toJsonStr(shop));
}
}
緩存雪崩及解決方式
緩存雪崩:指在同一時段大量的緩存key同時失效 或 Redis服務宕機,導致大量請求到達數據庫,帶來巨大壓力
解決方案:
- 給不同的Key的TTL添加隨機值
- 利用Redis集群提高服務的可用性
- 給緩存業務添加降級限流策略
- 給業務添加多級緩存
緩存擊穿及解決方式
緩存擊穿問題也叫熱點Key問題,就是一個被高并發訪問并且緩存重建業務較復雜的key突然失效了,無數的請求訪問會在瞬間給數據庫帶來巨大的沖擊
常見的解決方案有兩種:
- 互斥鎖
- 邏輯過期
互斥鎖 - 保一致
互斥鎖:保一致性,會讓線程阻塞,有死鎖風險
本質:利用了String的setnx指令;key不存在則添加,存在則不操作
示例:下列邏輯該封裝則封裝即可
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryShopById(Long id) {
String cacheKey = CACHE_SHOP_KEY + id;
// 查 redis
Map<Object, Object> shopMap = stringRedisTemplate.opsForHash().entries(cacheKey);
// redis 中有責返回
if (!shopMap.isEmpty()) {
Shop shop = BeanUtil.fillBeanWithMap(shopMap, new Shop(), false);
return Result.ok(shop);
}
Shop shop = null;
try {
// 無則獲取 互斥鎖
Boolean res = stringRedisTemplate
.opsForValue()
.setIfAbsent(LOCK_SHOP_KEY + id, UUID.randomUUID().toString(true), LOCK_SHOP_TTL, TimeUnit.MINUTES);
boolean flag = BooleanUtil.isTrue(res);
// 獲取失敗則等一會兒再試
if (!flag) {
Thread.sleep(20);
return queryShopById(id);
}
// 獲取鎖成功則查 redis 此時有沒有,從而減少緩存重建
Map<Object, Object> shopMa = stringRedisTemplate.opsForHash().entries(cacheKey);
// redis 中有責返回
if (!shopMa.isEmpty()) {
shop = BeanUtil.fillBeanWithMap(shopMa, new Shop(), false);
return Result.ok(shop);
}
// 有則返回,無則查庫
shop = getById(id);
// 庫中無
if (null == shop) {
// 向 redis放入 空值,并設置有效期
Map<String, String> hashMap = new HashMap<>(16);
hashMap.put("", "");
stringRedisTemplate.opsForHash().putAll(cacheKey, hashMap);
stringRedisTemplate.expire(cacheKey, 2L, TimeUnit.MINUTES);
return Result.fail("無此數據");
}
// 庫中有則寫入 redis,并設置有效期
Map<String, Object> sMap = BeanUtil.beanToMap(shop, new HashMap<>(16),
CopyOptions.create()
.ignoreNullValue()
.setIgnoreError(false)
.setFieldValueEditor((filedKey, filedValue) -> filedValue = filedValue + "")
);
stringRedisTemplate.opsForHash().putAll(cacheKey, sMap);
stringRedisTemplate.expire(cacheKey, CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 釋放鎖
stringRedisTemplate.delete(LOCK_SHOP_KEY + id);
}
//返回客戶端
return Result.ok(shop);
}
}
邏輯過期 - 保性能
這玩意兒在互斥鎖的基礎上再變動一下即可
邏輯過期:不保一致性,性能好,有額外內存消耗,會造成短暫的數據不一致
本質:數據不過期,一直在Redis中,只是程序員自己使用過期字段和當前時間來判定是否過期,過期則獲取“互斥鎖”,獲取鎖成功(此時可以再判斷一下Redis中的數據是否過期,減少緩存重建),則開線程重建緩存即可
示例:
@Data
@Accessors(chain = true)
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
private static final ExecutorService EXECUTORS = Executors.newFixedThreadPool(10);
@Override
public Result queryShopById(Long id) {
// 使用互斥鎖解決 緩存擊穿
// return cacheBreakDownWithMutex(id);
String cacheKey = CACHE_SHOP_KEY + id;
// 查 redis
String shopJson = stringRedisTemplate.opsForValue().get(cacheKey);
// redis 中沒有則報錯(理論上是一直存在redis中的,邏輯過期而已,所以這一步不用判斷都可以)
if (StrUtil.isBlank(shopJson)) {
return Result.fail("無此數據");
}
// redis 中有,則看是否過期
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
LocalDateTime expireTime = redisData.getExpireTime();
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
// 沒過期,直接返回數據
if (expireTime.isAfter(LocalDateTime.now())) {
return Result.ok(shop);
}
try {
// 獲取互斥鎖 LOCK_SHOP_TTL = 10L
Boolean res = stringRedisTemplate
.opsForValue()
.setIfAbsent(LOCK_SHOP_KEY + id, UUID.randomUUID().toString(true),
LOCK_SHOP_TTL, TimeUnit.SECONDS);
boolean flag = BooleanUtil.isTrue(res);
// 獲取鎖失敗則瞇一會兒再嘗試
if (!flag) {
Thread.sleep(20);
return queryShopById(id);
}
// 獲取鎖成功
// 再看 redis 中的數據是否過期,減少緩存重建
shopJson = stringRedisTemplate.opsForValue().get(cacheKey);
redisData = JSONUtil.toBean(shopJson, RedisData.class);
expireTime = redisData.getExpireTime();
shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
// 已過期
if (expireTime.isBefore(LocalDateTime.now())) {
EXECUTORS.submit(() -> {
// 重建緩存
this.buildCache(id, 20L);
});
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 釋放鎖
stringRedisTemplate.delete(LOCK_SHOP_KEY + id);
}
// 返回客戶端
return Result.ok(shop);
}
/**
* 重建緩存
*/
public void buildCache(Long id, Long expireTime) {
String key = LOCK_SHOP_KEY + id;
// 重建緩存
Shop shop = getById(id);
if (null == shop) {
// 庫中沒有則放入 空對象
stringRedisTemplate.opsForValue().set(key, "", 10L, TimeUnit.SECONDS);
}
RedisData redisData = new RedisData();
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireTime))
.setData(shop);
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
}
簡單認識Lua腳本
Redis提供了Lua腳本功能,在一個腳本中編寫多條Redis命令,確保多條命令執行時的原子性
基本語法可以參考網站:https://www.runoob.com/lua/lua-tutorial.html
Redis提供的調用函數,語法如下:
redis.call('命令名稱', 'key', '其它參數', ...)
如:先執行set name Rose,再執行get name,則腳本如下:
# 先執行 set name jack
redis.call('set', 'name', 'Rose')
# 再執行 get name
local name = redis.call('get', 'name')
# 返回
return name
寫好腳本以后,需要用Redis命令來調用腳本,調用腳本的常見命令如下:
例如,我們要執行 redis.call('set', 'name', 'jack') 這個腳本,語法如下:
如果腳本中的key、value不想寫死,可以作為參數傳遞
key類型參數會放入KEYS數組,其它參數會放入ARGV數組,在腳本中可以從KEYS和ARGV數組獲取這些參數:
Java+Redis調用Lua腳本
RedisTemplate中,可以利用execute方法去執行lua腳本,參數對應關系就如下圖股
示例:
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
// 搞出腳本對象 DefaultRedisScript是RedisTemplate的實現類
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// 腳本在哪個旮旯地方
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
// 返回值類型
UNLOCK_SCRIPT.setResultType(Long.class);
}
public void unlock() {
// 調用lua腳本
stringRedisTemplate.execute(
UNLOCK_SCRIPT, // lua腳本
Collections.singletonList(KEY_PREFIX + name), // 對應key參數的數值:KEYS數組
ID_PREFIX + Thread.currentThread().getId()); // 對應其他參數的數值:ARGV數組
}
Redisson
官網地址: https://redisson.org
GitHub地址: https://github.com/redisson/redisson
分布式鎖需要解決幾個問題:而下圖的問題可以通過Redisson這個現有框架解決
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務,其中就包含了各種分布式鎖的實現
使用Redisson
- 依賴2
<!-- 基本 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
<!-- Spring Boot整合的依賴 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
- 創建redisson客戶端
YAML配置:常用參數戳這里
spring:
application:
name: springboot-redisson
redis:
redisson:
config: |
singleServerConfig:
password: "redis服務密碼"
address: "redis:/redis服務ip:6379"
database: 1
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.FstCodec> {}
transportMode: "NIO"
代碼配置
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.config.TransportMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.setTransportMode(TransportMode.NIO);
// 添加redis地址,這里添加了單點的地址,也可以使用 config.useClusterServers() 添加集群地址
config.useSingleServer()
.setAddress("redis://redis服務ip:6379")
.setPassword("redis密碼");
// 創建RedissonClient對象
return Redisson.create(config);
}
}
- 使用redisson客戶端
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception{
// 獲取鎖(可重入),指定鎖的名稱
RLock lock = redissonClient.getLock("lockName");
/*
* 嘗試獲取鎖
*
* 參數分別是:獲取鎖的最大等待時間(期間會重試),鎖自動釋放時間,時間單位
*/
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
// 判斷獲取鎖成功
if(isLock){
try{
System.out.println("執行業務");
}finally{
//釋放鎖
lock.unlock();
}
}
}
Redisson 可重入鎖原理
- 采用Hash結構
- key為鎖名
- field為線程標識
- value就是一個計數器count,同一個線程再來獲取鎖就讓此值 +1,同線程釋放一次鎖此值 -1
- PS:Java中使用的是state,C語言中用的是count,作用差不多
源碼在:lock.tryLock(waitTime, leaseTime, TimeUnit)中,leaseTime這個參數涉及到WatchDog機制,所以可以直接看 lock.tryLock(waitTime, TimeUnit) 這個的源碼
核心點在里面的lua腳本中:
"if (redis.call('exists', KEYS[1]) == 0) then " + -- KEYS[1] : 鎖名稱 判斷鎖是否存在
-- ARGV[2] = id + ":" + threadId 鎖的小key 充當 field
"redis.call('hset', KEYS[1], ARGV[2], 1); " + -- 當前這把鎖不存在則添加鎖,value=count=1 是hash結構
"redis.call('pexpire', KEYS[1], ARGV[1]); " + -- 并給此鎖設置有效期
"return nil; " + -- 獲取鎖成功,返回nil,即:null
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + -- 判斷 key+field 是否存在。即:判斷是否是同一線程來獲取鎖
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + -- 是自己,讓value +1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + -- 給鎖重置有效期
"return nil; " + -- 成功,返回nil,即:null
"end; " +
"return redis.call('pttl', KEYS[1]);" -- 獲取鎖失敗(含失效),返回鎖的TTL有效期
Redission 鎖重試 和 WatchDog機制
看源碼時選擇:RedissonLock
鎖重試
這里的 tryLock(long waitTime, long leaseTime, TimeUnit unit)選擇的是帶參的,無參的 tryLock()是,默認不會重試的
public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 將 waitTime 最大等待時間轉成 毫秒
long time = unit.toMillis(waitTime);
// 獲取此時的毫秒值
long current = System.currentTimeMillis();
// 獲取當前線程ID
long threadId = Thread.currentThread().getId();
// 搶鎖邏輯:涉及到WatchDog,待會兒再看
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired 表示在上一步 tryAcquire() 中搶鎖成功
if (ttl == null) {
return true;
}
// 有獲取當前時間
time -= System.currentTimeMillis() - current;
// 看執行上面的邏輯之后,是否超出了waitTime最大等待時間
if (time <= 0) {
// 超出waitTime最大等待時間,則獲取鎖失敗
acquireFailed(waitTime, unit, threadId);
return false;
}
// 再精確時間,看經過上面邏輯之后,是否超出waitTime最大等待時間
current = System.currentTimeMillis();
// 訂閱 發布邏輯是在 lock.unLock() 的邏輯中,里面有一個lua腳本,使用了 publiser 命令
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 若訂閱在waitTime最大等待時間內未完成,即超出waitTime最大等待時間
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
// 同時訂閱也未取消
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
// 則取消訂閱
unsubscribe(subscribeFuture, threadId);
}
});
}
// 訂閱在waitTime最大等待時間內未完成,則獲取鎖失敗
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
// 繼續精確時間,經過上面邏輯之后,是否超出waitTime最大等待時間
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 還在waitTime最大等待時間內 這里面就是重試的邏輯
while (true) {
long currentTime = System.currentTimeMillis();
// 搶鎖
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired 獲取鎖成功
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) { // 經過上面邏輯之后,時間已超出waitTime最大等待時間,則獲取鎖失敗
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
// 未失效 且 失效期還在 waitTime最大等待時間 以內
if (ttl >= 0 && ttl < time) {
// 同時該進程也未被中斷,則通過該信號量繼續獲取鎖
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 否則處于任務調度的目的,從而禁用當前線程,讓其處于休眠狀態
// 除非其他線程調用當前線程的 release方法 或 當前線程被中斷 或 waitTime已過
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
// 還未獲取鎖成功,那就真的是獲取鎖失敗了
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 取消訂閱
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
}
上面說到“訂閱”,“發布”的邏輯需要進入:lock.unlock();,和前面說的一樣,選擇:RedissonLock
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@SpringBootTest
class ApplicationTests {
@Resource
private RedissonClient redissonClient;
/**
* 偽代碼
*/
@Test
void buildCache() throws InterruptedException {
// 獲取 鎖名字
RLock lock = redissonClient.getLock("");
// 獲取鎖
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
// 釋放鎖
lock.unlock();
}
}
public class RedissonLock extends RedissonExpirable implements RLock {
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(
getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " + // 發布,從而在上面的 重試 中進行訂閱
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(
getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE,
internalLockLeaseTime, getLockName(threadId)
);
}
}
WatchDog 機制
上一節中有如下的代碼:進入 tryAcquire()
// 搶鎖邏輯:涉及到WatchDog,待會兒再看
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
public class RedissonLock extends RedissonExpirable implements RLock {
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
/**
* 異步獲取鎖
*/
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// leaseTime到期時間 等于 -1 否,此值決定著是否開啟watchDog機制
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
waitTime,
/*
* getLockWatchdogTimeout() 就是獲取 watchDog 時間,即:
* private long lockWatchdogTimeout = 30 * 1000;
*/
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS,
threadId,
RedisCommands.EVAL_LONG
);
// 上一步ttlRemainingFuture異步執行完時
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 若出現異常了,則說明獲取鎖失敗,直接滾犢子了
if (e != null) {
return;
}
// lock acquired 獲取鎖成功
if (ttlRemaining == null) {
// 過期了,則重置到期時間,進入這個方法瞄一下
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
/**
* 重新續約到期時間
*/
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
/*
* private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
*
* getEntryName() 就是 this.entryName = id + ":" + name
* 而 this.id = commandExecutor.getConnectionManager().getId()
*
* putIfAbsent() key未有value值則進行關聯,相當于:
* if (!map.containsKey(key))
* return map.put(key, value);
* else
* return map.get(key);
*/
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 續訂到期 續約邏輯就在這里面
renewExpiration();
}
}
/**
* 續約
*/
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
/*
* 這里 newTimeout(new TimerTask(), 參數2, 參數3) 指的是:參數2,參數3去描述什么時候去做參數1的事情
* 這里的參數2:internalLockLeaseTime / 3 就是前面的 lockWatchdogTimeout = (30 * 1000) / 3 = 1000ms = 10s
*
* 鎖的失效時間是30s,當10s之后,此時這個timeTask 就觸發了,它就去進行續約,把當前這把鎖續約成30s,
* 如果操作成功,那么此時就會遞歸調用自己,再重新設置一個timeTask(),于是再過10s后又再設置一個timerTask,
* 完成不停的續約
*/
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// renewExpirationAsync() 里面就是一個lua腳本,腳本中使用 pexpire 指令重置失效時間,
// pexpire 此指令是以 毫秒 進行,expire是以 秒 進行
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself 遞歸此方法,從而完成不停的續約
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
Redis持久化
分為兩種:RDB和AOF
Redis的持久化雖然可以保證數據安全,但也會帶來很多額外的開銷,因此持久化請遵循下列建議:
- 用來做緩存的Redis實例盡量不要開啟持久化功能
- 建議關閉RDB持久化功能,使用AOF持久化
- 利用腳本定期在slave節點做RDB,實現數據備份
- 設置合理的rewrite閾值,避免頻繁的bgrewrite
- [不是絕對,依情況而行]配置no-appendfsync-on-rewrite = yes,禁止在rewrite / fork期間做aof,避免因AOF引起的阻塞
部署有關建議:
- Redis實例的物理機要預留足夠內存,應對fork和rewrite
- 單個Redis實例內存上限不要太大,如4G或8G??梢约涌靎ork的速度、減少主從同步、數據遷移壓力
- 不要與CPU密集型應用部署在一起。如:一臺虛擬機中部署了很多應用
- 不要與高硬盤負載應用一起部署。例如:數據庫、消息隊列.........,這些應用會不斷進行磁盤IO
RDB 持久化
RDB全稱Redis Database Backup file(Redis數據備份文件),也被叫做Redis數據快照。里面的內容是二進制。簡單來說就是把內存中的所有數據都記錄到磁盤中。當Redis實例故障重啟后,從磁盤讀取快照文件,恢復數據??煺瘴募Q為RDB文件,默認是保存在當前運行目錄(redis.conf的dir配置)
RDB持久化在四種情況下會執行::
- save命令:save命令會導致主進程執行RDB,這個過程中其它所有命令都會被阻塞。只有在數據遷移時可能用到。執行下面的命令,可以立即執行一次RDB
- bgsave命令:這個命令執行后會開啟獨立進程完成RDB,主進程可以持續處理用戶請求,不受影響。下面的命令可以異步執行RDB
- Redis停機時:Redis停機時會執行一次save命令,實現RDB持久化
- 觸發RDB條件:redis.conf文件中配置的條件滿足時就會觸發RDB,如下列樣式
# 900秒內,如果至少有1個key被修改,則執行bgsave,如果是save "" 則表示禁用RDB
save 900 1
save 300 10
save 60 10000
RDB的其它配置也可以在redis.conf文件中設置:
# 是否壓縮 ,建議不開啟,壓縮也會消耗cpu,磁盤的話不值錢
rdbcompression yes
# RDB文件名稱
dbfilename dump.rdb
# 文件保存的路徑目錄
dir ./
RDB的原理
RDB方式bgsave的基本流程?
- fork主進程得到一個子進程,共享內存空間
- 子進程讀取內存數據并寫入新的RDB文件
- 用新RDB文件替換舊的RDB文件
fork采用的是copy-on-write技術:
- 當主進程執行讀操作時,訪問共享內存;
- 當主進程執行寫操作時,則會拷貝一份數據,執行寫操作。
所以1可以得知:RDB的缺點
- RDB執行間隔時間長,兩次RDB之間寫入數據有丟失的風險
- fork子進程、壓縮、寫出RDB文件都比較耗時
AOF 持久化
AOF全稱為Append Only File(追加文件)。Redis處理的每一個寫命令都會記錄在AOF文件,可以看做是命令日志文件
OF默認是關閉的,需要修改redis.conf配置文件來開啟AOF:
# 是否開啟AOF功能,默認是no
appendonly yes
# AOF文件的名稱
appendfilename "appendonly.aof"
AOF的命令記錄的頻率也可以通過redis.conf文件來配:
# 表示每執行一次寫命令,立即記錄到AOF文件
appendfsync always
# 寫命令執行完先放入AOF緩沖區,然后表示每隔1秒將緩沖區數據寫到AOF文件,是默認方案
appendfsync everysec
# 寫命令執行完先放入AOF緩沖區,由操作系統決定何時將緩沖區內容寫回磁盤
appendfsync no
三種策略對比:
AOF文件重寫
因為是記錄命令,AOF文件會比RDB文件大的多。而且AOF會記錄對同一個key的多次寫操作,但只有最后一次寫操作才有意義。通過執行bgrewriteaof命令,可以讓AOF文件執行重寫功能,用最少的命令達到相同效果。
Redis也會在觸發閾值時自動去重寫AOF文件。閾值也可以在redis.conf中配置:
# AOF文件比上次文件 增長超過多少百分比則觸發重寫
auto-aof-rewrite-percentage 100
# AOF文件體積最小多大以上才觸發重寫
auto-aof-rewrite-min-size 64mb
主從數據同步原理
全量同步
完整流程描述:先說完整流程,然后再說怎么來的
- slave節點請求增量同步
- master節點判斷replid,發現不一致,拒絕增量同步
- master將完整內存數據生成RDB,發送RDB到slave
- slave清空本地數據,加載master的RDB
- master將RDB期間的命令記錄在repl_baklog,并持續將log中的命令發送給slave
- slave執行接收到的命令,保持與master之間的同步
主從第一次建立連接時,會執行全量同步,將master節點的所有數據都拷貝給slave節點,流程:
master如何得知salve是第一次來連接??
有兩個概念,可以作為判斷依據:
- Replication Id:簡稱replid,是數據集的標記,id一致則說明是同一數據集。每一個master都有唯一的replid,slave則會繼承master節點的replid
- offset:偏移量,隨著記錄在repl_baklog中的數據增多而逐漸增大。slave完成同步時也會記錄當前同步的offset。如果slave的offset小于master的offset,說明slave數據落后于master,需要更新
因此slave做數據同步,必須向master聲明自己的replication id 和 offset,master才可以判斷到底需要同步哪些數據
因為slave原本也是一個master,有自己的replid和offset,當第一次變成slave與master建立連接時,發送的replid和offset是自己的replid和offset
master判斷發現slave發送來的replid與自己的不一致,說明這是一個全新的slave,就知道要做全量同步了
master會將自己的replid和offset都發送給這個slave,slave保存這些信息。以后slave的replid就與master一致了。
因此,master判斷一個節點是否是第一次同步的依據,就是看replid是否一致。
如圖:
增量同步
全量同步需要先做RDB,然后將RDB文件通過網絡傳輸個slave,成本太高了。因此除了第一次做全量同步,其它大多數時候slave與master都是做增量同步
增量同步:就是只更新slave與master存在差異的部分數據
這種方式會出現失效的情況:原因就在repl_baklog中
repl_baklog原理
master怎么知道slave與自己的數據差異在哪里呢?
這就要說到全量同步時的repl_baklog文件了。
這個文件是一個固定大小的數組,只不過數組是環形,也就是說角標到達數組末尾后,會再次從0開始讀寫,這樣數組頭部的數據就會被覆蓋。
repl_baklog中會記錄Redis處理過的命令日志及offset,包括master當前的offset,和slave已經拷貝到的offset:
slave與master的offset之間的差異,就是salve需要增量拷貝的數據了。
隨著不斷有數據寫入,master的offset逐漸變大,slave也不斷的拷貝,追趕master的offset:
直到數組被填滿:
此時,如果有新的數據寫入,就會覆蓋數組中的舊數據。不過,舊的數據只要是綠色的,說明是已經被同步到slave的數據,即便被覆蓋了也沒什么影響。因為未同步的僅僅是紅色部分。
但是,如果slave出現網絡阻塞,導致master的offset遠遠超過了slave的offset:
如果master繼續寫入新數據,其offset就會覆蓋舊的數據,直到將slave現在的offset也覆蓋:
棕色框中的紅色部分,就是尚未同步,但是卻已經被覆蓋的數據。此時如果slave恢復,需要同步,卻發現自己的offset都沒有了,無法完成增量同步了。只能做全量同步。
主從同步優化
可以從以下幾個方面來優化Redis主從集群:
- 在master中配置repl-diskless-sync yes啟用無磁盤復制,避免全量同步時的磁盤IO。
- Redis單節點上的內存占用不要太大,減少RDB導致的過多磁盤IO
- 適當提高repl_baklog的大小,發現slave宕機時盡快實現故障恢復,盡可能避免全量同步
- 限制一個master上的slave節點數量,如果實在是太多slave,則可以采用主-從-從鏈式結構,減少master壓力
哨兵集群
Redis提供了哨兵(Sentinel)機制來實現主從集群的自動故障恢復
哨兵的作用
- 監控:Sentinel 會不斷檢查您的master和slave是否按預期工作
- 自動故障恢復:如果master故障,Sentinel會將一個slave提升為master。當故障實例恢復后也以新的master為主
- 通知:Sentinel充當Redis客戶端的服務發現來源,當集群發生故障轉移時,會將最新信息推送給Redis的客戶端
集群監控原理
Sentinel基于心跳機制監測服務狀態,每隔1秒向集群的每個實例發送ping命令:
- 主觀下線:如果某sentinel節點發現某實例未在規定時間響應,則認為該實例主觀下線
- 客觀下線:若超過指定數量(quorum)的sentinel都認為該實例主觀下線,則該實例客觀下線。quorum值最好超過Sentinel實例數量的一半
集群故障恢復原理
一旦發現master故障,sentinel需要在salve中選擇一個作為新的master,選擇依據是這樣的:
- 首先會判斷slave節點與master節點斷開時間長短,如果超過指定值(redis.conf配置的down-after-milliseconds * 10)則會排除該slave節點
- 然后判斷slave節點的slave-priority值,越小優先級越高,如果是0則永不參與選舉
- 若slave-prority一樣,則判斷slave節點的offset值,越大說明數據越新,優先級越高
- 若offset一樣,則最后判斷slave節點的運行id(redis開啟時都會分配一個)大小,越小優先級越高
當選出一個新的master后,該如何實現切換?流程如下:
- sentinel鏈接備選的slave節點,讓其執行 slaveof no one(翻譯:不要當奴隸) 命令,讓該節點成為master
- sentinel給所有其它slave發送
slaveof <newMasterIp> <newMasterPort>命令,讓這些slave成為新master的從節點,開始從新的master上同步數據 - 最后,sentinel將故障節點標記為slave,當故障節點恢復后會自動成為新的master的slave節點
搭建哨兵集群
- 創建目錄
# 進入/tmp目錄
cd /tmp
# 創建目錄
mkdir s1 s2 s3
- 在s1目錄中創建sentinel.conf文件,編輯如下內容
port 27001
sentinel announce-ip 192.168.150.101
sentinel monitor mymaster 192.168.150.101 7001 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
dir "/tmp/s1"
-
port 27001:是當前sentinel實例的端口 -
sentinel monitor mymaster 192.168.150.101 7001 2:指定主節點信息-
mymaster:主節點名稱,自定義,任意寫 -
192.168.150.101 7001:主節點的ip和端口 -
2:選舉master時的quorum值
-
- 將s1/sentinel.conf文件拷貝到s2、s3兩個目錄中(在/tmp目錄執行下列命令):
# 方式一:逐個拷貝
cp s1/sentinel.conf s2
cp s1/sentinel.conf s3
# 方式二:管道組合命令,一鍵拷貝
echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf
- 修改s2、s3兩個文件夾內的配置文件,將端口分別修改為27002、27003:
sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf
- 啟動
# 第1個
redis-sentinel s1/sentinel.conf
# 第2個
redis-sentinel s2/sentinel.conf
# 第3個
redis-sentinel s3/sentinel.conf
RedisTemplate的哨兵模式
- 依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- YAML配置哨兵集群
spring:
redis:
sentinel:
master: mymaster # 前面哨兵集群搭建時的名字
nodes:
- 192.168.150.101:27001 # 這里的ip:port是sentinel哨兵的,而不用關注哨兵監管下的那些節點
- 192.168.150.101:27002
- 192.168.150.101:27003
- 配置讀寫分離
@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}
這個bean中配置的就是讀寫策略,包括四種:
- MASTER:從主節點讀取
- MASTER_PREFERRED:優先從master節點讀取,master不可用才讀取replica
- REPLICA:從slave(replica)節點讀取
- REPLICA _PREFERRED:優先從slave(replica)節點讀取,所有的slave都不可用才讀取master
- 就可以在需要的地方正常使用RedisTemplate了,如前面玩的StringRedisTemplate
分片集群
主從和哨兵可以解決高可用、高并發讀的問題。但是依然有兩個問題沒有解決:
-
海量數據存儲問題
-
高并發寫的問題
分片集群可解決上述問題,分片集群特征:
-
集群中有多個master,每個master保存不同數據
-
每個master都可以有多個slave節點
-
master之間通過ping監測彼此健康狀態
-
客戶端請求可以訪問集群任意節點,最終都會被轉發到正確節點
搭建分片集群
- 創建目錄
# 進入/tmp目錄
cd /tmp
# 創建目錄
mkdir 7001 7002 7003 8001 8002 8003
- 在temp目錄下新建redis.conf文件,編輯內容如下:
port 6379
# 開啟集群功能
cluster-enabled yes
# 集群的配置文件名稱,不需要我們創建,由redis自己維護
cluster-config-file /tmp/6379/nodes.conf
# 節點心跳失敗的超時時間
cluster-node-timeout 5000
# 持久化文件存放目錄
dir /tmp/6379
# 綁定地址
bind 0.0.0.0
# 讓redis后臺運行
daemonize yes
# 注冊的實例ip
replica-announce-ip 192.168.146.100
# 保護模式
protected-mode no
# 數據庫數量
databases 1
# 日志
logfile /tmp/6379/run.log
- 將文件拷貝到每個目錄下
# 進入/tmp目錄
cd /tmp
# 執行拷貝
echo 7001 7002 7003 8001 8002 8003 | xargs -t -n 1 cp redis.conf
- 將每個目錄下的第redis.conf中的6379改為所在目錄一致
# 進入/tmp目錄
cd /tmp
# 修改配置文件
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t sed -i 's/6379/{}/g' {}/redis.conf
- 啟動
# 進入/tmp目錄
cd /tmp
# 一鍵啟動所有服務
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-server {}/redis.conf
# 查看啟動狀態
ps -ef | grep redis
# 關閉所有進程
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-cli -p {} shutdown
- 創建集群:上一步啟動了redis,但這幾個redis之間還未建立聯系,以下命令要求redis版本大于等于5.0
redis-cli \
--cluster create \
--cluster-replicas 1 \
192.168.146.100:7001 \
192.168.146.100:7002 \
192.168.146.100:7003 \
192.168.146.100:8001 \
192.168.146.100:8002 \
192.168.146.100:8003
# 更多集群相關命令
redis-cli --cluster help
create host1:port1 ... hostN:portN
--cluster-replicas <arg>
check <host:port> or <host> <port> - separated by either colon or space
--cluster-search-multiple-owners
info <host:port> or <host> <port> - separated by either colon or space
fix <host:port> or <host> <port> - separated by either colon or space
--cluster-search-multiple-owners
--cluster-fix-with-unreachable-masters
reshard <host:port> or <host> <port> - separated by either colon or space
--cluster-from <arg>
--cluster-to <arg>
--cluster-slots <arg>
--cluster-yes
--cluster-timeout <arg>
--cluster-pipeline <arg>
--cluster-replace
rebalance <host:port> or <host> <port> - separated by either colon or space
--cluster-weight <node1=w1...nodeN=wN>
--cluster-use-empty-masters
--cluster-timeout <arg>
--cluster-simulate
--cluster-pipeline <arg>
--cluster-threshold <arg>
--cluster-replace
add-node new_host:new_port existing_host:existing_port
--cluster-slave
--cluster-master-id <arg>
del-node host:port node_id
call host:port command arg arg .. arg
--cluster-only-masters
--cluster-only-replicas
set-timeout host:port milliseconds
import host:port
--cluster-from <arg>
--cluster-from-user <arg>
--cluster-from-pass <arg>
--cluster-from-askpass
--cluster-copy
--cluster-replace
backup host:port backup_directory
-
redis-cli --cluster或者./redis-trib.rb:代表集群操作命令 -
create:代表是創建集群 -
--replicas 1或者--cluster-replicas 1:指定集群中每個master的副本個數為1,此時節點總數 ÷ (replicas + 1)得到的就是master的數量。因此節點列表中的前n個就是master,其它節點都是slave節點,隨機分配到不同master
- 查看集群狀態
redis-cli -p 7001 cluster nodes
- 命令行鏈接集群redis注意點
# 需要加上 -c 參數,否則進行寫操作時會報錯
redis-cli -c -p 7001
散列插槽
Redis會把每一個master節點映射到0~16383共16384個插槽(hash slot)上,查看集群信息時就能看到
數據key不是與節點綁定,而是與插槽綁定。redis會根據key的有效部分計算插槽值,分兩種情況:
- key中包含"{}",且“{}”中至少包含1個字符,“{}”中的部分是有效部分
- key中不包含“{}”,整個key都是有效部分
好處:節點掛了,但插槽還在,將插槽分配給健康的節點,那數據就恢復了
如:key是num,那么就根據num計算;如果是{name}num,則根據name計算。計算方式是利用CRC16算法得到一個hash值,然后對16384取余,得到的結果就是slot值
如上:在7001存入name,對name做hash運算,之后對16384取余,得到的5798就是name=zixq要存儲的位置,而5798在7002節點中,所以跳入了7002節點;而 set job java 也是同樣的道理
利用上述的原理可以做到:將同一類數據固定地保存在同一個Redis實例/節點(即:這一類數據使用相同的有效部分,如key都以{typeId}為前綴)
分片集群下的故障轉移
分片集群下,雖然沒有哨兵,但是也可以進行故障轉移
-
自動故障轉移:master掛了、選一個slave為主........,和前面玩過的主從一樣
-
手動故障轉移:后續新增的節點(此時是slave),性能比原有的節點(master)性能好,故而將新節點弄為master
# 在新增節點一方執行下列命令,就會讓此新增節點與其master節點身份對調
cluster failover
failover命令可以指定三種模式:
- 缺?。耗J的流程,如下圖1~6歩(推薦使用)
- force:省略了對offset的一致性校驗
- takeover:直接執行第5歩,忽略數據一致性、忽略master狀態和其它master的意見
RedisTemplate訪問分片集群
RedisTemplate底層同樣基于lettuce實現了分片集群的支持、所以和哨兵集群的RedisTemplate一樣,區別就是YAML配置
- 依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- YAML配置分片集群
spring:
redis:
cluster:
nodes:
- 192.168.146.100:7001 # 和哨兵集群的區別:此集群沒有哨兵,這里使用的是分片集群的每個節點的ip:port
- 192.168.146.100:7002
- 192.168.146.100:7003
- 192.168.146.100:8001
- 192.168.146.100:8002
- 192.168.146.100:8003
- 配置讀寫分離
@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}
這個bean中配置的就是讀寫策略,包括四種:
- MASTER:從主節點讀取
- MASTER_PREFERRED:優先從master節點讀取,master不可用才讀取replica
- REPLICA:從slave(replica)節點讀取
- REPLICA _PREFERRED:優先從slave(replica)節點讀取,所有的slave都不可用才讀取master
- 就可以在需要的地方正常使用RedisTemplate了,如前面玩的StringRedisTemplate
集群伴隨的問題
集群雖然具備高可用特性,能實現自動故障恢復,但是如果使用不當,也會存在一些問題
- 集群完整性問題:在Redis的默認配置中,如果發現任意一個插槽不可用,則整個集群都會停止對外服務,即就算有一個slot不能用了,那么集群也會不可用,像什么set、get......命令也用不了了,因此開發中,最重要的是可用性,因此修改 redis.conf文件中的內容
# 集群全覆蓋 默認值是 yes,改為no即可
cluster-require-full-coverage no
- 集群帶寬問題:集群節點之間會不斷的互相Ping來確定集群中其它節點的狀態。每次Ping攜帶的信息至少包括
- 插槽信息
- 集群狀態信息
集群中節點越多,集群狀態信息數據量也越大,10個節點的相關信息可能達到1kb,此時每次集群互通需要的帶寬會非常高,這樣會導致集群中大量的帶寬都會被ping信息所占用,這是一個非??膳碌膯栴},所以我們需要去解決這樣的問題
解決途徑:
- 避免大集群,集群節點數不要太多,最好少于1000,如果業務龐大,則建立多個集群。
- 避免在單個物理機中運行太多Redis實例
- 配置合適的cluster-node-timeout值
-
lua和事務的問題:這兩個都是為了保證原子性,這就要求執行的所有key都落在一個節點上,而集群則會破壞這一點,集群中無法保證lua和事務問題
-
數據傾斜問題:因為hash_tag(散列插槽中說的hash算的slot值)落在一個節點上,就導致大量數據都在一個redis節點中了
-
集群和主從選擇問題:單體Redis(主從Redis+哨兵)已經能達到萬級別的QPS,并且也具備很強的高可用特性。如果主從能滿足業務需求的情況下,若非萬不得已的情況下,盡量不搭建Redis集群
批處理
單機redis批處理:mxxx命令 與 Pipeline
Mxxx雖然可以批處理,但是卻只能操作部分數據類型(String是mset、Hash是hmset、Set是sadd key member.......),因此如果有對復雜數據類型的批處理需要,建議使用Pipeline
注意點:mxxx命令可以保證原子性,一堆命令一起執行;而Pipeline是非原子性的,這是將一堆命令一起發給redis服務器,然后把這些命令放入服務器的一個隊列中,最后慢慢執行而已,因此執行命令不一定是一起執行的
@Test
void testPipeline() {
// 創建管道 Pipeline 此對象基本上可以進行所有的redis操作,如:pipeline.set()、pipeline.hset().....
Pipeline pipeline = jedis.pipelined();
for (int i = 1; i <= 100000; i++) {
// 放入命令到管道
pipeline.set("test:key_" + i, "value_" + i);
if (i % 1000 == 0) {
// 每放入1000條命令,批量執行
pipeline.sync();
}
}
}
集群redis批處理
MSET或Pipeline這樣的批處理需要在一次請求中攜帶多條命令,而此時如果Redis是一個集群,那批處理命令的多個key必須落在一個插槽中,否則就會導致執行失敗。這樣的要求很難實現,因為我們在批處理時,可能一次要插入很多條數據,這些數據很有可能不會都落在相同的節點上,這就會導致報錯了
解決方式:hash_tag就是前面散列插槽中說的算插槽的hash值
在jedis中,對于集群下的批處理并沒有解決,因此一旦使用jedis來操作redis,那么就需要我們自己來實現集群的批處理邏輯,一般選擇串行slot或并行slot即可
而在Spring中是解決了集群下的批處理問題的
@Test
void testMSetInCluster() {
Map<String, String> map = new HashMap<>(4);
map.put("name", "Rose");
map.put("age", "21");
map.put("sex", "Female");
stringRedisTemplate.opsForValue().multiSet(map);
List<String> strings = stringRedisTemplate
.opsForValue()
.multiGet(Arrays.asList("name", "age", "sex"));
strings.forEach(System.out::println);
}
原理:使用jedis操作時,要編寫集群的批處理邏輯可以借鑒
在RedisAdvancedClusterAsyncCommandsImpl 類中
首先根據slotHash算出來一個partitioned的map,map中的key就是slot,而他的value就是對應的對應相同slot的key對應的數據
通過 RedisFuture<String> mset = super.mset(op); 進行異步的消息發送
public class RedisAdvancedClusterAsyncCommandsImpl {
@Override
public RedisFuture<String> mset(Map<K, V> map) {
// 算key的slot值,然后key相同的分在一組
Map<Integer, List<K>> partitioned = SlotHash.partition(codec, map.keySet());
if (partitioned.size() < 2) {
return super.mset(map);
}
Map<Integer, RedisFuture<String>> executions = new HashMap<>();
for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
Map<K, V> op = new HashMap<>();
entry.getValue().forEach(k -> op.put(k, map.get(k)));
// 異步發送:即mset()中的邏輯就是并行slot方式
RedisFuture<String> mset = super.mset(op);
executions.put(entry.getKey(), mset);
}
return MultiNodeExecution.firstOfAsync(executions);
}
}
慢查詢
Redis執行時耗時超過某個閾值的命令,稱為慢查詢
慢查詢的危害:由于Redis是單線程的,所以當客戶端發出指令后,他們都會進入到redis底層的queue來執行,如果此時有一些慢查詢的數據,就會導致大量請求阻塞,從而引起報錯,所以我們需要解決慢查詢問題
慢查詢的閾值可以通過配置指定:
slowlog-log-slower-than:慢查詢閾值,單位是微秒。默認是10000,建議1000
慢查詢會被放入慢查詢日志中,日志的長度有上限,可以通過配置指定:
slowlog-max-len:慢查詢日志(本質是一個隊列)的長度。默認是128,建議1000
- 臨時配置
config set slowlog-log-slower-than 1000 # 臨時配置:慢查詢閾值,重啟redis則失效
config set slowlog-max-len 1000 # 慢查詢日志長度
- 永久配置:在redis.conf文件中添加相應內容即可
# 慢查詢閾值
slowlog-log-slower-than 1000
# 慢查詢日志長度
slowlog-max-len 1000
查看慢查詢
- 命令方式
slowlog len # 查詢慢查詢日志長度
slowlog get [n] # 讀取n條慢查詢日志
slowlog reset # 清空慢查詢列表
- 客戶端工具:不同客戶端操作不一樣
內存配置
當Redis內存不足時,可能導致Key頻繁被刪除、響應時間變長、QPS不穩定等問題。當內存使用率達到90%以上時就需要我們警惕,并快速定位到內存占用的原因
redis中的內存劃分:
| 內存占用 | 說明 | 備注 |
|---|---|---|
| 數據內存 | 是Redis最主要的部分,存儲Redis的鍵值信息。主要問題是BigKey問題、內存碎片問題 | 內存碎片問題:Redis底層分配并不是這個key有多大,他就會分配多大,而是有他自己的分配策略,比如8,16,20等等,假定當前key只需要10個字節,此時分配8肯定不夠,那么他就會分配16個字節,多出來的6個字節就不能被使用,這就是我們常說的 碎片問題。這種一般重啟redis就解決了 |
| 進程內存 | Redis主進程本身運?肯定需要占?內存,如代碼、常量池等等;這部分內存?約?兆,在?多數?產環境中與Redis數據占?的內存相?可以忽略 | 這部分內存一般都可以忽略不計 |
| 緩沖區內存 | 一般包括客戶端緩沖區、AOF緩沖區、復制緩沖區等??蛻舳司彌_區又包括輸入緩沖區和輸出緩沖區兩種。這部分內存占用波動較大,不當使用BigKey,可能導致內存溢出 | 一般包括客戶端緩沖區、AOF緩沖區、復制緩沖區等。客戶端緩沖區又包括輸入緩沖區和輸出緩沖區兩種。這部分內存占用波動較大,所以這片內存也是需要重點分析的內存問題 |
查看內存情況
- 查看內存分配的情況
# 要查看info自己看哪些東西,直接輸入 info 回車即可看到
info memory
# 示例
127.0.0.1:7001> INFO memory
# Memory
used_memory:2353272
used_memory_human:2.24M
used_memory_rss:9281536
used_memory_rss_human:8.85M
used_memory_peak:2508864
used_memory_peak_human:2.39M
used_memory_peak_perc:93.80%
used_memory_overhead:1775724
used_memory_startup:1576432
used_memory_dataset:577548
used_memory_dataset_perc:74.35%
allocator_allocated:2432096
allocator_active:2854912
allocator_resident:5439488
total_system_memory:1907740672
total_system_memory_human:1.78G
used_memory_lua:31744
used_memory_vm_eval:31744
used_memory_lua_human:31.00K
used_memory_scripts_eval:0
number_of_cached_scripts:0
number_of_functions:0
number_of_libraries:0
used_memory_vm_functions:32768
used_memory_vm_total:64512
used_memory_vm_total_human:63.00K
used_memory_functions:184
used_memory_scripts:184
used_memory_scripts_human:184B
maxmemory:0
maxmemory_human:0B
maxmemory_policy:noeviction
allocator_frag_ratio:1.17
allocator_frag_bytes:422816
allocator_rss_ratio:1.91
allocator_rss_bytes:2584576
rss_overhead_ratio:1.71
rss_overhead_bytes:3842048
mem_fragmentation_ratio:3.95
mem_fragmentation_bytes:6930528
mem_not_counted_for_evict:0
mem_replication_backlog:184540
mem_total_replication_buffers:184536
mem_clients_slaves:0
mem_clients_normal:3600
mem_cluster_links:10880
mem_aof_buffer:0
mem_allocator:jemalloc-5.2.1
active_defrag_running:0
lazyfree_pending_objects:0
lazyfreed_objects:0
- 查看key的主要占用情況
memory xxx
# 用法查詢
127.0.0.1:7001> help MEMORY
MEMORY
summary: A container for memory diagnostics commands
since: 4.0.0
group: server
MEMORY DOCTOR
summary: Outputs memory problems report
since: 4.0.0
group: server
MEMORY HELP
summary: Show helpful text about the different subcommands
since: 4.0.0
group: server
MEMORY MALLOC-STATS
summary: Show allocator internal stats
since: 4.0.0
group: server
MEMORY PURGE
summary: Ask the allocator to release memory
since: 4.0.0
group: server
MEMORY STATS
summary: Show memory usage details
since: 4.0.0
group: server
MEMORY USAGE key [SAMPLES count]
summary: Estimate the memory usage of a key
since: 4.0.0
group: server
MEMORY STATS查詢結果解讀
127.0.0.1:7001> MEMORY STATS
1) "peak.allocated" # redis進程自啟動以來消耗內存的峰值
2) (integer) 2508864
3) "total.allocated" # redis使用其分配器分配的總字節數,即當前的總內存使用量
4) (integer) 2353152
5) "startup.allocated" # redis啟動時消耗的初始內存量,即redis啟動時申請的內存大小
6) (integer) 1576432
7) "replication.backlog" # 復制積壓緩存區的大小
8) (integer) 184540
9) "clients.slaves" # 主從復制中所有從節點的讀寫緩沖區大小
10) (integer) 0
11) "clients.normal" # 除從節點外,所有其他客戶端的讀寫緩沖區大小
12) (integer) 3600
13) "cluster.links" #
14) (integer) 10880
15) "aof.buffer" # AOF持久化使用的緩存和AOF重寫時產生的緩存
16) (integer) 0
17) "lua.caches" #
18) (integer) 0
19) "functions.caches" #
20) (integer) 184
21) "db.0" # 業務數據庫的數量
22) 1) "overhead.hashtable.main" # 當前數據庫的hash鏈表開銷內存總和,即元數據內存
2) (integer) 72
3) "overhead.hashtable.expires" # 用于存儲key的過期時間所消耗的內存
4) (integer) 0
5) "overhead.hashtable.slot-to-keys" #
6) (integer) 16
23) "overhead.total" # 數值 = startup.allocated + replication.backlog + clients.slaves + clients.normal + aof.buffer + db.X
24) (integer) 1775724
25) "keys.count" # 當前redis實例的key總數
26) (integer) 1
27) "keys.bytes-per-key" # 當前redis實例每個key的平均大小。計算公式:(total.allocated - startup.allocated) / keys.count
28) (integer) 776720
29) "dataset.bytes" # 純業務數據占用的內存大小
30) (integer) 577428
31) "dataset.percentage" # 純業務數據占用的第內存比例。計算公式:dataset.bytes * 100 / (total.allocated - startup.allocated)
32) "74.341850280761719"
33) "peak.percentage" # 當前總內存與歷史峰值的比例。計算公式:total.allocated * 100 / peak.allocated
34) "93.793525695800781"
35) "allocator.allocated" #
36) (integer) 2459024
37) "allocator.active" #
38) (integer) 2891776
39) "allocator.resident" #
40) (integer) 5476352
41) "allocator-fragmentation.ratio" #
42) "1.1759852170944214"
43) "allocator-fragmentation.bytes" #
44) (integer) 432752
45) "allocator-rss.ratio" #
46) "1.8937677145004272"
47) "allocator-rss.bytes" #
48) (integer) 2584576
49) "rss-overhead.ratio" #
50) "1.6851159334182739"
51) "rss-overhead.bytes" #
52) (integer) 3751936
53) "fragmentation" # 內存的碎片率
54) "3.9458441734313965"
55) "fragmentation.bytes" # 內存碎片所占字節大小
56) (integer) 6889552
- 客戶端鏈接工具查看:不同redis客戶端鏈接工具不一樣,操作不一樣
解決內存問題
由前面鋪墊得知:內存緩沖區常見的有三種
- 復制緩沖區:主從復制的repl_backlog_buf,如果太小可能導致頻繁的全量復制,影響性能。通過replbacklog-size來設置,默認1mb
- AOF緩沖區:AOF刷盤之前的緩存區域,AOF執行rewrite的緩沖區。無法設置容量上限
- 客戶端緩沖區:分為輸入緩沖區和輸出緩沖區,輸入緩沖區最大1G且不能設置。輸出緩沖區可以設置
其他的基本上可以忽略,最關鍵的其實是客戶端緩沖區的問題:
客戶端緩沖區:指的就是我們發送命令時,客戶端用來緩存命令的一個緩沖區,也就是我們向redis輸入數據的輸入端緩沖區和redis向客戶端返回數據的響應緩存區
輸入緩沖區最大1G且不能設置,所以這一塊我們根本不用擔心,如果超過了這個空間,redis會直接斷開,因為本來此時此刻就代表著redis處理不過來了,我們需要擔心的就是輸出端緩沖區
我們在使用redis過程中,處理大量的big value,那么會導致我們的輸出結果過多,如果輸出緩存區過大,會導致redis直接斷開,而默認配置的情況下, 其實它是沒有大小的,這就比較坑了,內存可能一下子被占滿,會直接導致咱們的redis斷開,所以解決方案有兩個
- 設置一個大小
- 增加我們帶寬的大小,避免我們出現大量數據從而直接超過了redis的承受能力
原理篇
涉及的Redis源碼采用的版本為redis-6.2.6
Redis數據結構:SDS
Redis是C語言實現的,但C語言本身的字符串有缺陷,因此Redis就自己弄了一個字符串SDS(Simple Dynamic String 簡單動態字符串)
C語言本身的字符串缺陷如下:
- 二級制不安全
C 語?的字符串其實就是?個字符數組,即數組中每個元素是字符串中的?個字符
奇為什么最后?個字符是“\0”?
在 C 語??,對字符串操作時,char * 指針只是指向字符數組的起始位置,?字符數組的結尾位置就?“\0”表示,意思是指字符串的結束
所以,C 語?標準庫中的字符串操作函數就通過判斷字符是不是 “\0” 來決定要不要停?操作,如果當前字符不是 “\0” ,說明字符串還沒結束,可以繼續操作,如果當前字符是 “\0” 是則說明字符串結束了,就要停?操作
因此,C 語?字符串? “\0” 字符作為結尾標記有個缺陷。假設有個字符串中有個 “\0” 字符,這時在操作這個字符串時就會提早結束
故C語言字符串缺陷:字符串??不能含有 “\0” 字符,否則最先被程序讀?的 “\0” 字符將被誤認為是字符串結尾,這個限制使得 C 語?的字符串只能保存?本數據,不能保存像圖?、?頻、視頻?化這樣的?進制數據;同時C 語?獲取字符串?度的時間復雜度是 O(N)
- 字符串操作函數不?效且不安全(可能導致發?緩沖區溢出)
舉個例?,strcat 函數是可以將兩個字符串拼接在?起
// 將 src 字符串拼接到 dest 字符串后?
char *strcat(char *dest, const char* src);
strcat 函數和 strlen 函數類似,時間復雜度也很?,也都需要先通過遍歷字符串才能得到?標字符串的末尾。對于 strcat 函數來說,還要再遍歷源字符串才能完成追加,對字符串的操作效率不?
C 語?的字符串是不會記錄?身的緩沖區??的,所以 strcat 函數假定程序員在執?這個函數時,已經為dest 分配了?夠多的內存,可以容納 src 字符串中的所有內容,??旦這個假定不成?,就會發?緩沖區溢出將可能會造成程序運?終?
SDS 結構體
Redis源碼:官網下載redis x.x.x.tar.gz,然后解壓,src目錄下即為源碼所在
Redis中SDS是一個結構體,源碼如下:
-
len,記錄了字符串?度。這樣獲取字符串?度的時候,只需要返回這個成員變量值就?,時間復雜度只需要 O(1)
-
alloc,分配給字符數組的空間?度。這樣在修改字符串的時候,可以通過 alloc - len 計算出剩余的空間??,可以?來判斷空間是否滿?修改需求,如果不滿?的話,就會?動將 SDS 的空間擴展?執?修改所需的??,然后才執?實際的修改操作,所以使? SDS 既不需要?動修改 SDS 的空間??,也不會出現前?所說的緩沖區溢出的問題。
-
flags,?來表示不同類型的 SDS。?共設計了 5 種類型,分別是 sdshdr5、sdshdr8、sdshdr16、sdshdr32 和 sdshdr64
-
buf[],字符數組,?來保存實際數據。不僅可以保存字符串,也可以保存?進制數據
- 解決C語言原來字符串效率不高問題
Redis 的 SDS 結構因為加?了 len 成員變量,那么獲取字符串?度的時候,直接返回這個成員變量的值就?,所以復雜度只有 O(1)
- 解決二進制不安全問題
SDS 不需要? “\0” 字符來標識字符串結尾了,?是有個專?的 len 成員變量來記錄?度,所以可存儲包含 “\0” 的數據。但是 SDS 為了兼容部分 C 語?標準庫的函數, SDS 字符串結尾還是會加上 “\0” 字符
因此, SDS 的 API 都是以處理?進制的?式來處理 SDS 存放在 buf[] ?的數據,程序不會對其中的數據做任何限制,數據寫?的時候時什么樣的,它被讀取時就是什么樣的
通過使??進制安全的 SDS,?不是 C 字符串,使得 Redis 不僅可以保存?本數據,也可以保存任意格式的?進制數據
- 解決緩沖區可能溢出問題
Redis 的 SDS 結構?引?了 alloc 和 len 成員變量,這樣 SDS API 通過 alloc - len 計算,可以算出剩余可?的空間??,這樣在對字符串做修改操作的時候,就可以由程序內部判斷緩沖區??是否?夠?
?且,當判斷出緩沖區??不夠?時,Redis 會?動將擴? SDS 的空間??,擴容方式如下:
- 若新字符串小于1M,則新空間為擴展后字符串長度的兩倍 +1
PS:+ 1的原因是“\0”字符
- 若新字符串大于1M,則新空間為擴展后字符串長度 +1M +1。稱為內存預分配
PS:內存預分配好處,下次在操作 SDS 時,如果 SDS 空間夠的話,API 就會直接使?「未使?空間」,??須執?內存分配,有效的減少內存分配次數
- 節省內存空間
SDS 結構中有個 flags 成員變量,表示的是 SDS 類型
Redos ?共設計了 5 種類型,分別是 sdshdr5、sdshdr8、sdshdr16、sdshdr32 和 sdshdr64
這 5 種類型的主要區別就在于,它們數據結構中的 len 和 alloc 成員變量的數據類型不同
如 sdshdr16 和 sdshdr32 這兩個類型,它們的定義分別如下:
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len;
uint16_t alloc;
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len;
uint32_t alloc;
unsigned char flags;
char buf[];
};
可以看到:
-
sdshdr16 類型的 len 和 alloc 的數據類型都是 uint16_t,表示字符數組?度和分配空間??不能超過2 的 16 次?
-
sdshdr32 則都是 uint32_t,表示表示字符數組?度和分配空間??不能超過 2 的 32 次?
之所以 SDS 設計不同類型的結構體,是為了能靈活保存不同??的字符串,從?有效節省內存空間。如,在保存?字符串時,結構頭占?空間也?較少
除了設計不同類型的結構體,Redis 在編程上還使?了專?的編譯優化來節省內存空間,即在 struct 聲明了 __attribute__ ((packed)) ,它的作?是:告訴編譯器取消結構體在編譯過程中的優化對?,按照實際占?字節數進?對?
?如,sdshdr16 類型的 SDS,默認情況下,編譯器會按照 16 字節對?的?式給變量分配內存,這意味著,即使?個變量的??不到 16 個字節,編譯器也會給它分配 16 個字節
舉個例?,假設下?這個結構體,它有兩個成員變量,類型分別是 char 和 int,如下所示:
#include <stdio.h>
struct test1 {
char a;
int b;
} test1;
int main() {
printf("%lu\n", sizeof(test1));
return 0;
}
默認情況下,這個結構體??計算出來就會是 8
這是因為默認情況下,編譯器是使?「字節對?」的?式分配內存,雖然 char 類型只占?個字節,但是由于成員變量?有 int 類型,它占?了 4 個字節,所以在成員變量為 char 類型分配內存時,會分配 4 個字節,其中這多余的 3 個字節是為了字節對??分配的,相當于有 3 個字節被浪費掉了。
如果不想編譯器使?字節對?的?式進?分配內存,可以采?了 __attribute__ ((packed)) 屬性定義結構體,這樣?來,結構體實際占?多少內存空間,編譯器就分配多少空間
Redis數據結構:intset
IntSet是Redis中set集合的一種實現方式,基于整數數組來實現,并且如下特征:
- Redis會確保Intset中的元素唯一、有序
- 具備類型升級機制,可以節省內存空間
- 底層采用二分查找方式來查詢
Redis中intset的結構體源碼如下:
其中的encoding包含三種模式,表示存儲的整數大小不同:
為了方便查找,Redis會將intset中所有的整數按照升序依次保存在contents數組中,結構如圖:
現在,數組中每個數字都在int16_t的范圍內,因此采用的編碼方式是INTSET_ENC_INT16,每部分占用的字節大小為:
- encoding:4字節
- length:4字節
- contents:2字節 * 3 = 6字節
假如,現在向其中添加一個數字:50000,這個數字超出了int16_t的范圍,intset會自動升級編碼方式到合適的大小
以當前案例來說流程如下:
- 升級編碼為INTSET_ENC_INT32, 每個整數占4字節,并按照新的編碼方式及元素個數擴容數組
- 倒序依次將數組中的元素拷貝到擴容后的正確位置。PS:倒序保證數據不亂
- 將待添加的元素放入數組末尾
- 最后,將inset的encoding屬性改為INTSET_ENC_INT32,將length屬性改為4
上述邏輯的源碼如下:
- 問題:intset支持降級操作嗎?
不?持降級操作,?旦對數組進?了升級,就會?直保持升級后的狀態。如:前面已經從INTSET_ENC_INT16(2字節整數)升級到INTSET_ENC_INT32(4字節整數),就算刪除50000元素,intset集合的類型也還是INTSET_ENC_INT32類型,不會降級為INTSET_ENC_INT16類型
Redis數據結構:Dict
Redis是一個鍵值型(Key-Value Pair)的數據庫,我們可以根據鍵實現快速的增刪改查。而鍵與值的映射關系正是通過Dict來實現的。
Dict由三部分組成,分別是:哈希表(DictHashTable)、哈希節點(DictEntry)、字典(Dict)
哈希表(Dictht)與哈希節點(DictEntry)
- 哈希節點(DictEntry)
-
結構?不僅包含指向鍵和值的指針,還包含了指向下?個哈希表節點的指針,這個指針可以將多個哈希值相同的鍵值對鏈接起來,以此來解決哈希沖突的問題,這就是鏈式哈希
-
dictEntry 結構?鍵值對中的值是?個「聯合體 v」定義的,因此,鍵值對中的值可以是?個指向實際值的指針,或者是?個?符號的 64 位整數或有符號的 64 位整數或double 類的值。這么做的好處是可以節省內存空間,因為當「值」是整數或浮點數時,就可以將值的數據內嵌在 dictEntry
結構?,?需再??個指針指向實際的值,從?節省了內存空間
- 哈希表(Dictht)
其中:
- 哈希表大小size初始值為4,而且其值總等于2^n
- 為什么sizemask = size -1?
因為size總等于2^n,所以size -1就為奇數,這樣”key利用hash函數得到的哈希值h & sizemask”做與運算就正好得到的是size的低位,而這個低位值也正好和“哈希值 % 哈希表??”取模一樣
當我們向Dict添加鍵值對時,Redis首先根據key計算出hash值(h),然后利用 h & sizemask 來計算元素應該存儲到數組中的哪個索引位置
假如存儲k1=v1,假設k1的哈希值h =1,則1&3 =1,因此k1=v1要存儲到數組角標1位置
假如此時又添加了一個k2=v2,那么就會添加到鏈表的隊首
哈希沖突解決方式:鏈式哈希
就是每個哈希表節點都有?個 next 指針,?于指向下?個哈希表節點,因此多個哈希表節點可以? next 指針構成?個單項鏈表,被分配到同?個哈希桶上的多個節點可以?這個單項鏈表連接起來
不過,鏈式哈希局限性也很明顯,隨著鏈表?度的增加,在查詢這?位置上的數據的耗時就會增加,畢竟鏈表的查詢的時間復雜度是 O(n),需要解決就得擴容
Dict的擴容與收縮
擴容
Dict中的HashTable就是數組結合單向鏈表的實現,當集合中元素較多時,必然導致哈希沖突增多,鏈表過長,則查詢效率會大大降低
Dict在每次新增鍵值對時都會檢查負載因子(LoadFactor = used/size,即負載因子=哈希表已保存節點數量 / 哈希表大?。?/strong> ,滿足以下兩種情況時會觸發哈希表擴容:
- 哈希表的 LoadFactor >= 1;并且服務器沒有執行 bgsave或者 bgrewiteaof 等后臺進程。也就是沒有執? RDB 快照或沒有進? AOF 重寫的時候
- 哈希表的 LoadFactor > 5 ;此時說明哈希沖突?常嚴重了,不管有沒有有在執? RDB 快照或 AOF重寫都會強制執行哈希擴容
源碼邏輯如下:
收縮
Dict除了擴容以外,每次刪除元素時,也會對負載因子做檢查,當LoadFactor < 0.1 時,會做哈希表收縮
字典(Dict)
Redis 定義?個 dict 結構體,這個結構體?定義了兩個哈希表(dictht ht[2])
在正常服務請求階段,插?的數據,都會寫?到「哈希表 1」,此時的「哈希表 2 」 并沒有被分配空間(這個哈希表涉及到漸進式rehash)
Dict的漸進式rehash
rehash基本流程
不管是擴容還是收縮,必定會創建新的哈希表,導致哈希表的size和sizemask(sizemask = size -1)變化,而key的查詢與sizemask有關(key通過hash函數計算得到哈希值h,數據存儲的角標值 = h & sizemask)。因此必須對哈希表中的每一個key重新計算索引,插入新的哈希表,這個過程稱為rehash
過程如下:
- 計算新hash表的realeSize,值取決于當前要做的是擴容還是收縮:
- 如果是擴容,則新size為第一個大于等于dict.ht[0].used + 1的2^n
- 如果是收縮,則新size為第一個大于等于dict.ht[0].used的2^n (不得小于4)
- 按照新的realeSize申請內存空間,創建dictht,并賦值給dict.ht[1]
- 設置dict.rehashidx = 0,標示開始rehash
- 將dict.ht[0]中的每一個dictEntry都rehash到dict.ht[1]
- 將dict.ht[1]賦值給dict.ht[0],給dict.ht[1]初始化為空哈希表,釋放原來的dict.ht[0]的內存
以上過程對于小數據影響小,但是對于大數據來說就有問題了,如果「哈希表 1 」的數據量?常?,那么在遷移?「哈希表 2 」的時候,因為會涉及?量的數據拷?,此時可能會對 Redis 造成阻塞,?法服務其他請求,因此就需要漸進式rehash
漸進式rehash
為了避免 rehash 在數據遷移過程中,因拷?數據的耗時,影響 Redis 性能的情況,所以 Redis 采?了漸進式 rehash,也就是將數據的遷移的?作不再是?次性遷移完成,?是分多次遷移
Dict的rehash并不是一次性完成的。試想一下,如果Dict中包含數百萬的entry,要在一次rehash完成,極有可能導致主線程阻塞。所以Dict的rehash是分多次、漸進式的完成,因此稱為漸進式rehash。過程如下:
- 計算新hash表的realeSize,值取決于當前要做的是擴容還是收縮:
- 如果是擴容,則新size為第一個大于等于dict.ht[0].used + 1的2^n
- 如果是收縮,則新size為第一個大于等于dict.ht[0].used的2^n (不得小于4)
-
按照新的realeSize申請內存空間,創建dictht,并賦值給dict.ht[1]
-
設置dict.rehashidx = 0,標示開始rehash
-
將dict.ht[0]中的每一個dictEntry都rehash到dict.ht[1] -
每次執行新增、刪除、查詢、修改操作時,除了執行對應操作之外,還會都檢查一下dict.rehashidx是否大于 -1;若是則按順序將dict.ht[0].table[rehashidx]的entry鏈表rehash到dict.ht[1],并且將rehashidx++,直至dict.ht[0]的所有資源都rehash到dict.ht[1]
PS:隨著處理客戶端發起的哈希表操作請求數量越多,最終在某個時間點,會把「哈希表 1 」的所有key-value 遷移到「哈希表 2」
-
將dict.ht[1]賦值給dict.ht[0],給dict.ht[1]初始化為空哈希表,釋放原來的dict.ht[0]的內存
-
將rehashidx賦值為-1,代表rehash結束
-
在rehash過程中,新增操作,則直接寫入ht[1],查詢、修改和刪除則會在dict.ht[0]和dict.ht[1]依次查找并執行。這樣可以確保ht[0]的數據只減不增,隨著rehash最終為空
上述流程動畫圖如下:
Redis數據結構:ZipList
壓縮列表的最?特點,就是它被設計成?種內存緊湊型的數據結構,占??塊連續的內存空間,不僅可以利? CPU 緩存,?且會針對不同?度的數據,進?相應編碼,這種?法可以有效地節省內存開銷
但是,壓縮列表的缺陷也是有的:
- 不能保存過多的元素,否則查詢效率就會降低;
- 新增或修改某個元素時,壓縮列表占?的內存空間需要重新分配,甚?可能引發連鎖更新的問題
壓縮列表是 Redis 為了節約內存?開發的,它是由連續內存塊組成的順序型數據結構,有點類似于數組:
| 屬性 | 類型 | 長度 | 用途 |
|---|---|---|---|
| zlbytes | uint32_t | 4 字節 | 記錄整個壓縮列表占?對內存字節數 |
| zltail | uint32_t | 4 字節 | 記錄壓縮列表尾節點距離壓縮列表的起始地址有多少字節,通過這個偏移量,可以確定表尾節點的地址 |
| zllen | uint16_t | 2 字節 | 記錄了壓縮列表包含的節點數量。 最大值為UINT16_MAX (65534),如果超過這個值,此處會記錄為65535,但節點的真實數量需要遍歷整個壓縮列表才能計算得出。 |
| entry | 列表節點 | 不定 | 壓縮列表包含的各個節點,節點的長度由節點保存的內容決定。 |
| zlend | uint8_t | 1 字節 | 特殊值 0xFF (十進制 255 ),用于標記壓縮列表的末端。 |
在壓縮列表中,如果我們要查找定位第?個元素和最后?個元素,可以通過表頭三個字段的?度直接定位,復雜度是 O(1)。?查找其他元素時,就沒有這么?效了,只能逐個查找,此時的復雜度就是 O(N)了,因此壓縮列表不適合保存過多的元素
ZipList的Entry結構
ZipList 中的Entry并不像普通鏈表那樣記錄前后節點的指針,因為記錄兩個指針要占用16個字節,浪費內存。而是采用了下面的結構:
-
previous_entry_length:前一節點的長度,占1個或5個字節。PS:這個點涉及到“連鎖更新”問題
- 如果前一節點的長度小于254字節,則采用1個字節來保存這個長度值
- 如果前一節點的長度大于254字節,則采用5個字節來保存這個長度值,第一個字節為0xfe,后四個字節才是真實長度數據
-
encoding:編碼屬性,記錄content的數據類型(字符串還是整數)以及長度,占用1個、2個或5個字節
-
contents:負責保存節點的數據,可以是字符串或整數
ZipList中所有存儲長度的數值均采用小端字節序,即低位字節在前,高位字節在后。例如:數值0x1234,采用小端字節序后實際存儲值為:0x3412
PS:人的閱讀習慣是從左到右,即大端字節序,機器讀取數據是反著的,所以采用小端字節序,從而先處理低位,再處理高位
ZipList的Entry中的encoding編碼
當我們往壓縮列表中插?數據時,壓縮列表就會根據數據是字符串還是整數,以及數據的??,會使?不同空間??的 previous_entry_length和 encoding 這兩個元素保存的信息
previous_entry_length的規則上一節中已經提到了,接下來看看encoding
encoding 屬性的空間??跟數據是字符串還是整數,以及字符串的?度有關:
-
如果當前節點的數據是整數,則 encoding 會使? 1 字節的空間進?編碼。
-
如果當前節點的數據是字符串,根據字符串的?度??,encoding 會使? 1 字節/2字節/5字節的空間進?編碼
- 當前節點的數據是整數
如果encoding是以“11”開始,則證明content是整數,且encoding固定只占用1個字節
| 編碼 | 編碼長度 | 整數類型 |
|---|---|---|
| 11000000 | 1 | int16_t(2 bytes) |
| 11010000 | 1 | int32_t(4 bytes) |
| 11100000 | 1 | int64_t(8 bytes) |
| 11110000 | 1 | 24位有符整數(3 bytes) |
| 11111110 | 1 | 8位有符整數(1 bytes) |
| 1111xxxx | 1 | 直接在xxxx位置保存數值,范圍從0001~1101,減1后結果為實際值 |
- 當前節點的數據是字符串
如果encoding是以“00”、“01”或者“10”開頭(即整數是11開頭,排除這種情況剩下的就是字符串),則證明content是字符串
| 編碼 | 編碼長度 | 字符串大小 |
|---|---|---|
| |00pppppp| | 1 bytes | <= 63 bytes |
| |01pppppp|qqqqqqqq| | 2 bytes | <= 16383 bytes |
| |10000000|qqqqqqqq|rrrrrrrr|ssssssss|tttttttt| | 5 bytes | <= 4294967295 bytes |
如,要保存字符串:“ab”和 “bc”
ZipList的連鎖更新問題
壓縮列表節點的 previous_entry_length屬性會根據前?個節點的?度進?不同的空間??分配:
- 如前?個節點的?度?于 254 字節,那么 previous_entry_length屬性需要? 1 字節的空間來保存這個?度值;
- 如果前?個節點的?度?于等于 254 字節,那么 previous_entry_length屬性需要? 5 字節的空間來保存這個?度值
現在假設?個壓縮列表中有多個連續的、?度在 250~253 之間的節點,如下圖:
因為這些節點?度值?于 254 字節,所以 previous_entry_length屬性需要? 1 字節的空間來保存這個?度值
這時,如果將?個?度?于等于 254 字節的新節點加?到壓縮列表的表頭節點,即新節點將成為 e1 的前置節點,如下圖:
因為 e1 節點的 previous_entry_length屬性只有 1 個字節??,?法保存新節點的?度,此時就需要對壓縮列表的空間重分配,并將 e1 節點的 previous_entry_length屬性從原來的 1 字節??擴展為 5 字節??,那么多?諾牌的效應就此開始:
e1 原本的?度在 250~253 之間,因為剛才的擴展空間,此時 e1 的?度就?于等于 254 了,因此原本 e2保存 e1 的 previous_entry_length屬性也必須從 1 字節擴展? 5 字節??
正如擴展 e1 引發了對 e2 擴展?樣,擴展 e2 也會引發對 e3 的擴展,?擴展 e3 ?會引發對 e4 的擴展................?直持續到結尾
因此:ZipList這種特殊情況下產生的連續多次空間擴展操作稱之為“連鎖更新(Cascade Update)”。新增、刪除都可能導致連鎖更新的發生。就像多?諾牌的效應?樣,第?張牌倒下了,推動了第?張牌倒下;第?張牌倒下,?推動了第三張牌倒下....
所以壓縮列表(ZipList)就有了缺陷:如果保存的元素數量增加了,或是元素變?了,會導致內存重新分配,最糟糕的是會有「連鎖更新」的問題
因此也就可以得出結論:壓縮列表只會?于保存的節點數量不多的場景,只要節點數量?夠?,即使發?連鎖更新,也是能接受的
當然,Redis 針對壓縮列表在設計上的不?,在后來的版本中,新增設計了兩種數據結構:quicklist(Redis 3.2 引?) 和 listpack(Redis 5.0 引?)。這兩種數據結構的設計?標,就是盡可能地保持壓縮列表節省內存的優勢,同時解決壓縮列表的「連鎖更新」的問題
Redis數據結構:QuickList
前面講到雖然壓縮列表是通過緊湊型的內存布局節省了內存開銷,但是因為它的結構設計,如果保存的元素數量增加,或者元素變?了,壓縮列表會有「連鎖更新」的?險,?旦發?,會造成性能下降
QuickList解決辦法:通過控制每個鏈表節點中的壓縮列表的??或者元素個數,來規避連鎖更新的問題。因為壓縮列表元素越少或越?,連鎖更新帶來的影響就越?,從?提供了更好的訪問性能
問題1:ZipList雖然節省內存,但申請內存必須是連續空間,如果內存占用較多,申請內存效率很低。怎么辦?
? 答:為了緩解這個問題,我們必須限制ZipList的長度和entry大小。
問題2:但是我們要存儲大量數據,超出了ZipList最佳的上限該怎么辦?
? 答:我們可以創建多個ZipList來分片存儲數據。
問題3:數據拆分后比較分散,不方便管理和查找,這多個ZipList如何建立聯系?
? 答:Redis在3.2版本引入了新的數據結構QuickList,它是一個雙端鏈表,只不過鏈表中的每個節點都是一個ZipList。
為了避免QuickList中的每個ZipList中entry過多,Redis提供了一個配置項:list-max-ziplist-size 來限制
- 如果值為正,則代表ZipList的允許的entry個數的最大值
- 如果值為負,則代表ZipList的最大內存大小,分5種情況:
| 值 | 含義 |
|---|---|
| -1 | 每個ZipList的內存占用不能超過4kb |
| -2 | 每個ZipList的內存占用不能超過8kb |
| -3 | 每個ZipList的內存占用不能超過16kb |
| -4 | 每個ZipList的內存占用不能超過32kb |
| -5 | 每個ZipList的內存占用不能超過64kb |
其默認值為 -2:
QuickList的和QuickListNode的結構源碼:
方便理解,用個流程圖來描述當前的這個結構:
Redis數據結構:SkipList
SkipList(鏈表)在查找元素的時候,因為需要逐?查找,所以查詢效率?常低,時間復雜度是O(N),于是就出現了跳表。跳表是在鏈表基礎上改進過來的,實現了?種「多層」的有序鏈表,這樣的好處是能快讀定位數據
跳表與傳統鏈表相比有幾點差異:
- 元素按照升序排列存儲
- 節點可能包含多個指針,指針跨度不同
SkipList的源碼與圖形化示意如下:
跳表是?個帶有層級關系的鏈表,?且每?層級可以包含多個節點,每?個節點通過指針連接起來,實現這?特性就是靠跳表節點結構體中的zskiplistLevel 結構體類型的 level[] 數組
level 數組中的每?個元素代表跳表的?層,也就是由 zskiplistLevel 結構體表示,?如 leve[0] 就表示第?層,leve[1] 就表示第?層。zskiplistLevel 結構體?定義了「指向下?個跳表節點的指針」和「跨度」,跨度時?來記錄兩個節點之間的距離
跳表節點查詢過程
查找?個跳表節點的過程時,跳表會從頭節點的最?層開始,逐?遍歷每?層。在遍歷某?層的跳表節點時,會?跳表節點中的 SDS 類型的元素和元素的權重來進?判斷,共有兩個判斷條件:
-
如果當前節點的權重「?于」要查找的權重時,跳表就會訪問該層上的下?個節點。
-
如果當前節點的權重「等于」要查找的權重時,并且當前節點的 SDS 類型數據「?于」要查找的數據時,跳表就會訪問該層上的下?個節點
-
如果上?兩個條件都不滿?,或者下?個節點為空時,跳表就會使??前遍歷到的節點的 level 數組?的下?層指針,然后沿著下?層指針繼續查找,這就相當于跳到了下?層接著查找
如下圖有個 3 層級的跳表:
如果要查找「元素:abcd,權重:4」的節點,查找的過程是這樣的:
- 先從頭節點的最?層開始,L2 指向了「元素:abc,權重:3」節點,這個節點的權重?要查找節點的?,所以要訪問該層上的下?個節點;
- 但是該層上的下?個節點是空節點,于是就會跳到「元素:abc,權重:3」節點的下?層去找,也就是 leve[1];
- 「元素:abc,權重:3」節點的 leve[1] 的下?個指針指向了「元素:abcde,權重:4」的節點,然后將其和要查找的節點?較。雖然「元素:abcde,權重:4」的節點的權重和要查找的權重相同,但是當前節點的 SDS 類型數據「?于」要查找的數據,所以會繼續跳到「元素:abc,權重:3」節點的下?層去找,也就是 leve[0];
- 「元素:abc,權重:3」節點的 leve[0] 的下?個指針指向了「元素:abcd,權重:4」的節點,該節點正是要查找的節點,查詢結束
跳表節點側層數設置
跳表的相鄰兩層的節點數量最理想的?例是 2:1,查找復雜度可以降低到 O(logN)
下圖的跳表就是,相鄰兩層的節點數量的?例是 2 : 1
- 怎樣才能維持相鄰兩層的節點數量的?例為 2 : 1 ?
如果采?新增節點或者刪除節點時,來調整跳表節點以維持?例的?法的話,會帶來額外的開銷
Redis 則采??種巧妙的?法是,跳表在創建節點的時候,隨機?成每個節點的層數,并沒有嚴格維持相鄰兩層的節點數量?例為 2 : 1 的情況
具體的做法是:跳表在創建節點時候,會?成范圍為[0-1]的?個隨機數,如果這個隨機數?于 0.25(相當于概率 25%),那么層數就增加 1 層,然后繼續?成下?個隨機數,直到隨機數的結果?于 0.25 結束,最終確定該節點的層數
這樣的做法,相當于每增加?層的概率不超過 25%,層數越?,概率越低,層?最?限制是 64層
Redis數據結構:RedisObject
Redis中的任意數據類型的鍵和值都會被封裝為一個RedisObject,也叫做Redis對象
從Redis的使用者的角度來看,?個Redis節點包含多個database(非cluster模式下默認是16個,cluster模式下只能是1個),而一個database維護了從key space到object space的映射關系。這個映射關系的key是string類型,?value可以是多種數據類型,比如:string, list, hash、set、sorted set等。即key的類型固定是string,而value可能的類型是多個
?從Redis內部實現的?度來看,database內的這個映射關系是用?個dict來維護的。dict的key固定用?種數據結構來表達就夠了,即SDS。而value則比較復雜,為了在同?個dict內能夠存儲不同類型的value,這就需要?個通?的數據結構,這個通用的數據結構就是robj,全名是redisObject
RedisObject中encoding編碼方式
Redis中會根據存儲的數據類型不同,選擇不同的編碼方式,共包含11種不同類型:
| 編號 | 編碼方式 | 說明 |
|---|---|---|
| 0 | OBJ_ENCODING_RAW | raw編碼動態字符串 |
| 1 | OBJ_ENCODING_INT | long類型的整數的字符串 |
| 2 | OBJ_ENCODING_HT | hash表(字典dict) |
| 3 | OBJ_ENCODING_ZIPMAP | 已廢棄 |
| 4 | OBJ_ENCODING_LINKEDLIST | 雙端鏈表 |
| 5 | OBJ_ENCODING_ZIPLIST | 壓縮列表 |
| 6 | OBJ_ENCODING_INTSET | 整數集合 |
| 7 | OBJ_ENCODING_SKIPLIST | 跳表 |
| 8 | OBJ_ENCODING_EMBSTR | embstr的動態字符串 |
| 9 | OBJ_ENCODING_QUICKLIST | 快速列表 |
| 10 | OBJ_ENCODING_STREAM | Stream流 |
五種數據結構對應的編碼方式
Redis中會根據存儲的數據類型不同,選擇不同的編碼方式。每種數據類型使用的編碼方式如下:
| 數據類型 | 編碼方式 |
|---|---|
| OBJ_STRING | raw、embstr、int |
| OBJ_LIST | LinkedList和ZipList(3.2以前)、QuickList(3.2以后) |
| OBJ_SET | HT、intset |
| OBJ_ZSET | ZipList、HT、SkipList |
| OBJ_HASH | HT、ZipList |
Redis對象:String
String是Redis中最常見的數據存儲類型,下圖為SDS源碼:
開發中,能用embstr編碼就用,若不能則用int編碼,raw編碼最后考慮
-
基本編碼方式是raw,基于簡單動態字符串(SDS)實現,存儲上限為512mb。驗證方式:使用命令
object encoding key,下面說的另外情況也可通過這種方式驗證
- 如果存儲的SDS長度小于44字節,則會采用embstr編碼,此時object head與SDS是一段連續空間。申請內存時只需要調用一次內存分配函數,效率更高
-
如果?個String類型的value值是數字,那么Redis內部會把它轉成long類型來存儲,從?減少內存的使用
-
如果存儲的字符串是整數值,并且大小在LONG_MAX范圍內,則會采用INT編碼:直接將數據保存在RedisObject的ptr指針位置(剛好8字節),不再需要SDS了
驗證圖:
當然,確切地說,String在Redis中是??個robj來表示的
用來表示String的robj可能編碼成3種內部表?:OBJ_ENCODING_RAW,OBJ_ENCODING_EMBSTR,OBJ_ENCODING_INT。其中前兩種編碼使?的
是sds來存儲,最后?種OBJ_ENCODING_INT編碼直接把string存成了long型
在“對string進行incr, decr等操作時”,如果它內部是OBJ_ENCODING_INT編碼,那么可以直接行加減操作;如果它內部是OBJ_ENCODING_RAW或
OBJ_ENCODING_EMBSTR編碼,那么Redis會先試圖把sds存儲的字符串轉成long型,如果能轉成功,再進行加減操作
“對?個內部表示成long型的string執行append, setbit, getrange這些命令”,針對的仍然是string的值(即?進制表示的字符串),而不是針對內部表?的long型進?操作。比如字符串”32”,如果按照字符數組來解釋,它包含兩個字符,它們的ASCII碼分別是0x33和0x32。當我們執行命令setbit key 7 0的時候,相當于把字符0x33變成了0x32,這樣字符串的值就變成了”22”。?如果將字符串”32”按照內部的64位long型來解釋,那么它是0x0000000000000020,在這個基礎上執?setbit位操作,結果就完全不對了。因此,在這些命令的實現中,會把long型先轉成字符串再進行相應的操作
Redis對象:List
Redis的List類型可以從首、尾操作列表中的元素,滿足這種條件的有以下方式:
- LinkedList :普通鏈表,可以從雙端訪問,內存占用較高,內存碎片較多
- ZipList :壓縮列表,可以從雙端訪問,內存占用低,存儲上限低
- QuickList:LinkedList + ZipList,可以從雙端訪問,內存占用較低,包含多個ZipList,存儲上限高
在3.2版本之前,Redis采用LinkedList和ZipList來實現List,當元素數量小于512并且元素大小小于64字節時采用ZipList編碼,超過則采用LinkedList編碼。
在3.2版本之后,Redis統一采用QuickList來實現List:
Redis對象:Set
Set是Redis中的單列集合,滿足“無序不重復、查詢效率高”的特點
什么樣的數據結構可以滿足?
HashTable,也就是Redis中的Dict,不過Dict是雙列集合(可以存鍵、值對)
Set是Redis中的集合,不一定確保元素有序,可以滿足元素唯一、查詢效率要求極高。
為了查詢效率和唯一性,set采用HT編碼(Dict)。Dict中的key用來存儲元素,value統一為null。當存儲的所有數據都是整數,并且元素數量不超過 set-max-intset-entries 時,Set會采用IntSet編碼,以節省內存
Redis對象:SortedSet
SortedSet也就是ZSet,其中每一個元素都需要指定一個score值和member值:
- 可以根據score值排序后
- member必須唯一
- 可以根據member查詢分數
因此,zset底層數據結構必須滿足鍵值存儲、鍵必須唯一、可排序這幾個需求。哪種編碼結構可以滿足?
- SkipList:可以排序,并且可以同時存儲score和ele值(member)
- HT(Dict):可以鍵值存儲,并且可以根據key找value
當元素數量不多時,HT和SkipList的優勢不明顯,而且更耗內存。因此zset還會采用ZipList結構來節省內存,不過需要同時滿足兩個條件:
- 元素數量小于
zset_max_ziplist_entries,默認值128 - 每個元素都小于
zset_max_ziplist_value字 節,默認值64
ziplist本身沒有排序功能,而且沒有鍵值對的概念,因此需要有zset通過編碼實現:
- ZipList是連續內存,因此score和element是緊挨在一起的兩個entry, element在前,score在后
- score越小越接近隊首,score越大越接近隊尾,按照score值升序排列
Redis對象:Hash
Hash結構與Redis中的Zset非常類似:
- 都是鍵值存儲
- 都需求根據鍵獲取值
- 鍵必須唯一
區別如下:
- zset的鍵是member,值是score;hash的鍵和值都是任意值
- zset要根據score排序;hash則無需排序
底層實現方式:壓縮列表ziplist 或者 字典dict
當Hash中數據項比較少的情況下,Hash底層才?壓縮列表ziplist進?存儲數據,隨著數據的增加,底層的ziplist就可能會轉成dict,具體配置如下:
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
當滿足上面兩個條件其中之?的時候,Redis就使?dict字典來實現hash。
Redis的hash之所以這樣設計,是因為當ziplist變得很?的時候,它有如下幾個缺點:
- 每次插?或修改引發的realloc操作會有更?的概率造成內存拷貝,從而降低性能。
- ?旦發生內存拷貝,內存拷貝的成本也相應增加,因為要拷貝更?的?塊數據。
- 當ziplist數據項過多的時候,在它上?查找指定的數據項就會性能變得很低,因為ziplist上的查找需要進行遍歷。
總之,ziplist本來就設計為各個數據項挨在?起組成連續的內存空間,這種結構并不擅長做修改操作。?旦數據發?改動,就會引發內存realloc,可能導致內存拷貝。
hash結構如下:
zset集合如下:
因此,Hash底層采用的編碼與Zset也基本一致,只需要把排序有關的SkipList去掉即可:
Hash結構默認采用ZipList編碼,用以節省內存。 ZipList中相鄰的兩個entry 分別保存field和value
當數據量較大時,Hash結構會轉為HT編碼,也就是Dict,觸發條件有兩個:
- ZipList中的元素數量超過了
hash-max-ziplist-entries(默認512) - ZipList中的任意entry大小超過了
hash-max-ziplist-value(默認64字節)
過期Key處理
Redis之所以性能強,最主要的原因就是基于內存存儲。然而單節點的Redis其內存大小不宜過大,會影響持久化或主從同步性能。
我們可以通過修改配置文件來設置Redis的最大內存:
當內存使用達到上限時,就無法存儲更多數據了。為了解決這個問題,Redis提供了一些策略實現內存回收
內存過期策略
通過expire命令給Redis的key設置TTL(存活時間):key的TTL到期以后,再次訪問name返回的是nil,說明這個key已經不存在了,對應的內存也得到釋放。從而起到內存回收的目的
Redis本身是一個典型的key-value內存存儲數據庫,因此所有的key、value都保存在前面玩過的Dict結構中。不過在其database結構體中,有兩個Dict:一個用來記錄key-value;另一個用來記錄key-TTL
- 問題:Redis是如何知道一個key是否過期?
利用兩個Dict分別記錄key-value對及key-ttl對
- 問題:是不是TTL到期就立即刪除了?
方式一:惰性刪除:顧明思議并不是在TTL到期后就立刻刪除,而是在訪問一個key的時候,檢查該key的存活時間,如果已經過期才執行刪除
方式二:周期刪除:顧明思議是通過一個定時任務,周期性的抽樣部分過期的key,然后執行刪除。
執行周期有兩種:
- Redis服務初始化函數initServer()中設置定時任務,按照server.hz的頻率來執行過期key清理,模式為SLOW
- Redis的每個事件循環前會調用beforeSleep()函數,執行過期key清理,模式為FAST
SLOW模式規則:即:低頻率高時長
- 執行頻率受server.hz影響,默認為10,即每秒執行10次,每個執行周期100ms。
- 執行清理耗時不超過一次執行周期的25%.默認slow模式耗時不超過25ms
- 逐個遍歷db,逐個遍歷db中的bucket,抽取20個key判斷是否過期
- 如果沒達到時間上限(25ms)并且過期key比例大于10%,再進行一次抽樣,否則結束
FAST模式規則(過期key比例小于10%不執行 ):即:高頻率低時長
- 執行頻率受beforeSleep()調用頻率影響,但兩次FAST模式間隔不低于2ms
- 執行清理耗時不超過1ms
- 逐個遍歷db,逐個遍歷db中的bucket,抽取20個key判斷是否過期
如果沒達到時間上限(1ms)并且過期key比例大于10%,再進行一次抽樣,否則結束
內存淘汰策略
內存淘汰:就是當Redis內存使用達到設置的上限時,主動挑選部分key刪除以釋放更多內存的流程
Redis會在處理客戶端命令的方法processCommand()中嘗試做內存淘汰:
Redis支持8種不同策略來選擇要刪除的key:
- noeviction: 不淘汰任何key,但是內存滿時不允許寫入新數據,默認就是這種策略。
- volatile-ttl: 對設置了TTL的key,比較key的剩余TTL值,TTL越小越先被淘汰
- allkeys-random:對全體key ,隨機進行淘汰。也就是直接從db->dict中隨機挑選
- volatile-random:對設置了TTL的key ,隨機進行淘汰。也就是從db->expires中隨機挑選。
- allkeys-lru: 對全體key,基于LRU算法進行淘汰
- volatile-lru: 對設置了TTL的key,基于LRU算法進行淘汰
- allkeys-lfu: 對全體key,基于LFU算法進行淘汰
- volatile-lfu: 對設置了TTL的key,基于LFI算法進行淘汰
比較容易混淆的有兩個:
- LRU(Least Recently Used),最少最近使用。用當前時間減去最后一次訪問時間,這個值越大則淘汰優先級越高。
- LFU(Least Frequently Used),最少頻率使用。會統計每個key的訪問頻率,值越小淘汰優先級越高。
edis的數據都會被封裝為RedisObject結構:
LFU的訪問次數之所以叫做邏輯訪問次數,是因為并不是每次key被訪問都計數,而是通過運算:
- 生成0~1之間的隨機數R
- 計算 (舊次數 * lfu_log_factor + 1),記錄為P
- 如果 R < P ,則計數器 + 1,且最大不超過255
- 訪問次數會隨時間衰減,距離上一次訪問時間每隔 lfu_decay_time 分鐘,計數器 -1
結合源碼整套邏輯如下:
?
總結
以上是生活随笔為你收集整理的Redis 技术整理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 从零实现的浏览器Web脚本
- 下一篇: c# char unsigned_dll