GO实现Redis:GO实现Redis协议解析器(2)
- 本文实现Redis的协议层,协议层负责解析指令,然后将指令交给核心database执行
- echo database用来测试协议层的代码
- https://github.com/csgopher/go-redis
RESP协议
RESP是客户端与服务端通信的协议,格式有五种:
正常回复:以“+”开头,以“\r\n”结尾的字符串形式
错误回复:以“-”开头,以“\r\n”结尾的字符串形式
整数:以“:”开头,以“\r\n”结尾的字符串形式
多行字符串:以“$”开头,后跟实际发送的字节数和“\r\n”,然后是实际数据,最后再以“\r\n”结尾
$3\r\nabc\r\n
数组:以“*”开头,后跟成员个数
SET key value
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
客户端和服务器发送的命令或数据一律以 \r\n (CRLF)作为换行符。
当我们输入*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n这样一串命令,服务端接收到的是如下的命令:
*3\r\n
$3\r\n
SET\r\n
$3\r\n
key\r\n
$5\r\n
value\r\n
interface/resp/conn.go
// Connection is the protocol layer's view of one Redis client connection.
type Connection interface {
	// Write sends the given bytes to the client.
	Write([]byte) error
	// GetDBIndex returns the index of the database selected on this connection.
	GetDBIndex() int
	// SelectDB sets the index of the database selected on this connection.
	SelectDB(int)
}
interface/resp/reply.go
// Reply is a response message that can be serialized to bytes.
type Reply interface {
	// ToBytes returns the serialized form of the reply.
	ToBytes() []byte
}
Connection接口:Redis客户端的一个连接
Write:给客户端回复消息
GetDBIndex:获取当前连接选中的DB下标(Redis有16个DB);SelectDB:切换当前连接使用的DB
Reply接口:响应接口
resp/reply/consts.go
// Preserialized payloads for the fixed replies declared below. ToBytes hands
// out these slices directly, so callers must treat them as read-only.
var (
	pongBytes           = []byte("+PONG\r\n")
	okBytes             = []byte("+OK\r\n")
	nullBulkBytes       = []byte("$-1\r\n")
	emptyMultiBulkBytes = []byte("*0\r\n")
	noBytes             = []byte("")
)

// Shared instances returned by MakePongReply and MakeOkReply.
var (
	thePongReply = &PongReply{}
	theOkReply   = &OkReply{}
)

// PongReply is the fixed "+PONG" status reply.
type PongReply struct{}

// ToBytes returns the serialized "+PONG" line.
func (r *PongReply) ToBytes() []byte {
	return pongBytes
}

// MakePongReply returns the shared PongReply instance.
func MakePongReply() *PongReply {
	return thePongReply
}

// OkReply is the fixed "+OK" status reply.
type OkReply struct{}

// ToBytes returns the serialized "+OK" line.
func (r *OkReply) ToBytes() []byte {
	return okBytes
}

// MakeOkReply returns the shared OkReply instance.
func MakeOkReply() *OkReply {
	return theOkReply
}

// NullBulkReply is the null bulk string reply ("$-1").
type NullBulkReply struct{}

// ToBytes returns the serialized null bulk string.
func (r *NullBulkReply) ToBytes() []byte {
	return nullBulkBytes
}

// MakeNullBulkReply returns a new NullBulkReply.
func MakeNullBulkReply() *NullBulkReply {
	return new(NullBulkReply)
}

// EmptyMultiBulkReply is the empty array reply ("*0").
type EmptyMultiBulkReply struct{}

// ToBytes returns the serialized empty array.
func (r *EmptyMultiBulkReply) ToBytes() []byte {
	return emptyMultiBulkBytes
}

// NoReply serializes to zero bytes.
type NoReply struct{}

// ToBytes returns an empty byte slice.
func (r *NoReply) ToBytes() []byte {
	return noBytes
}
定义五种回复:回复pong,ok,null,空数组,空
resp/reply/reply.go
// ErrorReply is a reply that can also be used as an error value.
type ErrorReply interface {
	// Error returns the error message.
	Error() string
	// ToBytes returns the serialized form of the reply.
	ToBytes() []byte
}
ErrorReply:定义错误接口
resp/reply/errors.go
// UnknownErrReply is the error reply used when the cause is unknown.
type UnknownErrReply struct{}

