当前位置: 首页 > news >正文

并发服务器框架——zinx

zinx框架

Zinx 是一个用 Go 语言编写的高性能、轻量级的 TCP 服务器框架,它被设计为简单、快速且易于使用。Zinx 提供了一系列的功能,包括但不限于连接管理、数据编解码、业务处理、负载均衡等,适用于构建各种 TCP 网络服务,如游戏服务器、即时通讯服务器等。

下面实现zinx的多个功能包括:路由、全局配置、消息封装、读写分离、消息队列、链接管理等。

在这里插入图片描述
utils包下GlobalObj.go

package utilsimport ("datarace/zinx/ziface""encoding/json""io/ioutil"
)/*
存储一切有关Zinx框架的全局参数,供其他模块使用
一些参数也可以通过 用户根据 zinx.json来配置
*/
type GlobalObj struct {TcpServer        ziface.IServer //当前Zinx的全局Server对象Host             string         //当前服务器主机IPTcpPort          int            //当前服务器主机监听端口号Name             string         //当前服务器名称Version          string         //当前Zinx版本号MaxPacketSize    uint32         //都需数据包的最大值MaxConn          int            //当前服务器主机允许的最大链接个数WorkerPoolSize   uint32         //业务工作Worker池的数量MaxWorkerTaskLen uint32         //业务工作Worker对应负责的任务队列最大任务存储数量ConfFilePath     stringMaxMsgChanLen    int
}/*
定义一个全局的对象
*/
var GlobalObject *GlobalObj// 读取用户的配置文件
func (g *GlobalObj) Reload() {data, err := ioutil.ReadFile("zinx.json")if err != nil {panic(err)}//将json数据解析到struct中//fmt.Printf("json :%s\n", data)err = json.Unmarshal(data, &GlobalObject)if err != nil {panic(err)}
}
func init() {//初始化GlobalObject变量,设置一些默认值GlobalObject = &GlobalObj{Name:             "ZinxServerApp",Version:          "V0.4",TcpPort:          7777,Host:             "0.0.0.0",MaxConn:          12000,MaxPacketSize:    4096,ConfFilePath:     "conf/zinx.json",WorkerPoolSize:   10,MaxWorkerTaskLen: 1024,}//从配置文件中加载一些用户配置的参数GlobalObject.Reload()
}

ziface包

package zifacetype IConnManager interface {Add(conn IConnection)                   //添加链接Remove(conn IConnection)                //删除连接Get(connID uint32) (IConnection, error) //利用ConnID获取链接Len() int                               //获取当前连接ClearConn()                             //删除并停止所有链接
}
package zifaceimport "net"type IConnection interface {Start()Stop()GetConnID() uint32GetTCPConnection() *net.TCPConnRemoteAddr() net.AddrSendMsg(msgId uint32, data []byte) error//直接将Message数据发送给远程的TCP客户端(有缓冲)SendBuffMsg(msgId uint32, data []byte) error //添加带缓冲发送消息接口//设置链接属性SetProperty(key string, value interface{})//获取链接属性GetProperty(key string) (interface{}, error)//移除链接属性RemoveProperty(key string)
}
type HandFunc func(*net.TCPConn, []byte, int) error
package zifacetype IDataPack interface {GetHeadLen() int32Pack(msg IMessage) ([]byte, error)Unpack([]byte) (IMessage, error)
}
package zifacetype IMessage interface {GetDataLen() uint32GetMsgId() uint32GetData() []byteSetMsgId(uint32)SetData([]byte)SetDataLen(uint32)
}
package zifacetype IMsgHandle interface {DoMsgHandler(request IRequest)          //马上以非阻塞方式处理消息AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑StartWorkerPool()                       //启动worker工作池SendMsgToTaskQueue(request IRequest)    //将消息交给TaskQueue,由worker进行处理
}
package zifacetype IRequest interface {GetConnection() IConnectionGetData() []byteGetMsgID() uint32
}
package zifacetype IRouter interface {PreHandle(req IRequest)Handle(req IRequest)PostHandle(req IRequest)
}
package zifacetype IServer interface {Start()Stop()Serve()AddRouter(msgId uint32, router IRouter)//得到链接管理GetConnMgr() IConnManager//设置该Server的连接创建时Hook函数SetOnConnStart(func(IConnection))//设置该Server的连接断开时的Hook函数SetOnConnStop(func(IConnection))//调用连接OnConnStart Hook函数CallOnConnStart(conn IConnection)//调用连接OnConnStop Hook函数CallOnConnStop(conn IConnection)
}

