当前位置: 首页 > news >正文

skynet 游戏服务器探索(1)--熟悉skynet(网络)

因为做游戏服务器开发,大多数都跟脚本打交道,要么是lua,要么是python,要么是php,方便热更新的只有lua与php, php相关的游戏服务器开发,参考我另外的文章

https://blog.csdn.net/guoyilongedu/article/details/121049511

lua脚本相关的,自己也实现了一套,只不过不支持多线程,单进程的,基于libevent与luajit,上线过两款游戏项目,功能是挺全的,支持mysql,redis,定时任务相关。后面会贴出来,一起参考一下。

学skynet,因为skynet太出名了,是所有游戏服务器开发人员都绕不过去的,有必要或多或少熟悉一些,最好能自己动手实操一下,才能真正感受到skynet的魅力。

https://github.com/cloudwu/skynet

下载zip 文件,解压

cd skynet    #进入skynet目录
make linux    #编译

注意 因为skynet 用到c++11 相关的特性,所以gcc 必须支持C++11,所以版本必须大于4.9

初始怎么用 参考这篇文章

https://blog.csdn.net/qq_37717687/article/details/121766657

我们要关注的是游戏服务器的搭建,所以第一步 我们要关注的skynet 网络功能

之前用上面链接里面的一个echo 例子 试了一下,发现 一个问题 ,读取出来的数据包不完整 ,代码如下

local skynet = require "skynet"
local socket = require "skynet.socket"local clients = {}function connect(fd, addr)--启用连接,开始等待接收客户端消息print(fd .. " connected addr:" .. addr)socket.start(fd)clients[fd] = {}--消息处理while true dolocal readdata = socket.read(fd) --利用协程实现阻塞模式--正常接收if readdata ~= nil thenprint(fd .. " recv " .. readdata)for k,v in pairs(clients) do --广播socket.write(k, readdata)end--断开连接else print(fd .. " close ")socket.close(fd)clients[fd] = nilend    end
endskynet.start(function()local listenfd = socket.listen("0.0.0.0", 8888) --监听所有ip,端口8888socket.start(listenfd, connect) --新客户端发起连接时,conncet方法将被调用。
end)

这就很要命,研究了一下,是因为 skynet 的网络消息格式是固定的 2字节头(消息长度)+消息内容。

skynet 网络相关的是流式读取,也就是尝试读取的,一个数据包过来,可能要读好几次,至于为什么这么设计,可能是与内存池有关,内存池的分配都是按块的,所以才分批读取,后面单独再研究

单个socket每次从内核尝试读取的数据字节数为sz(第6行),这个值保存在s->p.size中,初始是MIN_READ_BUFFER(64b),当实际读到的数据等于sz时,sz扩大一倍(8-9行);如果小于sz的一半,则设置sz为原来的一半(10-11行)。

比如,客户端发了一个1kb的数据,socket线程会从内核里依次读取64b,128b,256b,512b,64b数据,总共需读取5次,即会向gateserver服务发5条消息,一个TCP包被切割成5个数据块。第5次尝试读取1024b数据,所以可能会读到其他TCP包的数据(只要客户端有发送其他数据)。接下来,客户端再发一个1kb的数据,socket线程只需从内核读取一次即可。

https://www.cnblogs.com/cnxkey/articles/15945319.html