var unknownErrBytes = []byte("-Err unknown\r\n")

// ToBytes returns the preserialized error line.
func (r *UnknownErrReply) ToBytes() []byte {
	return unknownErrBytes
}

// Error returns the message without the leading '-' and trailing CRLF.
func (r *UnknownErrReply) Error() string {
	return "Err unknown"
}

// ArgNumErrReply reports that command Cmd received the wrong number of
// arguments.
type ArgNumErrReply struct {
	Cmd string
}

// ToBytes returns the serialized error line naming the command.
func (r *ArgNumErrReply) ToBytes() []byte {
	return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n")
}

// Error returns the message without the leading '-' and trailing CRLF.
func (r *ArgNumErrReply) Error() string {
	return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}

// MakeArgNumErrReply returns an ArgNumErrReply for the given command name.
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
	return &ArgNumErrReply{Cmd: cmd}
}

// SyntaxErrReply is the syntax error reply.
type SyntaxErrReply struct{}

var syntaxErrBytes = []byte("-Err syntax error\r\n")

var theSyntaxErrReply = &SyntaxErrReply{}

// MakeSyntaxErrReply returns the shared SyntaxErrReply instance.
func MakeSyntaxErrReply() *SyntaxErrReply {
	return theSyntaxErrReply
}

// ToBytes returns the preserialized error line.
func (r *SyntaxErrReply) ToBytes() []byte {
	return syntaxErrBytes
}

// Error returns the message without the leading '-' and trailing CRLF.
func (r *SyntaxErrReply) Error() string {
	return "Err syntax error"
}

// WrongTypeErrReply reports an operation against a key holding the wrong kind
// of value.
type WrongTypeErrReply struct{}

var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")

// ToBytes returns the preserialized error line.
func (r *WrongTypeErrReply) ToBytes() []byte {
	return wrongTypeErrBytes
}

// Error returns the message without the leading '-' and trailing CRLF.
func (r *WrongTypeErrReply) Error() string {
	return "WRONGTYPE Operation against a key holding the wrong kind of value"
}

// ProtocolErrReply reports malformed protocol data; Msg is quoted in the
// message.
type ProtocolErrReply struct {
	Msg string
}

// ToBytes returns the serialized error line with Msg in single quotes.
func (r *ProtocolErrReply) ToBytes() []byte {
	return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n")
}

// Error returns the same text as ToBytes without the leading '-' and trailing
// CRLF. The closing quote was previously missing, so the two disagreed.
func (r *ProtocolErrReply) Error() string {
	return "ERR Protocol error: '" + r.Msg + "'"
}
errors定义5种错误:UnknownErrReply 未知错误,ArgNumErrReply 参数个数错误,SyntaxErrReply 语法错误,WrongTypeErrReply 数据类型错误,ProtocolErrReply 协议错误
resp/reply/reply.go
var (nullBulkReplyBytes = []byte("$-1")// 协议的结尾CRLF = "\r\n"
)type BulkReply struct {Arg []byte
}func MakeBulkReply(arg []byte) *BulkReply {return &BulkReply{Arg: arg,}
}func (r *BulkReply) ToBytes() []byte {if len(r.Arg) == 0 {return nullBulkReplyBytes}return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)
}type MultiBulkReply struct {Args [][]byte
}func (r *MultiBulkReply) ToBytes() []byte {argLen := len(r.Args)var buf bytes.Bufferbuf.WriteString("*" + strconv.Itoa(argLen) + CRLF)for _, arg := range r.Args {if arg == nil {buf.WriteString("$-1" + CRLF)} else {buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)}}return buf.Bytes()
}func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {return &MultiBulkReply{Args: args,}
}type StatusReply struct {Status string
}func MakeStatusReply(status string) *StatusReply {return &StatusReply{Status: status,}
}func (r *StatusReply) ToBytes() []byte {return []byte("+" + r.Status + CRLF)
}type IntReply struct {Code int64
}func MakeIntReply(code int64) *IntReply {return &IntReply{Code: code,}
}func (r *IntReply) ToBytes() []byte {return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
}type StandardErrReply struct {Status string
}func (r *StandardErrReply) ToBytes() []byte {return []byte("-" + r.Status + CRLF)
}func (r *StandardErrReply) Error() string {return r.Status
}func MakeErrReply(status string) *StandardErrReply {return &StandardErrReply{Status: status,}
}func IsErrorReply(reply resp.Reply) bool {return reply.ToBytes()[0] == '-'
}
BulkReply:回复一个字符串
MultiBulkReply:回复字符串数组
StatusReply:状态回复
IntReply:数字回复
StandardErrReply:标准错误回复
IsErrorReply:判断是否为错误回复
ToBytes:将字符串转成RESP协议规定的格式
resp/parser/parser.go
// Payload is one item delivered by the parser: either parsed data or an error.
type Payload struct {
	Data resp.Reply // the parsed message
	Err  error      // set when reading or parsing failed
}