znet包
connection

package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""errors""fmt""io""net""sync"
)type Connection struct {//当前Conn属于哪个ServerTcpServer ziface.IServer //当前conn属于哪个server,在conn初始化的时候添加即可//当前连接的socket TCP套接字Conn *net.TCPConn//当前连接的ID 也可以称作为SessionID,ID全局唯一ConnID uint32//当前连接的关闭状态isClosed bool//消息管理MsgId和对应处理方法的消息管理模块MsgHandler ziface.IMsgHandle//告知该链接已经退出/停止的channelExitBuffChan chan bool//无缓冲管道,用于读、写两个goroutine之间的消息通信msgChan chan []byte//有关冲管道,用于读、写两个goroutine之间的消息通信msgBuffChan chan []byte//链接属性property map[string]interface{}//保护链接属性修改的锁propertyLock sync.RWMutex
}// 创建连接的方法
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {//初始化Conn属性c := &Connection{TcpServer:    server, //将隶属的server传递进来Conn:         conn,ConnID:       connID,isClosed:     false,MsgHandler:   msgHandler,ExitBuffChan: make(chan bool, 1),msgChan:      make(chan []byte),msgBuffChan:  make(chan []byte, utils.GlobalObject.MaxMsgChanLen), //不要忘记初始化property:     make(map[string]interface{}),                        //对链接属性map初始化}//将新创建的Conn添加到链接管理中c.TcpServer.GetConnMgr().Add(c) //将当前新创建的连接添加到ConnManager中return c
}// 设置链接属性
func (c *Connection) SetProperty(key string, value interface{}) {c.propertyLock.Lock()defer c.propertyLock.Unlock()c.property[key] = value
}// 获取链接属性
func (c *Connection) GetProperty(key string) (interface{}, error) {c.propertyLock.RLock()defer c.propertyLock.RUnlock()if value, ok := c.property[key]; ok {return value, nil} else {return nil, errors.New("no property found")}
}// 移除链接属性
func (c *Connection) RemoveProperty(key string) {c.propertyLock.Lock()defer c.propertyLock.Unlock()delete(c.property, key)
}
func (c *Connection) startReader() {fmt.Println("[Reader Goroutine is running]")defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")defer c.Stop()for {// 创建拆包解包的对象dp := NewDataPack()//读取客户端的Msg headheadData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head error ", err)break}//拆包,得到msgid 和 datalen 放在msg中msg, err := dp.Unpack(headData)if err != nil {fmt.Println("unpack error ", err)break}//根据 dataLen 读取 data,放在msg.Data中var data []byteif msg.GetDataLen() > 0 {data = make([]byte, msg.GetDataLen())if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data error ", err)continue}}msg.SetData(data)//得到当前客户端请求的Request数据req := Request{conn: c,msg:  msg,}//从绑定好的消息和对应的处理方法中执行对应的Handle方法if utils.GlobalObject.WorkerPoolSize > 0 {//已经启动工作池机制,将消息交给Worker处理c.MsgHandler.SendMsgToTaskQueue(&req)} else {//从绑定好的消息和对应的处理方法中执行对应的Handle方法go c.MsgHandler.DoMsgHandler(&req)}}
}// 启动连接,让当前连接开始工作
func (c *Connection) Start() {//1 开启用户从客户端读取数据流程的Goroutinego c.startReader()//2 开启用于写回客户端数据流程的Goroutinego c.StartWriter()//按照用户传递进来的创建连接时需要处理的业务,执行钩子方法c.TcpServer.CallOnConnStart(c)
}//func (c *Connection) Start() {
//	go c.startReader()
//	for {
//		select {
//		case <-c.ExitBuffChan:
//			return
//		}
//	}
//}// 停止连接,结束当前连接状态M
func (c *Connection) Stop() {fmt.Println("Conn Stop()...ConnID = ", c.ConnID)//如果当前链接已经关闭if c.isClosed == true {return}c.isClosed = true//==================//如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用c.TcpServer.CallOnConnStop(c)//==================// 关闭socket链接c.Conn.Close()//关闭Writerc.ExitBuffChan <- true//将链接从连接管理器中删除c.TcpServer.GetConnMgr().Remove(c)//关闭该链接全部管道close(c.ExitBuffChan)close(c.msgBuffChan)
}/*
写消息Goroutine, 用户将数据发送给客户端
*/
func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")for {select {case data := <-c.msgChan://有数据要写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Data error:, ", err, " Conn Writer exit")return}//针对有缓冲channel需要些的数据处理case data, ok := <-c.msgBuffChan:if ok {//有数据要写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")return}} else {breakfmt.Println("msgBuffChan is Closed")}case <-c.ExitBuffChan:return}}
}// 直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send msg")}//将data封包,并且发送dp := NewDataPack()msg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg ")}//写回客户端c.msgChan <- msg //将之前直接回写给conn.Write的方法 改为 发送给Channel 供Writer读取return nil
}// 从当前连接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}// 获取当前连接ID
func (c *Connection) GetConnID() uint32 {return c.ConnID
}// 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}// // 直接将Message数据发送数据给远程的TCP客户端
//
//	func (c *Connection) SendMsg(msgId uint32, data []byte) error {
//		if c.isClosed == true {
//			return errors.New("Connection closed when send msg")
//		}
//		//将data封包,并且发送
//		dp := NewDataPack()
//		msg, err := dp.Pack(NewMsgPackage(msgId, data))
//		if err != nil {
//			fmt.Println("Pack error msg id = ", msgId)
//			return errors.New("Pack error msg ")
//		}
//		//写回客户端
//		if _, err := c.Conn.Write(msg); err != nil {
//			fmt.Println("Write msg id ", msgId, " error ")
//			c.ExitBuffChan <- true
//			return errors.New("conn Write error")
//		}
//		return nil
//	}
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send buff msg")}//将data封包,并且发送dp := NewDataPack()msg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg ")}//写回客户端c.msgBuffChan <- msgreturn nil
}

