腐蚀rust服务器命令_【使用 Rust 写 Parser】2. 解析Redis协议
系列所有文章
https://zhuanlan.zhihu.com/p/115017849?zhuanlan.zhihu.comhttps://zhuanlan.zhihu.com/p/139387293?zhuanlan.zhihu.comhttps://zhuanlan.zhihu.com/p/146455601?zhuanlan.zhihu.comhttps://zhuanlan.zhihu.com/p/186217695?zhuanlan.zhihu.com在基本熟悉 nom 之后, 這次我們準(zhǔn)備用 nom 實(shí)現(xiàn)一個(gè) redis 通信協(xié)議的解析器. 選擇 redis 是因?yàn)?redis 的通信協(xié)議易讀且比較簡(jiǎn)單.
準(zhǔn)備
如果你對(duì) redis 通信協(xié)議不熟悉的話可以查閱 通信協(xié)議(protocol). 簡(jiǎn)單來(lái)說(shuō) redis 通信協(xié)議分為統(tǒng)一請(qǐng)求協(xié)議(這里只討論新版請(qǐng)求協(xié)議)和回復(fù)協(xié)議, 請(qǐng)求協(xié)議可以方便地通過(guò) Rust 內(nèi)置的 format! 拼接構(gòu)成, 而通信協(xié)議則使用 nom 解析. redis 協(xié)議非常簡(jiǎn)單, 這里不再贅述.
首先我們需要一個(gè) redis 服務(wù)器, 這里我在開(kāi)發(fā)的機(jī)器上用 docker 啟動(dòng)一個(gè) redis 服務(wù)器:
docker run -d --name redis -p 6379:6379 redis redis-server --appendonly yes測(cè)試下 redis 服務(wù)
telnet localhost 6379 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. ping +PONG出現(xiàn) +PONG 說(shuō)明服務(wù)器已正常運(yùn)行
實(shí)現(xiàn)基本功能
首先創(chuàng)建項(xiàng)目
cargo new rcli && cd rcli添加如下依賴
[dependencies] tokio = { version = "0.2", features = ["full"]} nom = "5" bytes = "0.5.4" structopt = "0.3.14"structopt 可以幫助我們快速構(gòu)建命令行工具輸入 redis 命令幫助測(cè)試, bytes 則可以幫助我們處理字節(jié), tokio 依賴是上個(gè)測(cè)試代碼遺留的依賴, 剛好新代碼也需要 tcp 連接, 索性使用 tokio 處理 tcp 連接, nom 自然是用于解析回復(fù).
首先我們需要?jiǎng)?chuàng)建 tcp 連接與 redis 通信, 并且寫(xiě)入一些數(shù)據(jù)看看協(xié)議是否管用:
use bytes::{BufMut, BytesMut}; use std::error::Error; use tokio::net::TcpStream; use tokio::prelude::*;#[tokio::main] async fn main() -> Result<(), Box<dyn Error>> {let mut stream = TcpStream::connect("127.0.0.1:6379").await?;let mut buf = [0u8; 1024];let mut resp = BytesMut::with_capacity(1024);let (mut reader, mut writer) = stream.split();// 向服務(wù)器發(fā)送 PINGwriter.write(b"*1rn$4rnPINGrn").await?;let n = reader.read(&mut buf).await?;resp.put(&buf[0..n]);// 返回結(jié)果應(yīng)該是 PONGprintln!("{:?}", resp);Ok(()) }如上面代碼展示的, 我們創(chuàng)建一個(gè) tcp 連接和一個(gè)緩沖 buf, 在成功連接后根據(jù)協(xié)議嘗試寫(xiě)入 *1rn$4rnPINGrn, 預(yù)期結(jié)果是服務(wù)器返回 "+PONGrn".
現(xiàn)在我們可以創(chuàng)建 CLI 實(shí)現(xiàn)幾個(gè)常用的 redis 命令, 方便我們向服務(wù)器發(fā)送命令. 創(chuàng)建 commands.rs 文件, 記得在 main.rs 中導(dǎo)入它.
以 rpush 為例, rpush 命令用法為 RPUSH key value [value …]
使用 structopt 可以這樣定義一個(gè)枚舉(使用結(jié)構(gòu)體也可以, 但因?yàn)閷?lái)有很多子命令, 所以枚舉更合適)
use structopt::StructOpt;#[derive(Debug, Clone, StructOpt)] pub enum Commands {/// push value to listRpush {/// redis keykey: String,/// valuevalues: Vec<String>,}, }接著在 main.rs 中使用 Commands 解析命令行
use structopt::StructOpt; mod commands;#[tokio::main] async fn main() -> Result<(), Box<dyn Error>> {// 創(chuàng)建 tcp 連接, buf 等...let com = commands::Commands::from_args();// 發(fā)送命令 ... }運(yùn)行項(xiàng)目看下效果
cargo run -- helppush value to listUSAGE:rrdis-cli rpush <key> [values]...FLAGS:-h, --help Prints help information-V, --version Prints version informationARGS:<key> redis key<values>... value接下來(lái)要把從命令行傳來(lái)的參數(shù)轉(zhuǎn)換為 redis 統(tǒng)一請(qǐng)求. redis 以 rn 為分隔符, redis 請(qǐng)求格式以 *argc 開(kāi)頭, argc 是此次請(qǐng)求的參數(shù)個(gè)數(shù), 每個(gè)參數(shù)先以 $<參數(shù)長(zhǎng)度> 聲明參數(shù)長(zhǎng)度, 接著 rn 分割符, 然后是參數(shù)數(shù)據(jù), 若有多個(gè)參數(shù)則重復(fù)此步驟. 最后以 rn 結(jié)尾.
比如上面的 PING 轉(zhuǎn)換為 *1rn$4rnPINGrn, 而 GET 轉(zhuǎn)換為 *2rn$3rnGETrn$3rnkeyrn.
可以使用一個(gè) builder 幫助我們轉(zhuǎn)換:
use bytes::{BufMut, BytesMut};#[derive(Debug, Clone)] struct CmdBuilder {args: Vec<String>, }impl CmdBuilder {fn new() -> Self {CmdBuilder { args: vec![] }}fn arg(mut self, arg: &str) -> Self {self.args.push(format!("${}", arg.len()));self.args.push(arg.to_string());self}fn add_arg(&mut self, arg: &str) {self.args.push(format!("${}", arg.len()));self.args.push(arg.to_string());}fn to_bytes(&self) -> BytesMut {let mut bytes = BytesMut::new();bytes.put(&format!("*{}rn", self.args.len() / 2).into_bytes()[..]);bytes.put(&self.args.join("rn").into_bytes()[..]);bytes.put(&b"rn"[..]);bytes} }CmdBuilder 做的很簡(jiǎn)單, 保存通過(guò) arg 或 add_arg 傳入的參數(shù), 在 to_bytes 方法中拼接這些參數(shù)為有效的請(qǐng)求.
例如可以通過(guò)如下方式構(gòu)建一個(gè) GET 命令
let cmd = CmdBuilder::new().arg("GET").arg("key").to_bytes()接下來(lái)使用 CmdBuilder 為 Commands 實(shí)現(xiàn) to_bytes 方法
impl Commands {pub fn to_bytes(&self) -> bytes::BytesMut {let cmd = match self {Commands::Rpush { key, values } => {let mut builder = CmdBuilder::new().arg("RPUSH").arg(key);values.iter().for_each(|v| builder.add_arg(v));builder.to_bytes()}};cmd} }改寫(xiě) main 函數(shù)發(fā)送構(gòu)建的請(qǐng)求
// ... 省略 let com = commands::Commands::from_args(); writer.write(&com.to_bytes()).await?; cargo run -- rpush list a b c d# redis 成功返回響應(yīng) :3rnAll is well, 對(duì)于其他命令可以通過(guò)相同方法實(shí)現(xiàn), 可以在 rrdis-cli/src/commands.rs 看到完整實(shí)現(xiàn).
解析回復(fù)
現(xiàn)在終于到 nom 出場(chǎng)了. 新建 reply.rs 文件, 并在 main.rs 導(dǎo)入. 首先導(dǎo)入需要使用的 nom 方法, 接著定義 Reply, 因?yàn)?redis 回復(fù)種類有限, 所以用一個(gè)枚舉是非常合適的.
use nom::branch::alt; use nom::bytes::complete::tag; use nom::bytes::complete::{take_while, take_while1, take_while_m_n}; use nom::combinator::map; use nom::multi::many_m_n; use nom::sequence::delimited; use nom::IResult;#[derive(Debug)] pub enum Reply {// 狀態(tài)回復(fù)或單行回復(fù)SingleLine(String),// 錯(cuò)誤回復(fù)Err(String),// 整數(shù)回復(fù)Int(i64),// 批量回復(fù)Batch(Option<String>),// 多條批量回復(fù)MultiBatch(Option<Vec<Reply>>),// 回復(fù)中沒(méi)有, 這里是為了方便進(jìn)行錯(cuò)誤處理添加的BadReply(String), }單行回復(fù)
協(xié)議中單行回復(fù)定義如下:
一個(gè)狀態(tài)回復(fù)(或者單行回復(fù),single line reply)是一段以 "+" 開(kāi)始、 "rn" 結(jié)尾的單行字符串。所以解析思路是: 如果回復(fù)以"+"開(kāi)頭, 則讀取余下字節(jié)存作為回復(fù), 直到 "rn", 偽代碼如下
take_if("+"), take_util_new_line, take_if("rn")nom 中的 tag 可以完美實(shí)現(xiàn)偽代碼中的 take_if 功能, 令人驚喜的是對(duì)于"消耗輸入直到不符合某種條件"這個(gè)常見(jiàn)解析模式, nom 提供了 take_while 函數(shù), 所以我們的解析函數(shù)可以寫(xiě)成:
fn parse_single_line(i: &str) -> IResult<&str, Reply> {let (i, _) = tag("+")(i)?;let (i, resp) = take_while(|c| c != 'r' && c != 'n')(i)?;let (i, _) = tag("rn")(i)?;Ok((i, Reply::SingleLine(resp.to_string()))) }tag 和 take_while 讓解析函數(shù)的功能非常直觀地展現(xiàn)出來(lái), 這讓它看著想偽代碼, 但它真的能運(yùn)行!
在函數(shù)中只有 take_while 返回的結(jié)果是我們想要的, 但兩個(gè) tag 又是不可或缺, 對(duì)于這一常見(jiàn)解析模式 nom 提供了 delimited 這個(gè)組合子函數(shù), 這個(gè)組合子函數(shù)接受三個(gè)類似 tag("xx") 這樣的基本函數(shù), 依次應(yīng)用這三個(gè)函數(shù), 如果成功, 則返回第二個(gè)函數(shù)解析的結(jié)果.
所以我們的函數(shù)可以這樣寫(xiě):
fn parse_single_line(i: &str) -> IResult<&str, Reply> {let (i, resp) = delimited(tag("+"),take_while(|c| c != 'r' && c != 'n'),tag("rn"),)(i)?;Ok((i, Reply::SingleLine(String::from(resp)))) }錯(cuò)誤回復(fù)
錯(cuò)誤回復(fù)定義:
錯(cuò)誤回復(fù)和狀態(tài)回復(fù)非常相似, 它們之間的唯一區(qū)別是, 錯(cuò)誤回復(fù)的第一個(gè)字節(jié)是 "-" , 而狀態(tài)回復(fù)的第一個(gè)字節(jié)是 "+"所以錯(cuò)誤回復(fù)解析函數(shù)和上面的差不多:
fn parse_err(i: &str) -> IResult<&str, Reply> {let (i, resp) = delimited(tag("-"),// take_while1 與 take_while 類似, 但要求至少一個(gè)字符符合條件take_while1(|c| c != 'r' && c != 'n'),tag("rn"),)(i)?;Ok((i, Reply::Err(String::from(resp)))) }整數(shù)回復(fù)
整數(shù)回復(fù)就是一個(gè)以 ":" 開(kāi)頭, CRLF 結(jié)尾的字符串表示的整數(shù),整數(shù)回復(fù)結(jié)構(gòu)與前兩種類似, 區(qū)別在于中間是整數(shù), 需要將 take_while1 的返回值轉(zhuǎn)換為整數(shù).
如果沒(méi)有進(jìn)行類型轉(zhuǎn)換解析函數(shù)可以這樣實(shí)現(xiàn):
fn parse_int(i: &str) -> IResult<&str, Reply> {let (i, int) = delimited(tag(":"),// 注意負(fù)數(shù)前綴take_while1(|c: char| c.is_digit(10) || c == '-'),tag("rn"),)(i)?;// ... 類型轉(zhuǎn)換Ok((i, Reply::Int(int))) }注意到 nom 提供的基本解析工廠函數(shù)如 tag 創(chuàng)建的解析函數(shù)返回值都是 IResult, 它與 Result 類似, 可以應(yīng)用 map 運(yùn)算子, 不過(guò)這個(gè) map 需使用 nom 提供的
map(take_while1(|c: char| c.is_digit(10) || c == '-'), |int: &str| int.parse::<i64>().unwrap())通過(guò) nom 的 map 函數(shù)可以把返回值從 IResult<&str, &str> 映射為 IResult<&str, i64>, 最后解析函數(shù)可以寫(xiě)成
fn parse_int(i: &str) -> IResult<&str, Reply> {let (i, int) = delimited(tag(":"),map(take_while1(|c: char| c.is_digit(10) || c == '-'),|int: &str| int.parse::<i64>().unwrap(),),tag("rn"),)(i)?;Ok((i, Reply::Int(int))) }批量回復(fù)
服務(wù)器發(fā)送的內(nèi)容中: - 第一字節(jié)為 "$" 符號(hào) - 接下來(lái)跟著的是表示實(shí)際回復(fù)長(zhǎng)度的數(shù)字值 - 之后跟著一個(gè) CRLF - 再后面跟著的是實(shí)際回復(fù)數(shù)據(jù) - 最末尾是另一個(gè) CRLF同時(shí)批量回復(fù)還有特殊情況
如果被請(qǐng)求的值不存在, 那么批量回復(fù)會(huì)將特殊值 -1 用作回復(fù)的長(zhǎng)度值, 這種回復(fù)稱為空批量回復(fù)(NULL Bulk Reply)此時(shí)協(xié)議要求客戶端返回空對(duì)象, 對(duì)于 Rust 則是 None, 所以 BatchReply 才會(huì)被定義為 BatchReply<Option<String>>.
所以這個(gè)函數(shù)的解析可能稍微復(fù)雜點(diǎn), 但方法與上面沒(méi)有太大差異, 除了新的 take_while_m_n, take_while_m_n 與 take_while 類似, 不同的是它可以指定消耗輸入最小數(shù)和最大數(shù)m, n.
如果是空回復(fù)則嘗試匹配 rn, 如果成功, 直接返回, 否則根據(jù)拿到的回復(fù)長(zhǎng)度, 獲取那么多長(zhǎng)度的字符, 接著應(yīng)該碰到 rn.
fn parse_batch(i: &str) -> IResult<&str, Reply> {let (i, _) = tag("$")(i)?;let (i, len) = (take_while1(|c: char| c.is_digit(10) || c == '-'))(i)?;if len == "-1" {let (i, _) = tag("rn")(i)?;Ok((i, Reply::Batch(None)))} else {let len = len.parse::<usize>().unwrap();let (i, resp) = delimited(tag("rn"), take_while_m_n(len, len, |_| true), tag("rn"))(i)?;Ok((i, Reply::Batch(Some(String::from(resp)))))} }多條批量回復(fù)
多條批量回復(fù)是由多個(gè)回復(fù)組成的數(shù)組, 數(shù)組中的每個(gè)元素都可以是任意類型的回復(fù), 包括多條批量回復(fù)本身。 多條批量回復(fù)的第一個(gè)字節(jié)為 "*" , 后跟一個(gè)字符串表示的整數(shù)值, 這個(gè)值記錄了多條批量回復(fù)所包含的回復(fù)數(shù)量, 再后面是一個(gè) CRLF多條批量回復(fù)其實(shí)是對(duì)上面四種回復(fù)的嵌套, 但需要注意"空白多條批量回復(fù)"和"無(wú)內(nèi)容多條批量回復(fù)"這兩種特殊情況.
空白多條回復(fù)為 "*0rn", 無(wú)內(nèi)容多條批量回復(fù)為 "*-1rn", 在解析時(shí)需要對(duì)這兩種特殊情況進(jìn)行處理. 在其他情況則可以應(yīng)用 nom 提供的 alt 組合子服用之前的四個(gè)解析函數(shù); alt 即"可選的", 它接受多個(gè)解析函數(shù)元組, 依次嘗試應(yīng)用每個(gè)函數(shù), 返回第一個(gè)成功解析結(jié)果或拋出錯(cuò)誤.
同時(shí)對(duì)于重復(fù)應(yīng)用某個(gè)解析函數(shù) m 到 n 次這種模式, nom 提供了 many_m_n 組合子, 對(duì)于 fn parse_item(&str) -> IResult<&str, Reply> 這樣的函數(shù), many_m_n(parse_item, 0, 12) 返回值為 IResult<&str, Vec<Reply>>.
理清邏輯后解析多條批量回復(fù)的解析函數(shù)雖然有些長(zhǎng)但還是很清晰的:
fn parse_multi_batch(i: &str) -> IResult<&str, Reply> {let (i, count) = delimited(tag("*"),take_while1(|c: char| c.is_digit(10) || c == '-'),tag("rn"),)(i)?;if count == "-1" {let (i, _) = tag("rn")(i)?;Ok((i, Reply::MultiBatch(None)))} else {let count = count.parse::<usize>().unwrap();let (i, responses) = many_m_n(count,count,alt((parse_single_line, parse_err, parse_int, parse_batch)),)(i)?;// 做個(gè)嚴(yán)格檢查, 檢查解析到的個(gè)數(shù)與預(yù)期的是否一致if responses.len() != count {Ok((i,Reply::BadReply(format!("expect {} items, got {}", count, responses.len())),))} else {Ok((i, Reply::MultiBatch(Some(responses))))}} }最后用 alt 做個(gè)"匯總"
fn parse(i: &str) -> IResult<&str, Reply> {alt((parse_single_line,parse_err,parse_int,parse_batch,parse_multi_batch,))(i) }至此我們我們的解析函數(shù)到完成了, 為 Reply 實(shí)現(xiàn) Display 特性后對(duì) redis 返回的消息應(yīng)用 parse 然后把解析結(jié)果打印出來(lái)即可驗(yàn)證解析函數(shù)正確性. 完整代碼在
rrdis-cli/src/reply.rs?github.com匯總
完整代碼可以在我的 rrdis-cli 查看. 不知道大家對(duì) nom 的評(píng)價(jià)如何, 我覺(jué)得使用 nom 提供的基本函數(shù)和一系列組合子從最小元素出發(fā), 搭積木似的構(gòu)建出更復(fù)雜的解析函數(shù), 即降低了開(kāi)發(fā)難度, 熟悉之后代碼邏輯還挺清晰的.
整個(gè) rrdis-cli 項(xiàng)目實(shí)現(xiàn) set, get, incr, lrange, rpush 和 ping 這基本命令, 實(shí)現(xiàn)其他命令也是非常簡(jiǎn)單; 并且實(shí)現(xiàn)了絕大部分(還有一些特殊錯(cuò)誤情況沒(méi)處理)協(xié)議解析, 整個(gè)項(xiàng)目代碼量如下
tokei . -------------------------------------------------------------------------------Language Files Lines Code Comments Blanks -------------------------------------------------------------------------------Markdown 1 4 4 0 0Rust 3 332 284 20 28TOML 1 15 12 1 2 -------------------------------------------------------------------------------Total 5 351 300 21 30 -------------------------------------------------------------------------------Rust 代碼只有 332 行, 挺簡(jiǎn)潔的, 估計(jì)比我用 Python 實(shí)現(xiàn)都少.
下一篇使用 nom 寫(xiě)什么還不確定, 隨緣更新吧~
怎么說(shuō)也是萬(wàn)字長(zhǎng)文, 如果覺(jué)得文章可以, 請(qǐng)點(diǎn)個(gè)贊, 謝謝~
總結(jié)
以上是生活随笔為你收集整理的腐蚀rust服务器命令_【使用 Rust 写 Parser】2. 解析Redis协议的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 贾跃亭又有新金主 与第九城市董事长已多次
- 下一篇: linux cmake编译源码,linu