// readState tracks the parser's progress through the message currently being
// read.
type readState struct {
	readingMultiLine  bool     // whether a multi-line message is being read
	expectedArgsCount int      // number of arguments the message should contain
	msgType           byte     // message type
	args              [][]byte // message content collected so far
	bulkLen           int64    // length of the data
}

// finished reports whether expectedArgsCount is positive and exactly that many
// arguments have been collected.
func (s *readState) finished() bool {
	return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
}

// ParseStream starts parse0 on reader in a new goroutine and returns the
// receive side of the unbuffered channel that parse0 is given.
func ParseStream(reader io.Reader) <-chan *Payload {
	ch := make(chan *Payload)
	go parse0(reader, ch)
	return ch
}

// parse0: the body is elided in this excerpt.
func parse0(reader io.Reader, ch chan<- *Payload) {
	......
}
Payload结构体:客户端给我们发的数据
- Reply:客户端与服务端互相发的数据都称为Reply
readState结构体:
- readingMultiLine:解析单行还是多行数据
- expectedArgsCount:应该读取的参数个数
- msgType:消息类型
- args:消息内容
- bulkLen:数据长度
finished方法:判断解析是否完成
ParseStream方法:异步解析数据后放入管道,返回管道数据
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {var msg []bytevar err errorif state.bulkLen == 0 {msg, err = bufReader.ReadBytes('\n')if err != nil {return nil, true, err}if len(msg) == 0 || msg[len(msg)-2] != '\r' {return nil, false, errors.New("protocol error: " + string(msg))}} else {msg = make([]byte, state.bulkLen+2)_, err = io.ReadFull(bufReader, msg)if err != nil {return nil, true, err}if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {return nil, false, errors.New("protocol error: " + string(msg))}state.bulkLen = 0}return msg, false, nil
}
readLine:一行一行地读取。state.bulkLen为0时读普通的行,以\n分隔;state.bulkLen不为0时,正文中可能包含\r\n字符,因此按长度读取:读取state.bulkLen个字节再加上结尾的换行符\r\n(共state.bulkLen+2个字节)
func parseMultiBulkHeader(msg []byte, state *readState) error {var err errorvar expectedLine uint64expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)if err != nil {return errors.New("protocol error: " + string(msg))}if expectedLine == 0 {state.expectedArgsCount = 0return nil} else if expectedLine > 0 {state.msgType = msg[0]state.readingMultiLine = truestate.expectedArgsCount = int(expectedLine)state.args = make([][]byte, 0, expectedLine)return nil} else {return errors.New("protocol error: " + string(msg))}
}func parseBulkHeader(msg []byte, state *readState) error {var err errorstate.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)if err != nil {return errors.New("protocol error: " + string(msg))}if state.bulkLen == -1 { // null bulkreturn nil} else if state.bulkLen > 0 {state.msgType = msg[0]state.readingMultiLine = truestate.expectedArgsCount = 1state.args = make([][]byte, 0, 1)return nil} else {return errors.New("protocol error: " + string(msg))}
}
parseMultiBulkHeader:解析数组的头部,设置期望的行数和相关参数。
parseBulkHeader:解析多行字符串的头部。
func parseSingleLineReply(msg []byte) (resp.Reply, error) {str := strings.TrimSuffix(string(msg), "\r\n")var result resp.Replyswitch msg[0] {case '+': // status replyresult = reply.MakeStatusReply(str[1:])case '-': // err replyresult = reply.MakeErrReply(str[1:])case ':': // int replyval, err := strconv.ParseInt(str[1:], 10, 64)if err != nil {return nil, errors.New("protocol error: " + string(msg))}result = reply.MakeIntReply(val)}return result, nil
}func readBody(msg []byte, state *readState) error {line := msg[0 : len(msg)-2]var err errorif line[0] == '$' {// bulk replystate.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)if err != nil {return errors.New("protocol error: " + string(msg))}if state.bulkLen <= 0 { // null bulk in multi bulksstate.args = append(state.args, []byte{})state.bulkLen = 0}} else {state.args = append(state.args, line)}return nil
}
parseSingleLineReply:解析单行命令
readBody:读取多行的命令。如果是以$开头,设置bulkLen,读取下一行时根据这个长度+2来读;如果不是以$开头,则直接添加到args
func parse0(reader io.Reader, ch chan<- *Payload) {defer func() {if err := recover(); err != nil {logger.Error(string(debug.Stack()))}}()bufReader := bufio.NewReader(reader)var state readStatevar err errorvar msg []bytefor {var ioErr boolmsg, ioErr, err = readLine(bufReader, &state)if err != nil {if ioErr {ch <- &Payload{Err: err,}close(ch)return}ch <- &Payload{Err: err,}state = readState{}continue}if !state.readingMultiLine {if msg[0] == '*' {// multi bulk replyerr = parseMultiBulkHeader(msg, &state)if err != nil {ch <- &Payload{Err: errors.New("protocol error: " + string(msg)),}state = readState{}continue}if state.expectedArgsCount == 0 {ch <- &Payload{Data: &reply.EmptyMultiBulkReply{},}state = readState{}continue}} else if msg[0] == '$' { // bulk replyerr = parseBulkHeader(msg, &state)if err != nil {ch <- &Payload{Err: errors.New("protocol error: " + string(msg)),}state = readState{} // reset statecontinue}if state.bulkLen == -1 { // null bulk replych <- &Payload{Data: &reply.NullBulkReply{},}state = readState{} // reset statecontinue}} else {// single line replyresult, err := parseSingleLineReply(msg)ch <- &Payload{Data: result,Err: err,}state = readState{} // reset statecontinue}} else {// read bulk replyerr = readBody(msg, &state)if err != nil {ch <- &Payload{Err: errors.New("protocol error: " + string(msg)),}state = readState{} // reset statecontinue}// if sending finishedif state.finished() {var result resp.Replyif state.msgType == '*' {result = reply.MakeMultiBulkReply(state.args)} else if state.msgType == '$' {result = reply.MakeBulkReply(state.args[0])}ch <- &Payload{Data: result,Err: err,}state = readState{}}}}
}
parse0:解析指令,解析完成后通过channel发出去
resp/connection/conn.go
type Connection struct {conn net.ConnwaitingReply wait.Waitmu sync.Mutex // 避免多个协程往客户端中写selectedDB int
}func NewConn(conn net.Conn) *Connection {return &Connection{conn: conn,}
}func (c *Connection) RemoteAddr() net.Addr {return c.conn.RemoteAddr()
}func (c *Connection) Close() error {c.waitingReply.WaitWithTimeout(10 * time.Second)_ = c.conn.Close()return nil
}func (c *Connection) Write(b []byte) error {if len(b) == 0 {return nil}c.mu.Lock()c.waitingReply.Add(1)defer func() {c.waitingReply.Done()c.mu.Unlock()}()_, err := c.conn.Write(b)return err
}func (c *Connection) GetDBIndex() int {return c.selectedDB
}func (c *Connection) SelectDB(dbNum int) {c.selectedDB = dbNum
}
之前写的EchoHandler是用户传过来什么,我们传回去什么。现在要写一个RespHandler来代替EchoHandler,让解析器来解析。RespHandler中要有一个管理客户端连接的结构体Connection。
Connection:客户端连接,在协议层的handler中会用到
resp/handler/handler.go
var (unknownErrReplyBytes = []byte("-ERR unknown\r\n")
)type RespHandler struct {activeConn sync.Mapdb databaseface.Databaseclosing atomic.Boolean
}func MakeHandler() *RespHandler {var db databaseface.Databasedb = database.NewEchoDatabase()return &RespHandler{db: db,}
}func (h *RespHandler) closeClient(client *connection.Connection) {_ = client.Close()h.db.AfterClientClose(client)h.activeConn.Delete(client)
}func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {if h.closing.Get() {// closing handler refuse new connection_ = conn.Close()}client := connection.NewConn(conn)h.activeConn.Store(client, 1)ch := parser.ParseStream(conn)for payload := range ch {if payload.Err != nil {if payload.Err == io.EOF ||payload.Err == io.ErrUnexpectedEOF ||strings.Contains(payload.Err.Error(), "use of closed network connection") {// connection closedh.closeClient(client)logger.Info("connection closed: " + client.RemoteAddr().String())return}// protocol errerrReply := reply.MakeErrReply(payload.Err.Error())err := client.Write(errReply.ToBytes())if err != nil {h.closeClient(client)logger.Info("connection closed: " + client.RemoteAddr().String())return}continue}if payload.Data == nil {logger.Error("empty payload")continue}r, ok := payload.Data.(*reply.MultiBulkReply)if !ok {logger.Error("require multi bulk reply")continue}result := h.db.Exec(client, r.Args)if result != nil {_ = client.Write(result.ToBytes())} else {_ = client.Write(unknownErrReplyBytes)}}
}func (h *RespHandler) Close() error {logger.Info("handler shutting down...")h.closing.Set(true)// TODO: concurrent waith.activeConn.Range(func(key interface{}, val interface{}) bool {client := key.(*connection.Connection)_ = client.Close()return true})h.db.Close()return nil
}
RespHandler:和之前的echo类似,加了核心层的db.exec执行解析的指令
interface/database/database.go
// CmdLine is an alias for [][]byte, one command line.
type CmdLine = [][]byte

// Database is the interface the protocol layer uses to run commands.
type Database interface {
	// Exec runs the command given by args for client and returns the reply.
	Exec(client resp.Connection, args [][]byte) resp.Reply
	// AfterClientClose is called after a client connection has been closed.
	AfterClientClose(c resp.Connection)
	// Close shuts the database down.
	Close()
}

// DataEntity holds one stored value of any kind.
type DataEntity struct {
	Data interface{}
}
Exec:核心层的执行
AfterClientClose:关闭之后的善后方法
CmdLine:二维字节数组的指令别名
DataEntity:表示Redis的数据,包括string, list, set等等
database/echo_database.go
type EchoDatabase struct {
}func NewEchoDatabase() *EchoDatabase {return &EchoDatabase{}
}func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {return reply.MakeMultiBulkReply(args)
}func (e EchoDatabase) AfterClientClose(c resp.Connection) {logger.Info("EchoDatabase AfterClientClose")
}func (e EchoDatabase) Close() {logger.Info("EchoDatabase Close")
}
echo_database:测试协议层
Exec:指令解析后,再使用MakeMultiBulkReply包装一下返回去
main.go
// Serve on the configured bind address and port, using the RESP handler, and
// log the error if the call returns one.
err := tcp.ListenAndServeWithSignal(&tcp.Config{
	Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port),
}, handler.MakeHandler())
if err != nil {
	logger.Error(err)
}
main改成刚才写的:handler.MakeHandler()