说明
本文档按照实验楼–Go 并发服务器框架 Zinx 入门的文档同步学习记录(大部分内容相同)
https://www.lanqiao.cn/courses/1639
主要有以下原因:
1、模仿大神写教程的风格
2、验证每一个步骤,而不是简简单单的复制教程中的代码。简单重现
实验介绍
我们现在就需要给 Zinx 添加消息队列和多任务 Worker 机制了。

知识点
准备工作
我们可以通过 worker 的数量来限定处理业务的固定 goroutine 数量,而不是无限制的开辟 Goroutine,虽然我们知道 go 的调度算法已经做的很极致了,但是大数量的 Goroutine 依然会带来一些不必要的环境切换成本,这些本应该是服务器应该节省掉的成本。我们可以用消息队列来缓冲 worker 工作的数据。
初步我们的设计结构如下图:

创建消息队列
首先,处理消息队列的部分,我们应该集成到MsgHandler模块下,因为属于我们消息模块范畴内的。
zinx/znet/msghandler.go
1 2 3 4 5 6 7 8 9 10 11 12 13
| type MsgHandle struct { Apis map[uint32]ziface.IRouter WorkerPoolSize uint32 TaskQueue []chan ziface.IRequest } func NewMsgHandle() *MsgHandle { return &MsgHandle{ Apis: make(map[uint32]ziface.IRouter), WorkerPoolSize:utils.GlobalObject.WorkerPoolSize, TaskQueue:make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize), } }
|
这里添加两个成员:
WokerPoolSize
:作为工作池的数量,因为 TaskQueue 中的每个队列应该是和一个 Worker 对应的,所以我们在创建 TaskQueue 中队列数量要和 Worker 的数量一致。
TaskQueue
真是一个 Request 请求信息的 channel 集合。用来缓冲提供 worker 调用的 Request 请求信息,worker 会从对应的队列中获取客户端的请求数据并且处理掉。
当然WorkerPoolSize最好也可以从GlobalObject获取,并且zinx.json配置文件可以手动配置。
zinx/utils/globalobj.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
type GlobalObj struct {
TcpServer ziface.IServer Host string TcpPort int Name string
Version string MaxPacketSize uint32 MaxConn int WorkerPoolSize uint32 MaxWorkerTaskLen uint32
ConfFilePath string }
func init() { GlobalObject = &GlobalObj{ Name: "ZinxServerApp", Version: "V0.4", TcpPort: 7777, Host: "0.0.0.0", MaxConn: 12000, MaxPacketSize: 4096, ConfFilePath: "conf/zinx.json", WorkerPoolSize: 10, MaxWorkerTaskLen: 1024, } GlobalObject.Reload() }
|
创建及启动 Worker 工作池
现在添加 Worker 工作池,先定义一些启动工作池的接口。
zinx/ziface/imsghandler.go
1 2 3 4 5 6 7 8 9
|
type IMsgHandle interface{ DoMsgHandler(request IRequest) AddRouter(msgId uint32, router IRouter) StartWorkerPool() SendMsgToTaskQueue(request IRequest) }
|
zinx/znet/msghandler.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) { fmt.Println("Worker ID = ", workerID, " is started.") for { select { case request := <-taskQueue: mh.DoMsgHandler(request) } } }
func (mh *MsgHandle) StartWorkerPool() { for i:= 0; i < int(mh.WorkerPoolSize); i++ { mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen) go mh.StartOneWorker(i, mh.TaskQueue[i]) } }
|
StartWorkerPool()方法是启动 Worker 工作池,这里根据用户配置好的WorkerPoolSize的数量来启动,然后分别给每个 Worker 分配一个TaskQueue,然后用一个 goroutine 来承载一个 Worker 的工作业务。
StartOneWorker()方法就是一个 Worker 的工作业务,每个 worker 是不会退出的(目前没有设定 worker 的停止工作机制),会永久的从对应的 TaskQueue 中等待消息,并处理。
发送消息给消息队列
现在,worker 工作池已经准备就绪了,那么就需要有一个给到 worker 工作池消息的入口,我们再定义一个方法
zinx/znet/msghandler.go
1 2 3 4 5 6 7 8 9 10
| func (mh *MsgHandle)SendMsgToTaskQueue(request ziface.IRequest) { workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize fmt.Println("Add ConnID=", request.GetConnection().GetConnID()," request msgID=", request.GetMsgID(), "to workerID=", workerID) mh.TaskQueue[workerID] <- request }
|
SendMsgToTaskQueue()作为工作池的数据入口,这里面采用的是轮询的分配机制,因为不同链接信息都会调用这个入口,那么到底应该由哪个 worker 处理该链接的请求处理,整理用的是一个简单的求模运算。用余数和 workerID 的匹配来进行分配。
最终将 request 请求数据发送给对应 worker 的 TaskQueue,那么对应的 worker 的 Goroutine 就会处理该链接请求了。
Zinx-V0.8 代码实现
好了,现在需要将消息队列和多任务 worker 机制集成到我们 Zinx 的中了。我们在 Server 的Start()方法中,在服务端 Accept 之前,启动 Worker 工作池。
zinx/znet/server.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func (s *Server) Start() { go func() { s.msgHandler.StartWorkerPool() addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port)) if err != nil { fmt.Println("resolve tcp addr err: ", err) return } } }() }
|
其次,当我们已经得到客户端的连接请求过来数据的时候,我们应该将数据发送给 Worker 工作池进行处理。
所以应该在 Connection 的StartReader()方法中修改:
zinx/znet/connection.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
func (c *Connection) StartReader() { fmt.Println("Reader Goroutine is running") defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!") defer c.Stop() for { req := Request{ conn:c, msg:msg, } if utils.GlobalObject.WorkerPoolSize > 0 { c.MsgHandler.SendMsgToTaskQueue(&req) } else { go c.MsgHandler.DoMsgHandler(&req) } } }
|
这里并没有强制使用多任务 Worker 机制,而是判断用户配置WorkerPoolSize的个数,如果大于 0,那么我就启动多任务机制处理链接请求消息,如果=0 或者<0 那么,我们依然只是之前的开启一个临时的 Goroutine 处理客户端请求消息。
测试
测试代码和 V0.6、V0.7 的代码一样。因为 Zinx 框架对外接口没有发生改变。
实验总结
我们今天实现了 Zinx 框架的工作池,同时解答了上一节中为什么我们的测试代码无需修改。我们的 Zinx 对外提供服务的接口没有改变,所以测试文件不需要修改。