static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {int sz = s->p.size;char * buffer = MALLOC(sz);int n = (int)read(s->fd, buffer, sz);if (n<0) {FREE(buffer);switch(errno) {case EINTR:break;case AGAIN_WOULDBLOCK:skynet_error(NULL, "socket-server: EAGAIN capture.");break;default:// close when errorforce_close(ss, s, l, result);result->data = strerror(errno);return SOCKET_ERR;}return -1;}if (n==0) {FREE(buffer);force_close(ss, s, l, result);return SOCKET_CLOSE;}if (s->type == SOCKET_TYPE_HALFCLOSE) {// discard recv dataFREE(buffer);return -1;}stat_read(ss,s,n);if (n == sz) {s->p.size *= 2;} else if (sz > MIN_READ_BUFFER && n*2 < sz) {s->p.size /= 2;}result->opaque = s->opaque;result->id = s->id;result->ud = n;result->data = buffer;return SOCKET_DATA;
}

第一次读取的字节 就是 #define MIN_READ_BUFFER 64

每次读满之后,字节扩大一倍

 if (n == sz) {s->p.size *= 2;
}

了解了这里 发现 改成如下这样

local skynet = require "skynet"local socket = require "skynet.socket"
local protobuf = require "protobuf" 
local cjson = require "cjson"local clients = {}
local logic_service = nil-- skynet.register_protocol {-- name = "client",-- id = skynet.PTYPE_CLIENT,    -- unpack = skynet.tostring,   --- 将C point 转换为lua 二进制字符串
-- }function connect(fd, addr)-- 启用连接 开始等待接收客户端消息print(" fd = ".. fd .. " connected addr:" .. addr)socket.start(fd)clients[fd] = {}-- 消息处理 不断循环接收while true dolocal readdata = socket.read(fd) -- 利用协程实现阻塞模式-- 正常接收if readdata ~= nil then--local ret_data = "fd = " .. fd .. " ==> " .. string.byte(readdata, 2);--readdata = string.unpack('>', readdata);print("-----------> ret_data " .. readdata);local s = readdata:byte(1) * 256 + readdata:byte(2) --数据包的长度print(" s = " .. s)local s1 = readdata:byte(3) * 256 + readdata:byte(4) --base64的长度print(" s1 = " .. s1)-- local real_data = readdata:sub(5, 100);-- --local base64_data = string.unpack(">s2", real_data)-- print("-----------> ret_data " .. real_data);--readdata = skynet.tostring(readdata)--local data_tb = protobuf.decode("cs.Person", readdata)--print(data_tb)--print(data_tb.name)-- print(data_tb.id)-- for _,v in ipairs(data_tb.phone) do-- print("\t" .. v.number, v.type)-- end--print(fd .. " recv " .. readdata)--local ret_data = "fd = " .. fd .. " ==> " .. readdata;if logic_service ~= nil thenlocal msg_tb = {}msg_tb['fd'] = fdmsg_tb['content'] = readdata;local stringbuffer = protobuf.encode("cs.Person", { name = "linsh", id = 1,email = readdata,}) -- 发送protobuf 数据local ret_msg_protobuf = skynet.call(logic_service, "lua", "doMsg_protobuf_data", stringbuffer)--ret_data = "fd = " .. fd .. " ==> " .. ret_msglocal ret_tb = protobuf.decode("cs.Person", ret_msg_protobuf)print(ret_tb.name)print(ret_tb.id)for _,v in ipairs(ret_tb.phone) doprint("\t" .. v.number, v.type)endret_data = cjson.encode(ret_tb)print(" ret msg from logic_service fd = " .. fd .. " ==> " .. ret_data)endif fd % 2 == 1 thenskynet.sleep(1000)endfor k, v in pairs(clients) dosocket.write(k, ret_data)end-- 断开连接else print(fd .. "close")socket.close(fd)clients[fd] = nilendend
endlocal CMD = {}function CMD.broad_msg_to_user(source, msg_content)print("=========> " .. msg_content)for k, v in pairs(clients) dosocket.write(k, msg_content)end
endskynet.start(function()logic_service = skynet.newservice("logic")local listenfd = socket.listen("0.0.0.0", 8888) -- 监听所有ip,端口8888socket.start(listenfd, connect) --新客户端发起连接时,conncet方法将被调用protobuf.register_file "./examples/protos/Person.pb" skynet.error("register:Person.pb") skynet.dispatch("lua", function(session, source, cmd, ...)local f = assert(CMD[cmd])-- f(source, ...)f(source, ...)end)end)

发现读取时,还是不行, 读取出来的总是不完整,了解到skynet 是分批读取,必须这些分批读取拼接起来,组成一个完整的数据包,所以需要一个netpack的中间层 来拼接数据包

--[[
协议分析:长度信息法
skynet提供的C语言编写的netpack模块,它能高效解析2字节长度信息的协议
]]local skynet = require "skynet"
local socketdriver = require "skynet.socketdriver"
local netpack = require "skynet.netpack"--不能同时包含socketdriver和socket,因为socket里已经register_protocol了socket类型
-- local socket = require "skynet.socket"local queue -- message queue
local clients = {}--解码底层传来的SOCKET类型消息
function socket_unpack(msg, size)return netpack.filter(queue, msg, size)
end--处理底层传来的SOCKET类型消息
function socket_dispatch(_, _, q, type, ...)skynet.error("socket_dispatch type:" .. (type or "nil"))queue = qif type == "open" thenprocess_connect(...)    elseif type == "data" thenprocess_msg(...)    elseif type == "more" thenprocess_more(...)elseif type == "close" thenprocess_close(...)elseif type == "error" thenprocess_error(...)elseif type == "warning" thenprocess_warning(...)end    
end--有新连接
function process_connect(fd, addr)skynet.error("new conn fd:" .. fd .. " addr:" .. addr)socketdriver.start(fd)    clients[fd] = {}clients[fd].last_time = skynet.time()
end--关闭连接
function process_close(fd)skynet.error("close fd:" .. fd)if clients[fd] thenclients[fd] = nilend
end--发送错误
function process_error(fd, error)skynet.error("error fd:" .. fd .. " error:" .. error)if clients[fd] thenclients[fd] = nilend
end--发送警告
function process_warning(fd, size)skynet.error("warning fd:" .. fd .. " size:" .. size)
end--刚好收到一条完整消息
function process_msg(fd, msg, size)local str = netpack.tostring(msg, size)skynet.error("recv from fd:" .. fd .. " str:" .. str .. " size = " .. size)for k, v in pairs(clients) doskynet.error(" all fd = " .. k)if k ~= fd then--socket.write(k, msg_content)socketdriver.send(k, str)endendif fd % 2 == 0 then--skynet.sleep(1000)endsocketdriver.send(fd, str)clients[fd].last_time = skynet.time()end--收到多于1条消息时
function process_more()for fd, msg, size in netpack.pop, queue doprint("process_more = fd = " .. fd .. " size = " .. size)skynet.fork(process_msg, fd, msg, size) --开启协程,协程保证了process_msg执行的时序性end
endfunction check_clients(timeout)while true doskynet.sleep(timeout)local now_time = skynet.time()skynet.error(" check clients now time = " .. now_time)local gap_time = 0for k, v in pairs(clients) doskynet.error(" all fd = " .. k .. " last_time = " .. v.last_time)gap_time = now_time - v.last_timeif gap_time >= 30 thensocketdriver.close(k)--clients[k] = nil--socketdriver.shutdown(k)end--socketdriver.send(k, " hearbeat fd = " .. k)endend
endskynet.start(function ()--注册SOCKET类型信息skynet.register_protocol({name = "socket",id = skynet.PTYPE_SOCKET,unpack = socket_unpack,dispatch = socket_dispatch,})--注册Lua类型消息--开启监听local listenfd = socketdriver.listen("0.0.0.0", 8887)socketdriver.start(listenfd)skynet.fork(check_clients, 3000)
end)

这样就能完整收包 解包

客户端测试代码 python脚本 有粘包 也能正常解

import socket
import threading
import struct
import sys
#import Person_pb2global recvThreadExitdef recvMsg(socket_obj):while True:print(" recvThreadExit = " + str(recvThreadExit))if recvThreadExit == 1:breakret = str(obj.recv(1024))print("recv msg :")print(ret)if len(ret) == 0 :#obj.shutdown(socket.SHUT_RDWR)#obj.close()print("close ==>")break# person = Person_pb2.Person()
# person.name = "ayuliao"
# person.id = 6
# person.email = "xxx@xx.com"
#person.phone = "13229483229"
# person_protobuf_data = person.SerializeToString()
#print(f'person_protobuf_data: {person_protobuf_data}')recvThreadExit = 0obj = socket.socket()obj.connect(('192.168.1.200', 8887))recv_thread = threading.Thread(target=recvMsg, args=(obj,))
recv_thread.start()while True:#a = raw_input()#print(a)inp = raw_input("input string: ").strip(' ') if inp == "quit" :#obj.shutdown(socket.SHUT_RDWR)obj.close()recvThreadExit = 1print("sssssssssssssssssseeeeeeeeeee")break#print(inp)##obj.sendall(bytes(inp, encoding="utf-8"))print("len = " + str(len(inp)) )#fmt_str = ">1h1h1h" + str(len(inp)) + "s"fmt_str = ">1h" + str(len(inp)) + "s"print(fmt_str)send_data = struct.pack(fmt_str, len(inp), bytes(inp))print(send_data + " len = " + str(len(send_data)) )obj.send(send_data)#obj.send(send_data)#obj.send(send_data)#obj.send(send_data)#obj.send(send_data)if inp == "q":breakprint(" exti ==>")
sys.exit(3)

网络的一个坑点就是这个,切不可拿其他人的示例来用,要自己多测试。

http://www.lryc.cn/news/10780.html

相关文章:

  • select、poll、epoll
  • rollup的基本使用 基本配置与处理各种文件
  • ubuntu-debian系-redhat系
  • Altium Designer 18中原理图DRC编译和PCB DRC检查-AD DRC
  • zipfile — 访问 ZIP 压缩文件
  • 检查nmos管是否损坏
  • 第七章 - 聚合函数(count,avg,sum,max,min)和一些数学函数
  • Typescript的原始据类型和Any类型
  • [python入门㊼] - python类的高级函数
  • 【Windows】使用Fiddler 工具对手机进行接口监听
  • SpringCloudAlibab-nacos
  • 从一致性角度考虑推荐冷启动长尾推荐问题(二)
  • 电脑c盘满了怎么清理,c盘空间清理
  • vite的基本使用
  • JavaScript 字符串(String) 对象
  • 小知识点:Mac M1/M2 VMware Fusion 安装 Centos 7.9(ARM 64 版本)
  • Nginx 新增模块 http_image_filter_module 来实现动态生成缩略图
  • detach,主线程终止后子线程会结束吗
  • 2023年云计算的发展趋势如何?还值得学习就业吗?
  • ROS2 入门应用 请求和应答(C++)
  • 华为机试题:HJ73 计算日期到天数转换(python)
  • 将springboot项目生成可依赖的jar,并引入到项目中
  • 小红书搜索关键词布局指南,这4种词一定要把握好
  • 安全研发人员能力模型窥探
  • 【面试总结】Linux篇·操作及原理篇
  • C++中如何实现用异或运算找出数组中只出现一次的数字???
  • 红黑树的历史和由来。
  • 蓝库云|制造业数字化转型为何转不动?资料处理很重要
  • 【python学习笔记】 :Lambda 函数
  • Nginx的proxy buffer参数设置