ConnManager

package znetimport ("datarace/zinx/ziface""errors""fmt""sync"
)type ConnManager struct {connections map[uint32]ziface.IConnectionconnLock    sync.RWMutex
}/*
创建一个链接管理
*/
func NewConnManager() *ConnManager {return &ConnManager{connections: make(map[uint32]ziface.IConnection),}
}// 添加链接
func (connMgr *ConnManager) Add(conn ziface.IConnection) {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//将conn连接添加到ConnMananger中connMgr.connections[conn.GetConnID()] = connfmt.Println("connection add to ConnManager successfully: conn num = ", connMgr.Len())
}// 删除连接
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//删除连接信息delete(connMgr.connections, conn.GetConnID())fmt.Println("connection Remove ConnID=", conn.GetConnID(), " successfully: conn num = ", connMgr.Len())
}// 利用ConnID获取链接
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {//保护共享资源Map 加读锁connMgr.connLock.RLock()defer connMgr.connLock.RUnlock()if conn, ok := connMgr.connections[connID]; ok {return conn, nil} else {return nil, errors.New("connection not found")}
}// 获取当前连接
func (connMgr *ConnManager) Len() int {return len(connMgr.connections)
}// 清除并停止所有连接
func (connMgr *ConnManager) ClearConn() {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//停止并删除全部的连接信息for connID, conn := range connMgr.connections {//停止conn.Stop()//删除delete(connMgr.connections, connID)}fmt.Println("Clear All Connections successfully: conn num = ", connMgr.Len())
}

datapack

package znetimport ("bytes""datarace/zinx/utils""datarace/zinx/ziface""encoding/binary""errors"
)type DataPack struct{}func NewDataPack() *DataPack {return &DataPack{}
}
func (dp *DataPack) GetHeadLen() uint32 {//Id uint32(4字节) +  DataLen uint32(4字节)return 8
}// 封包方法(压缩数据)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {//创建一个存放bytes字节的缓冲dataBuff := bytes.NewBuffer([]byte{})//写dataLenif err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil {return nil, err}//写msgIDif err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {return nil, err}//写data数据if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {return nil, err}return dataBuff.Bytes(), nil
}
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {//创建一个从输入二进制数据的ioReaderdataBuff := bytes.NewReader(binaryData)//只解压head的信息,得到dataLen和msgIDmsg := &Message{}//读dataLenif err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {return nil, err}//读msgIDif err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {return nil, err}//判断dataLen的长度是否超出我们允许的最大包长度if utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize {return nil, errors.New("Too large msg data recieved")}//这里只需要把head的数据拆包出来就可以了,然后再通过head的长度,再从conn读取一次数据return msg, nil
}

