Actor 模型
基础介绍
Actor 模型是一种并发编程模型,它将并发执行的任务封装为独立的实体(Actor),每个 Actor 都有自己的状态和行为。Actor 之间通过消息传递进行通信,而不是直接共享内存。
due 框架提供的 Actor 模型与众多开源项目提供的 Actor 模型还有所不同。 due 的 Actor 模型不仅提供了基于查找( ctx.Actor )的传统手动消息分发功能,还提供了通过 ctx.BindActor 方法绑定用户(UID)与 Actor,从而实现 Actor 消息的自动分发功能。
何时使用
避免数据竞争的高并发场景。例如:房间类型、帧同步、状态同步等。
示例代码
以下完整示例详见: actor-example
- 创建项目
$ mkdir actor-example- 安装依赖
$ cd actor-example
$ go mod init actor-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/locate/redis/v2@e5cd009
$ go get github.com/dobyte/due/registry/consul/v2@e5cd009- 启动配置
文件位置: actor-example/etc/etc.toml
# 进程号
pid = "./run/due.pid"
# 开发模式。支持模式:debug、test、release(设置优先级:配置文件 < 环境变量 < 运行参数 < mode.SetMode())
mode = "debug"
# 统一时区设置。项目中的时间获取请使用xtime.Now()
timezone = "Local"
# 容器关闭最大等待时间。支持单位:纳秒(ns)、微秒(us | µs)、毫秒(ms)、秒(s)、分(m)、小时(h)、天(d)。默认为0
shutdownMaxWaitTime = "0s"
[cluster.node]
# 实例ID,集群中唯一。不填写默认自动生成唯一的实例ID
id = ""
# 实例名称
name = "node"
# 内建RPC服务器监听地址。不填写默认随机监听
addr = ":0"
# 是否将内部通信地址暴露到公网。默认为false
expose = false
# 编解码器。可选:json | proto。默认为proto
codec = "json"
# RPC调用超时时间,支持单位:纳秒(ns)、微秒(us | µs)、毫秒(ms)、秒(s)、分(m)、小时(h)、天(d)。默认为3s
timeout = "3s"
# 节点权重,用于节点无状态路由消息的加权轮询策略,权重值必需大于0才生效。默认为1
weight = 1
# 实例元数据
[cluster.node.metadata]
# 键值对,且均为字符串类型。由于注册中心的元数据参数限制,建议将键值对的数量控制在20个以内,键的字符长度控制在127个字符内,值的字符长度控制在512个字符内。
key = "value"
[locate.redis]
# 客户端连接地址
addrs = ["127.0.0.1:6379"]
# 数据库号
db = 0
# 用户名
username = ""
# 密码
password = ""
# 私钥文件
keyFile = ""
# 证书文件
certFile = ""
# CA证书文件
caFile = ""
# 最大重试次数
maxRetries = 3
# key前缀
prefix = "due:locate"
[registry.consul]
# 客户端连接地址,默认为127.0.0.1:8500
addr = "127.0.0.1:8500"
# 是否启用健康检查,默认为true
healthCheck = true
# 健康检查时间间隔(秒),仅在启用健康检查后生效,默认为10
healthCheckInterval = 10
# 健康检查超时时间(秒),仅在启用健康检查后生效,默认为5
healthCheckTimeout = 5
# 是否启用心跳检查,默认为true
heartbeatCheck = true
# 心跳检查时间间隔(秒),仅在启用心跳检查后生效,默认为10
heartbeatCheckInterval = 10
# 健康检测失败后自动注销服务时间(秒),默认为30
deregisterCriticalServiceAfter = 30
[packet]
# 字节序,默认为big。可选:little | big
byteOrder = "big"
# 路由字节数,默认为2字节
routeBytes = 2
# 序列号字节数,默认为2字节
seqBytes = 2
# 消息字节数,默认为5000字节
bufferBytes = 5000
# 是否携带服务器心跳时间
heartbeatTime = false
[log]
# 日志输出级别,可选:debug | info | warn | error | fatal | panic
level = "info"
# 堆栈的最低输出级别,可选:debug | info | warn | error | fatal | panic
stackLevel = "error"
# 时间格式,标准库时间格式
timeFormat = "2006/01/02 15:04:05.000000"
# 输出栈的跳过深度
callSkip = 2
# 是否启用调用文件全路径
callFullPath = true
# 日志输出终端
terminals = ["console", "file"]
# 控制台同步器配置
[log.console]
# 日志输出格式,可选:text | json
format = "text"
# 文件同步器配置
[log.file]
# 输出文件路径
path = "./log/due.log"
# 日志输出格式,可选:text | json
format = "text"
# 文件最大留存时间,d:天、h:时、m:分、s:秒
maxAge = "7d"
# 文件最大尺寸限制,支持单位: B | K | KB | M | MB | G | GB | T | TB | P | PB | E | EB | Z | ZB,默认为100M
maxSize = "100M"
# 文件翻转方式,可选:none | year | month | week | day | hour,默认为none
rotate = "none"
# 文件翻转时是否对文件进行压缩
compress = false- 路由定义
文件位置: actor-example/app/route/route.go
package route
// 路由号
const (
Login = 1 // 登录
Create = 2 // 创建
Join = 3 // 加入
Play = 4 // 游戏
)- 公共基础响应
文件位置: actor-example/app/base/base.go
package base
// 基础响应
type Res struct {
Code int `json:"code"`
}- 授权检测中间件
文件位置: actor-example/app/middleware/auth.go
package middleware
import (
"actor-example/app/base"
"github.com/dobyte/due/v2/cluster/node"
"github.com/dobyte/due/v2/codes"
"github.com/dobyte/due/v2/log"
)
// 认证中间件
func Auth(middleware *node.Middleware, ctx node.Context) {
if ctx.UID() == 0 {
if err := ctx.Response(&base.Res{Code: codes.Unauthorized.Code()}); err != nil {
log.Errorf("response message failed, err: %v", err)
}
} else {
middleware.Next(ctx)
}
}- 核心逻辑
文件位置: actor-example/app/logic/core.go
package logic
import (
"actor-example/app/base"
"actor-example/app/middleware"
"actor-example/app/route"
"hash/fnv"
"github.com/dobyte/due/v2/cluster/node"
"github.com/dobyte/due/v2/codes"
"github.com/dobyte/due/v2/errors"
"github.com/dobyte/due/v2/log"
"github.com/dobyte/due/v2/utils/xconv"
)
const defaultPassword = "123456"
const roomActor = "room"
type core struct {
proxy *node.Proxy
rooms map[int64]*room
}
func NewCore(proxy *node.Proxy) *core {
return &core{proxy: proxy}
}
func (c *core) Init() {
c.proxy.Router().Group(func(group *node.RouterGroup) {
// 登录
group.AddRouteHandler(route.Login, c.login)
// 设置认证中间件
group.Middleware(middleware.Auth)
// 创建
group.AddRouteHandler(route.Create, c.create, node.AuthorizedRoute)
// 加入
group.AddRouteHandler(route.Join, c.join, node.AuthorizedRoute)
// 游戏
group.AddRouteHandler(route.Play, c.play, node.StatefulRoute)
})
}
// 登录
func (c *core) login(ctx node.Context) {
ctx.Task(func(ctx node.Context) {
req := &loginReq{}
res := &loginRes{}
ctx.Defer(func() {
if err := ctx.Response(res); err != nil {
log.Errorf("response message failed: %v", err)
}
})
if err := ctx.Parse(req); err != nil {
log.Errorf("parse request message failed: %v", err)
res.Code = codes.InternalError.Code()
return
}
// 执行登录操作
uid, err := c.doLogin(req)
if err != nil {
res.Code = codes.Convert(err).Code()
return
}
// 绑定网关
if err = ctx.BindGate(uid); err != nil {
log.Errorf("bind gate failed: %v", err)
res.Code = codes.InternalError.Code()
return
}
res.Code = codes.OK.Code()
})
}
// 执行登录操作
func (c *core) doLogin(req *loginReq) (int64, error) {
if req.Password != defaultPassword {
return 0, errors.NewError(codes.InvalidArgument)
}
fnv := fnv.New64a()
fnv.Write([]byte(req.Account))
uid := fnv.Sum64()
return int64(uid), nil
}
// 创建
func (c *core) create(ctx node.Context) {
req := &createReq{}
res := &createRes{}
ctx.Defer(func() {
if err := ctx.Response(res); err != nil {
log.Errorf("response message failed: %v", err)
}
})
if err := ctx.Parse(req); err != nil {
log.Errorf("parse request message failed: %v", err)
res.Code = codes.InternalError.Code()
return
}
// 执行创建操作
r, err := c.doCreate(ctx, req)
if err != nil {
res.Code = codes.Convert(err).Code()
return
}
res.Code = codes.OK.Code()
res.Data = &createResData{RoomID: r.id}
}
// 执行创建操作
func (c *core) doCreate(ctx node.Context, req *createReq) (*room, error) {
var (
err error
actor *node.Actor
)
r := newRoom(int64(len(c.rooms)+1), req.Name, ctx.UID())
if actor, err = c.proxy.Spawn(newRoomProcessor, node.WithActorID(xconv.String(r.id)), node.WithActorArgs(r), node.WithActorKind(roomActor)); err != nil {
log.Errorf("spawn actor faile: %v", err)
return nil, errors.NewError(err, codes.InternalError)
}
defer func() {
if err != nil {
actor.Destroy()
}
}()
if err = ctx.BindActor(actor.Kind(), actor.ID()); err != nil {
log.Errorf("bind actor failed: %v", err)
return nil, errors.NewError(err, codes.InternalError)
}
defer func() {
if err != nil {
ctx.UnbindActor(actor.Kind())
}
}()
if err = ctx.BindNode(); err != nil {
log.Errorf("bind node failed: %v", err)
return nil, errors.NewError(err, codes.InternalError)
}
c.rooms[r.id] = r
return r, nil
}
// 加入
// 多节点场景需要借助共享存储(如Redis)来存储房间信息,以便能够通过房间ID找到对应房间所在的节点,并将消息转发至对应节点服进行处理,此处细节不再赘述。
func (c *core) join(ctx node.Context) {
req := &joinReq{}
res := &joinRes{}
ctx.Defer(func() {
if err := ctx.Response(res); err != nil {
log.Errorf("response message failed: %v", err)
}
})
if err := ctx.Parse(req); err != nil {
log.Errorf("parse request message failed: %v", err)
res.Code = codes.InternalError.Code()
return
}
actor, ok := ctx.Actor(roomActor, xconv.String(req.RoomID))
if !ok {
res.Code = codes.NotFound.Code()
return
}
actor.Next(ctx)
}
// 游戏
func (c *core) play(ctx node.Context) {
res := &base.Res{}
ctx.Defer(func() {
if err := ctx.Response(res); err != nil {
log.Errorf("response message failed: %v", err)
}
})
if err := ctx.Next(); err != nil {
log.Errorf("request next failed: uid = %v route = %v err = %v", ctx.UID(), ctx.Route(), err)
res.Code = codes.IllegalRequest.Code()
}
}- 房间类
文件位置: actor-example/app/logic/room.go
package logic
import (
"github.com/dobyte/due/v2/cluster/node"
"github.com/dobyte/due/v2/codes"
"github.com/dobyte/due/v2/errors"
"github.com/dobyte/due/v2/log"
"github.com/dobyte/due/v2/utils/xconv"
)
type room struct {
id int64
name string
creator int64
members map[int64]struct{}
}
func newRoom(id int64, name string, creator int64) *room {
r := &room{}
r.id = id
r.name = name
r.creator = creator
r.members = make(map[int64]struct{})
r.members[r.creator] = struct{}{}
return r
}
// 加入
func (r *room) doJoin(ctx node.Context) (err error) {
uid := ctx.UID()
if _, ok := r.members[uid]; ok {
return errors.NewError(codes.IllegalRequest)
}
if err = ctx.BindNode(); err != nil {
return errors.NewError(codes.InternalError, err)
}
defer func() {
if err != nil {
ctx.UnbindNode()
}
}()
if err = ctx.BindActor(roomActor, xconv.String(r.id)); err != nil {
return errors.NewError(codes.InternalError, err)
}
r.members[uid] = struct{}{}
return nil
}
// 游戏
func (r *room) doPlay(ctx node.Context, req *playReq) (err error) {
uid := ctx.UID()
if _, ok := r.members[uid]; !ok {
return errors.NewError(codes.IllegalRequest)
}
if req.Action != "move" {
return errors.NewError(codes.InvalidArgument)
}
log.Infof("play, uid: %v, action: %v", uid, req.Action)
return nil
}- 房间 actor 消息处理器类
文件位置: actor-example/app/logic/room_processor.go
package logic
import (
"actor-example/app/route"
"github.com/dobyte/due/v2/cluster/node"
"github.com/dobyte/due/v2/codes"
"github.com/dobyte/due/v2/log"
)
type roomProcessor struct {
node.BaseProcessor
actor *node.Actor
room *room
}
func newRoomProcessor(actor *node.Actor, args ...any) node.Processor {
return &roomProcessor{
actor: actor,
room: args[0].(*room),
}
}
// Init 初始化处理器
func (p *roomProcessor) Init() {
// 加入
p.actor.AddRouteHandler(route.Join, p.join)
// 游戏
p.actor.AddRouteHandler(route.Play, p.play)
}
// 加入
func (p *roomProcessor) join(ctx node.Context) {
res := &joinRes{}
ctx.Defer(func() {
if err := ctx.Response(res); err != nil {
log.Errorf("response message failed: %v", err)
}
})
// 执行加入操作
if err := p.room.doJoin(ctx); err != nil {
res.Code = codes.Convert(err).Code()
return
}
res.Code = codes.OK.Code()
}
// 游戏
func (p *roomProcessor) play(ctx node.Context) {
req := &playReq{}
res := &playRes{}
ctx.Defer(func() {
if err := ctx.Response(res); err != nil {
log.Errorf("response message failed: %v", err)
}
})
if err := ctx.Parse(req); err != nil {
log.Errorf("parse request message failed: %v", err)
res.Code = codes.InternalError.Code()
return
}
// 执行游戏操作
if err := p.room.doPlay(ctx, req); err != nil {
res.Code = codes.Convert(err).Code()
return
}
res.Code = codes.OK.Code()
}- 运行示例
$ cd actor-example
$ go run main.go
____ __ ________
/ __ \/ / / / ____/
/ / / / / / / __/
/ /_/ / /_/ / /___
/_____/\____/_____/
┌──────────────────────────────────────────────────────┐
| [Website] https://github.com/dobyte/due |
| [Version] v2.4.2 |
└──────────────────────────────────────────────────────┘
┌────────────────────────Global────────────────────────┐
| PID: 28608 |
| Mode: debug |
| Time: 2025-10-30 10:27:37.7758377 +0800 CST |
└──────────────────────────────────────────────────────┘
┌─────────────────────────Node─────────────────────────┐
| ID: 00619b9e-b538-11f0-a216-f4f19e1f0070 |
| Name: node |
| Link: 192.168.2.202:57582 |
| Codec: json |
| Locator: redis |
| Registry: consul |
| Encryptor: - |
| Transporter: - |
└──────────────────────────────────────────────────────┘