Actor 模型

基础介绍

Actor 模型是一种并发编程模型,它将并发执行的任务封装为独立的实体(Actor),每个 Actor 都有自己的状态和行为。Actor 之间通过消息传递进行通信,而不是直接共享内存。

due 框架提供的 Actor 模型与众多开源项目提供的 Actor 模型还有所不同。 due 的 Actor 模型不仅提供了基于查找( ctx.Actor )的传统手动消息分发功能,还提供了通过 ctx.BindActor 方法绑定用户(UID)与 Actor,从而实现 Actor 消息的自动分发功能。

何时使用

避免数据竞争的高并发场景。例如:房间类型、帧同步、状态同步等。

示例代码

以下完整示例详见: actor-example

  1. 创建项目
$ mkdir actor-example
  1. 安装依赖
$ cd actor-example
$ go mod init actor-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/locate/redis/v2@e5cd009
$ go get github.com/dobyte/due/registry/consul/v2@e5cd009
  1. 启动配置

文件位置: actor-example/etc/etc.toml

# 进程号
pid = "./run/due.pid"
# 开发模式。支持模式:debug、test、release(设置优先级:配置文件 < 环境变量 < 运行参数 < mode.SetMode())
mode = "debug"
# 统一时区设置。项目中的时间获取请使用xtime.Now()
timezone = "Local"
# 容器关闭最大等待时间。支持单位:纳秒(ns)、微秒(us | µs)、毫秒(ms)、秒(s)、分(m)、小时(h)、天(d)。默认为0
shutdownMaxWaitTime = "0s"
 
[cluster.node]
    # 实例ID,集群中唯一。不填写默认自动生成唯一的实例ID
    id = ""
    # 实例名称
    name = "node"
    # 内建RPC服务器监听地址。不填写默认随机监听
    addr = ":0"
    # 是否将内部通信地址暴露到公网。默认为false
    expose = false
    # 编解码器。可选:json | proto。默认为proto
    codec = "json"
    # RPC调用超时时间,支持单位:纳秒(ns)、微秒(us | µs)、毫秒(ms)、秒(s)、分(m)、小时(h)、天(d)。默认为3s
    timeout = "3s"
    # 节点权重,用于节点无状态路由消息的加权轮询策略,权重值必需大于0才生效。默认为1
    weight = 1
    # 实例元数据
    [cluster.node.metadata]
        # 键值对,且均为字符串类型。由于注册中心的元数据参数限制,建议将键值对的数量控制在20个以内,键的字符长度控制在127个字符内,值的字符长度控制在512个字符内。
        key = "value"
 
[locate.redis]
    # 客户端连接地址
    addrs = ["127.0.0.1:6379"]
    # 数据库号
    db = 0
    # 用户名
    username = ""
    # 密码
    password = ""
    # 私钥文件
    keyFile = ""
    # 证书文件
    certFile = ""
    # CA证书文件
    caFile = ""
    # 最大重试次数
    maxRetries = 3
    # key前缀
    prefix = "due:locate"
 
[registry.consul]
    # 客户端连接地址,默认为127.0.0.1:8500
    addr = "127.0.0.1:8500"
    # 是否启用健康检查,默认为true
    healthCheck = true
    # 健康检查时间间隔(秒),仅在启用健康检查后生效,默认为10
    healthCheckInterval = 10
    # 健康检查超时时间(秒),仅在启用健康检查后生效,默认为5
    healthCheckTimeout = 5
    # 是否启用心跳检查,默认为true
    heartbeatCheck = true
    # 心跳检查时间间隔(秒),仅在启用心跳检查后生效,默认为10
    heartbeatCheckInterval = 10
    # 健康检测失败后自动注销服务时间(秒),默认为30
    deregisterCriticalServiceAfter = 30
 
[packet]
    # 字节序,默认为big。可选:little | big
    byteOrder = "big"
    # 路由字节数,默认为2字节
    routeBytes = 2
    # 序列号字节数,默认为2字节
    seqBytes = 2
    # 消息字节数,默认为5000字节
    bufferBytes = 5000
    # 是否携带服务器心跳时间
    heartbeatTime = false
 