message

package znettype Message struct {Id      uint32DataLen uint32Data    []byte
}func NewMsgPackage(id uint32, data []byte) *Message {return &Message{Id:      id,DataLen: uint32(len(data)),Data:    data,}
}// 获取消息数据段长度
func (msg *Message) GetDataLen() uint32 {return msg.DataLen
}// 获取消息ID
func (msg *Message) GetMsgId() uint32 {return msg.Id
}// 获取消息内容
func (msg *Message) GetData() []byte {return msg.Data
}// 设置消息数据段长度
func (msg *Message) SetDataLen(len uint32) {msg.DataLen = len
}// 设计消息ID
func (msg *Message) SetMsgId(msgId uint32) {msg.Id = msgId
}// 设计消息内容
func (msg *Message) SetData(data []byte) {msg.Data = data
}

msgHandler

package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""fmt""strconv"
)type MsgHandle struct {Apis           map[uint32]ziface.IRouterWorkerPoolSize uint32TaskQueue      []chan ziface.IRequest
}func NewMsgHandle() *MsgHandle {return &MsgHandle{Apis:           make(map[uint32]ziface.IRouter),WorkerPoolSize: utils.GlobalObject.WorkerPoolSize,TaskQueue:      make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),}
}// 马上以非阻塞方式处理消息
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {handler, ok := mh.Apis[request.GetMsgID()]if !ok {fmt.Println("api msgId = ", request.GetMsgID(), " is not FOUND!")return}//执行对应处理方法handler.PreHandle(request)handler.Handle(request)handler.PostHandle(request)
}// 为消息添加具体的处理逻辑
func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {//1 判断当前msg绑定的API处理方法是否已经存在if _, ok := mh.Apis[msgId]; ok {panic("repeated api , msgId = " + strconv.Itoa(int(msgId)))}//2 添加msg与api的绑定关系mh.Apis[msgId] = routerfmt.Println("Add api msgId = ", msgId)
}// 启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {fmt.Println("Worker ID = ", workerID, " is started.")//不断的等待队列中的消息for {select {//有消息则取出队列的Request,并执行绑定的业务方法case request := <-taskQueue:mh.DoMsgHandler(request)}}
}// 启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {//遍历需要启动worker的数量,依此启动for i := 0; i < int(mh.WorkerPoolSize); i++ {//一个worker被启动//给当前worker对应的任务队列开辟空间mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来go mh.StartOneWorker(i, mh.TaskQueue[i])}
}// 将消息交给TaskQueue,由worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {//根据ConnID来分配当前的连接应该由哪个worker负责处理//轮询的平均分配法则//得到需要处理此条连接的workerIDworkerID := request.GetConnection().GetConnID() % mh.WorkerPoolSizefmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request msgID=", request.GetMsgID(), "to workerID=", workerID)//将请求消息发送给任务队列mh.TaskQueue[workerID] <- request
}

request

package znetimport ("datarace/zinx/ziface"
)type Request struct {conn ziface.IConnectionmsg  ziface.IMessage
}func (r *Request) GetConnection() ziface.IConnection {return r.conn
}// 获取请求消息的数据
func (r *Request) GetData() []byte {return r.msg.GetData()
}// 获取请求的消息的ID
func (r *Request) GetMsgID() uint32 {return r.msg.GetMsgId()
}

router

package znetimport "datarace/zinx/ziface"type BaseRouter struct{}func (br *BaseRouter) PreHandle(request ziface.IRequest)  {}
func (br *BaseRouter) Handle(request ziface.IRequest)     {}
func (br *BaseRouter) PostHandle(request ziface.IRequest) {}

server