[log]
    # 日志输出级别,可选:debug | info | warn | error | fatal | panic
    level = "info"
    # 堆栈的最低输出级别,可选:debug | info | warn | error | fatal | panic
    stackLevel = "error"
    # 时间格式,标准库时间格式
    timeFormat = "2006/01/02 15:04:05.000000"
    # 输出栈的跳过深度
    callSkip = 2
    # 是否启用调用文件全路径
    callFullPath = true
    # 日志输出终端
    terminals = ["console", "file"]
    # 控制台同步器配置
    [log.console]
        # 日志输出格式,可选:text | json
        format = "text"
    # 文件同步器配置
    [log.file]
        # 输出文件路径
        path = "./log/due.log"
        # 日志输出格式,可选:text | json
        format = "text"
        # 文件最大留存时间,d:天、h:时、m:分、s:秒
        maxAge = "7d"
        # 文件最大尺寸限制,支持单位: B | K | KB | M | MB | G | GB | T | TB | P | PB | E | EB | Z | ZB,默认为100M
        maxSize = "100M"
        # 文件翻转方式,可选:none | year | month | week | day | hour,默认为none
        rotate = "none"
        # 文件翻转时是否对文件进行压缩
        compress = false
  1. 路由定义

文件位置: actor-example/app/route/route.go

package route
 
// 路由号
const (
	Login  = 1 // 登录
	Create = 2 // 创建
	Join   = 3 // 加入
	Play   = 4 // 游戏
)
  1. 公共基础响应

文件位置: actor-example/app/base/base.go

package base
 
// 基础响应
type Res struct {
	Code int `json:"code"`
}
  1. 授权检测中间件

文件位置: actor-example/app/middleware/auth.go

package middleware
 
import (
	"actor-example/app/base"
 
	"github.com/dobyte/due/v2/cluster/node"
	"github.com/dobyte/due/v2/codes"
	"github.com/dobyte/due/v2/log"
)
 
// 认证中间件
func Auth(middleware *node.Middleware, ctx node.Context) {
	if ctx.UID() == 0 {
		if err := ctx.Response(&base.Res{Code: codes.Unauthorized.Code()}); err != nil {
			log.Errorf("response message failed, err: %v", err)
		}
	} else {
		middleware.Next(ctx)
	}
}
  1. 核心逻辑

文件位置: actor-example/app/logic/core.go

package logic
 
import (
	"actor-example/app/base"
	"actor-example/app/middleware"
	"actor-example/app/route"
	"hash/fnv"
 
	"github.com/dobyte/due/v2/cluster/node"
	"github.com/dobyte/due/v2/codes"
	"github.com/dobyte/due/v2/errors"
	"github.com/dobyte/due/v2/log"
	"github.com/dobyte/due/v2/utils/xconv"
)
 
const defaultPassword = "123456"
 
const roomActor = "room"
 
type core struct {
	proxy *node.Proxy
	rooms map[int64]*room
}
 
func NewCore(proxy *node.Proxy) *core {
	return &core{proxy: proxy}
}
 
func (c *core) Init() {
	c.proxy.Router().Group(func(group *node.RouterGroup) {
		// 登录
		group.AddRouteHandler(route.Login, c.login)
		// 设置认证中间件
		group.Middleware(middleware.Auth)
		// 创建
		group.AddRouteHandler(route.Create, c.create, node.AuthorizedRoute)
		// 加入
		group.AddRouteHandler(route.Join, c.join, node.AuthorizedRoute)
		// 游戏
		group.AddRouteHandler(route.Play, c.play, node.StatefulRoute)
	})
}
 
// 登录
func (c *core) login(ctx node.Context) {
	ctx.Task(func(ctx node.Context) {
		req := &loginReq{}
		res := &loginRes{}
		ctx.Defer(func() {
			if err := ctx.Response(res); err != nil {
				log.Errorf("response message failed: %v", err)
			}
		})
 
		if err := ctx.Parse(req); err != nil {
			log.Errorf("parse request message failed: %v", err)
			res.Code = codes.InternalError.Code()
			return
		}
 
		// 执行登录操作
		uid, err := c.doLogin(req)
		if err != nil {
			res.Code = codes.Convert(err).Code()
			return
		}
 
		// 绑定网关
		if err = ctx.BindGate(uid); err != nil {
			log.Errorf("bind gate failed: %v", err)
			res.Code = codes.InternalError.Code()
			return
		}
 
		res.Code = codes.OK.Code()
	})
}
 