package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""fmt""net""time"
)// iServer 接口实现,定义一个Server服务类
type Server struct {//服务器的名称Name string//tcp4 or otherIPVersion string//服务绑定的IP地址IP string//服务绑定的端口Port int//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法msgHandler ziface.IMsgHandle//当前Server的链接管理器ConnMgr ziface.IConnManager//新增两个hook函数原型//该Server的连接创建时Hook函数OnConnStart func(conn ziface.IConnection)//该Server的连接断开时的Hook函数OnConnStop func(conn ziface.IConnection)
}// 得到链接管理
func (s *Server) GetConnMgr() ziface.IConnManager {return s.ConnMgr
}// ============== 实现 ziface.IServer 里的全部接口方法 ========
// 开启网络服务
func (s *Server) Start() {fmt.Printf("[START] Server listenner at IP: %s, Port %d, is starting\n", s.IP, s.Port)fmt.Printf("[START] Server name: %s,listenner at IP: %s, Port %d is starting\n", s.Name, s.IP, s.Port)fmt.Printf("[Zinx] Version: %s, MaxConn: %d,  MaxPacketSize: %d\n",utils.GlobalObject.Version,utils.GlobalObject.MaxConn,utils.GlobalObject.MaxPacketSize)//开启一个go去做服务端Linster业务go func() {//1 获取一个TCP的Addrs.msgHandler.StartWorkerPool()addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Println("resolve tcp addr err: ", err)return}//2 监听服务器地址listenner, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen", s.IPVersion, "err", err)return}//已经监听成功fmt.Println("start Zinx server  ", s.Name, " succ, now listenning...")var cid uint32cid = 0//3 启动server网络连接业务for {//3.1 阻塞等待客户端建立连接请求conn, err := listenner.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}//=============//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {conn.Close()continue}//=============//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的dealConn := NewConntion(s, conn, cid, s.msgHandler)cid++//3.4 启动当前链接的处理业务go dealConn.Start()//go func() {//	//不断的循环从客户端获取数据//	for {//		buf := make([]byte, 512)//		cnt, err := conn.Read(buf)//		if err != nil {//			fmt.Println("recv buf err ", err)//			continue//		}//		//回显//		if _, err := conn.Write(buf[:cnt]); err != nil {//			fmt.Println("write back buf err ", err)//			continue//		}//	}//}()}}()
}
func (s *Server) Stop() {fmt.Println("[STOP] Zinx server , name ", s.Name)//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理s.ConnMgr.ClearConn()
}
func (s *Server) Serve() {s.Start()//TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加//阻塞,否则主Go退出, listenner的go将会退出for {time.Sleep(10 * time.Second)}
}
func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.msgHandler.AddRouter(msgId, router)fmt.Println("Add Router SUCC!! msgID = ", msgId)
}/*
创建一个服务器句柄
*/
func NewServer() *Server {utils.GlobalObject.Reload()s := &Server{Name:       utils.GlobalObject.Name,IPVersion:  "tcp4",IP:         utils.GlobalObject.Host,Port:       utils.GlobalObject.TcpPort,msgHandler: NewMsgHandle(),   //msgHandler 初始化ConnMgr:    NewConnManager(), //创建ConnManage}return s
}// 设置该Server的连接创建时Hook函数
func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) {s.OnConnStart = hookFunc
}// 设置该Server的连接断开时的Hook函数
func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) {s.OnConnStop = hookFunc
}// 调用连接OnConnStart Hook函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {if s.OnConnStart != nil {fmt.Println("---> CallOnConnStart....")s.OnConnStart(conn)}
}// 调用连接OnConnStop Hook函数
func (s *Server) CallOnConnStop(conn ziface.IConnection) {if s.OnConnStop != nil {fmt.Println("---> CallOnConnStop....")s.OnConnStop(conn)}
}

客户端

package mainimport ("datarace/zinx/znet""fmt""io""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("Client Test ... start")//3秒之后发起测试请求,给服务端开启服务的机会time.Sleep(3 * time.Second)conn, err := net.Dial("tcp", "127.0.0.1:7777")if err != nil {fmt.Println("client start err, exit!")return}for {//发封包message消息dp := znet.NewDataPack()msg, _ := dp.Pack(znet.NewMsgPackage(0, []byte("Zinx V0.8 Client0 Test Message")))_, err := conn.Write(msg)if err != nil {fmt.Println("write error err ", err)return}//先读出流中的head部分headData := make([]byte, dp.GetHeadLen())_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止if err != nil {fmt.Println("read head error")break}//将headData字节流 拆包到msg中msgHead, err := dp.Unpack(headData)if err != nil {fmt.Println("server unpack err:", err)return}if msgHead.GetDataLen() > 0 {//msg 是有data数据的,需要再次读取data数据msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetDataLen())//根据dataLen从io中读取字节流_, err := io.ReadFull(conn, msg.Data)if err != nil {fmt.Println("server unpack data err:", err)return}fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))}time.Sleep(1 * time.Second)}
}

服务端

package mainimport ("datarace/zinx/ziface""datarace/zinx/znet""fmt"
)// ping test 自定义路由
type PingRouter struct {znet.BaseRouter
}// Ping Handle
func (this *PingRouter) Handle(request ziface.IRequest) {fmt.Println("Call PingRouter Handle")//先读取客户端的数据,再回写ping...ping...pingfmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))err := request.GetConnection().SendBuffMsg(0, []byte("ping...ping...ping"))if err != nil {fmt.Println(err)}
}type HelloZinxRouter struct {znet.BaseRouter
}// HelloZinxRouter Handle
func (this *HelloZinxRouter) Handle(request ziface.IRequest) {fmt.Println("Call HelloZinxRouter Handle")//先读取客户端的数据,再回写ping...ping...pingfmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))err := request.GetConnection().SendBuffMsg(1, []byte("Hello Zinx Router V0.10"))if err != nil {fmt.Println(err)}
}// 创建连接的时候执行
func DoConnectionBegin(conn ziface.IConnection) {fmt.Println("DoConnecionBegin is Called ... ")//=============设置两个链接属性,在连接创建之后===========fmt.Println("Set conn Name, Home done!")conn.SetProperty("Name", "Aceld")conn.SetProperty("Home", "https://www.jianshu.com/u/35261429b7f1")//===================================================err := conn.SendMsg(2, []byte("DoConnection BEGIN..."))if err != nil {fmt.Println(err)}
}// 连接断开的时候执行
func DoConnectionLost(conn ziface.IConnection) {//============在连接销毁之前,查询conn的Name,Home属性=====if name, err := conn.GetProperty("Name"); err == nil {fmt.Println("Conn Property Name = ", name)}if home, err := conn.GetProperty("Home"); err == nil {fmt.Println("Conn Property Home = ", home)}//===================================================fmt.Println("DoConneciotnLost is Called ... ")
}
func main() {//创建一个server句柄s := znet.NewServer()//注册链接hook回调函数s.SetOnConnStart(DoConnectionBegin)s.SetOnConnStop(DoConnectionLost)//配置路由s.AddRouter(0, &PingRouter{})s.AddRouter(1, &HelloZinxRouter{})//开启服务s.Serve()
}

在这里插入图片描述
在这里插入图片描述

http://www.lryc.cn/news/516113.html

相关文章:

  • Unity 中计算射线和平面相交距离的原理
  • 浅谈棋牌游戏开发流程七:反外挂与安全体系——守护游戏公平与玩家体验
  • 《无力逃脱》V1.0.15.920(59069)官方中文版
  • 六种主流服务器的选择与使用
  • TiDB 升级至高版本提示'mysql.tidb_runaway_watch' doesn't exist 问题处理
  • GRU-PFG:利用图神经网络从股票因子中提取股票间相关性
  • 数字化供应链创新解决方案在零售行业的应用研究——以开源AI智能名片S2B2C商城小程序为例
  • 安卓Activity执行finish后onNewIntent也执行了
  • 数据结构.期末复习.学习笔记(c语言)
  • Kafaka安装与启动教程
  • 根据docker file 编译镜像
  • 联邦学习的 AI 大模型微调中,加性、选择性、重参数化和混合微调
  • android 外挂modem模块实现Telephony相关功能(上网,发短信,打电话)
  • 【计算机视觉技术 - 人脸生成】2.GAN网络的构建和训练
  • 数据中台与数据治理服务方案[50页PPT]
  • 【Qt】将控件均匀分布到圆环上
  • 第四、五章补充:线代本质合集(B站:小崔说数)
  • 2025年贵州省职业院校技能大赛信息安全管理与评估赛项规程
  • 松鼠状态机流转-@Transit
  • 微信小程序调用 WebAssembly 烹饪指南
  • # LeetCode Problem 2038: 如果相邻两个颜色均相同则删除当前颜色 (Winner of the Game)
  • Redis面试相关
  • 4.CSS文本属性
  • Mongo高可用架构解决方案
  • Rabbitmq 业务异常与未手动确认场景及解决方案
  • linux,centos7.6安装禅道
  • java基础之代理
  • 计算机网络——期末复习(6)期末考试样例2(含答案)
  • JavaScript 获取DOM对象
  • 一文讲明白朴素贝叶斯算法及其计算公式(入门普及)