// 执行登录操作
func (c *core) doLogin(req *loginReq) (int64, error) {
	if req.Password != defaultPassword {
		return 0, errors.NewError(codes.InvalidArgument)
	}
 
	fnv := fnv.New64a()
	fnv.Write([]byte(req.Account))
	uid := fnv.Sum64()
 
	return int64(uid), nil
}
 
// 创建
func (c *core) create(ctx node.Context) {
	req := &createReq{}
	res := &createRes{}
	ctx.Defer(func() {
		if err := ctx.Response(res); err != nil {
			log.Errorf("response message failed: %v", err)
		}
	})
 
	if err := ctx.Parse(req); err != nil {
		log.Errorf("parse request message failed: %v", err)
		res.Code = codes.InternalError.Code()
		return
	}
 
	// 执行创建操作
	r, err := c.doCreate(ctx, req)
	if err != nil {
		res.Code = codes.Convert(err).Code()
		return
	}
 
	res.Code = codes.OK.Code()
	res.Data = &createResData{RoomID: r.id}
}
 
// 执行创建操作
func (c *core) doCreate(ctx node.Context, req *createReq) (*room, error) {
	var (
		err   error
		actor *node.Actor
	)
 
	r := newRoom(int64(len(c.rooms)+1), req.Name, ctx.UID())
 
	if actor, err = c.proxy.Spawn(newRoomProcessor, node.WithActorID(xconv.String(r.id)), node.WithActorArgs(r), node.WithActorKind(roomActor)); err != nil {
		log.Errorf("spawn actor faile: %v", err)
		return nil, errors.NewError(err, codes.InternalError)
	}
 
	defer func() {
		if err != nil {
			actor.Destroy()
		}
	}()
 
	if err = ctx.BindActor(actor.Kind(), actor.ID()); err != nil {
		log.Errorf("bind actor failed: %v", err)
		return nil, errors.NewError(err, codes.InternalError)
	}
 
	defer func() {
		if err != nil {
			ctx.UnbindActor(actor.Kind())
		}
	}()
 
	if err = ctx.BindNode(); err != nil {
		log.Errorf("bind node failed: %v", err)
		return nil, errors.NewError(err, codes.InternalError)
	}
 
	c.rooms[r.id] = r
 
	return r, nil
}
 
// 加入
// 多节点场景需要借助共享存储(如Redis)来存储房间信息,以便能够通过房间ID找到对应房间所在的节点,并将消息转发至对应节点服进行处理,此处细节不再赘述。
func (c *core) join(ctx node.Context) {
	req := &joinReq{}
	res := &joinRes{}
	ctx.Defer(func() {
		if err := ctx.Response(res); err != nil {
			log.Errorf("response message failed: %v", err)
		}
	})
 
	if err := ctx.Parse(req); err != nil {
		log.Errorf("parse request message failed: %v", err)
		res.Code = codes.InternalError.Code()
		return
	}
 
	actor, ok := ctx.Actor(roomActor, xconv.String(req.RoomID))
	if !ok {
		res.Code = codes.NotFound.Code()
		return
	}
 
	actor.Next(ctx)
}
 
// 游戏
func (c *core) play(ctx node.Context) {
	res := &base.Res{}
	ctx.Defer(func() {
		if err := ctx.Response(res); err != nil {
			log.Errorf("response message failed: %v", err)
		}
	})
 
	if err := ctx.Next(); err != nil {
		log.Errorf("request next failed: uid = %v route = %v err = %v", ctx.UID(), ctx.Route(), err)
		res.Code = codes.IllegalRequest.Code()
	}
}
  1. 房间类

文件位置: actor-example/app/logic/room.go

package logic
 
import (
	"github.com/dobyte/due/v2/cluster/node"
	"github.com/dobyte/due/v2/codes"
	"github.com/dobyte/due/v2/errors"
	"github.com/dobyte/due/v2/log"
	"github.com/dobyte/due/v2/utils/xconv"
)
 
type room struct {
	id      int64
	name    string
	creator int64
	members map[int64]struct{}
}
 
func newRoom(id int64, name string, creator int64) *room {
	r := &room{}
	r.id = id
	r.name = name
	r.creator = creator
	r.members = make(map[int64]struct{})
	r.members[r.creator] = struct{}{}
 
	return r
}
 
// 加入
func (r *room) doJoin(ctx node.Context) (err error) {
	uid := ctx.UID()
 
	if _, ok := r.members[uid]; ok {
		return errors.NewError(codes.IllegalRequest)
	}
 
	if err = ctx.BindNode(); err != nil {
		return errors.NewError(codes.InternalError, err)
	}
 
	defer func() {
		if err != nil {
			ctx.UnbindNode()
		}
	}()
 
	if err = ctx.BindActor(roomActor, xconv.String(r.id)); err != nil {
		return errors.NewError(codes.InternalError, err)
	}
 
	r.members[uid] = struct{}{}
 
	return nil
}
 
// 游戏
func (r *room) doPlay(ctx node.Context, req *playReq) (err error) {
	uid := ctx.UID()
 
	if _, ok := r.members[uid]; !ok {
		return errors.NewError(codes.IllegalRequest)
	}
 
	if req.Action != "move" {
		return errors.NewError(codes.InvalidArgument)
	}
 
	log.Infof("play, uid: %v, action: %v", uid, req.Action)
 
	return nil
}
  1. 房间 actor 消息处理器类

文件位置: actor-example/app/logic/room_processor.go

package logic
 
import (
	"actor-example/app/route"
 
	"github.com/dobyte/due/v2/cluster/node"
	"github.com/dobyte/due/v2/codes"
	"github.com/dobyte/due/v2/log"
)
 
type roomProcessor struct {
	node.BaseProcessor
	actor *node.Actor
	room  *room
}
 
func newRoomProcessor(actor *node.Actor, args ...any) node.Processor {
	return &roomProcessor{
		actor: actor,
		room:  args[0].(*room),
	}
}
 
// Init 初始化处理器
func (p *roomProcessor) Init() {
	// 加入
	p.actor.AddRouteHandler(route.Join, p.join)
	// 游戏
	p.actor.AddRouteHandler(route.Play, p.play)
}
 
// 加入
func (p *roomProcessor) join(ctx node.Context) {
	res := &joinRes{}
	ctx.Defer(func() {
		if err := ctx.Response(res); err != nil {
			log.Errorf("response message failed: %v", err)
		}
	})
 
	// 执行加入操作
	if err := p.room.doJoin(ctx); err != nil {
		res.Code = codes.Convert(err).Code()
		return
	}
 
	res.Code = codes.OK.Code()
}
 
// 游戏
func (p *roomProcessor) play(ctx node.Context) {
	req := &playReq{}
	res := &playRes{}
	ctx.Defer(func() {
		if err := ctx.Response(res); err != nil {
			log.Errorf("response message failed: %v", err)
		}
	})
 
	if err := ctx.Parse(req); err != nil {
		log.Errorf("parse request message failed: %v", err)
		res.Code = codes.InternalError.Code()
		return
	}
 
	// 执行游戏操作
	if err := p.room.doPlay(ctx, req); err != nil {
		res.Code = codes.Convert(err).Code()
		return
	}
 
	res.Code = codes.OK.Code()
}
  1. 运行示例
$ cd actor-example
$ go run main.go
                    ____  __  ________
                   / __ \/ / / / ____/
                  / / / / / / / __/
                 / /_/ / /_/ / /___
                /_____/\____/_____/
┌──────────────────────────────────────────────────────┐
| [Website] https://github.com/dobyte/due              |
| [Version] v2.4.2                                     |
└──────────────────────────────────────────────────────┘
┌────────────────────────Global────────────────────────┐
| PID: 28608                                           |
| Mode: debug                                          |
| Time: 2025-10-30 10:27:37.7758377 +0800 CST          |
└──────────────────────────────────────────────────────┘
┌─────────────────────────Node─────────────────────────┐
| ID: 00619b9e-b538-11f0-a216-f4f19e1f0070             |
| Name: node                                           |
| Link: 192.168.2.202:57582                            |
| Codec: json                                          |
| Locator: redis                                       |
| Registry: consul                                     |
| Encryptor: -                                         |
| Transporter: -                                       |
└──────────────────────────────────────────────────────┘