# 2.9 Actor模型

# 2.9.1 基础介绍

Actor模型是一种并发编程模型,它将并发执行的任务封装为独立的实体(Actor),每个Actor都有自己的状态和行为。Actor之间通过消息传递进行通信,而不是直接共享内存。

due (opens new window)框架提供的Actor模型与众多开源项目提供的Actor模型还有所不同。due (opens new window)的Actor模型不仅提供了基于查找(ctx.Actor (opens new window))的传统手动消息分发功能,还提供了通过ctx.BindActor (opens new window)方法绑定用户(UID)与Actor,从而实现Actor消息的自动分发功能。

# 2.9.2 何时使用

避免数据竞争的高并发场景。例如:房间类型、帧同步、状态同步等。

# 2.9.3 示例代码

以下完整示例详见:actor-example (opens new window)

  1. 创建项目
$ mkdir actor-example
1
  1. 安装依赖
$ cd actor-example
$ go mod init actor-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/locate/redis/v2@e5cd009
$ go get github.com/dobyte/due/registry/consul/v2@e5cd009
1
2
3
4
5
  1. 启动配置

文件位置:actor-example/etc/etc.toml (opens new window)

# 进程号
pid = "./run/due.pid"
# 开发模式。支持模式:debug、test、release(设置优先级:配置文件 < 环境变量 < 运行参数 < mode.SetMode())
mode = "debug"
# 统一时区设置。项目中的时间获取请使用xtime.Now()
timezone = "Local"
# 容器关闭最大等待时间。支持单位:纳秒(ns)、微秒(us | µs)、毫秒(ms)、秒(s)、分(m)、小时(h)、天(d)。默认为0
shutdownMaxWaitTime = "0s"

[cluster.node]
    # 实例ID,集群中唯一。不填写默认自动生成唯一的实例ID
    id = ""
    # 实例名称
    name = "node"
    # 内建RPC服务器监听地址。不填写默认随机监听
    addr = ":0"
    # 是否将内部通信地址暴露到公网。默认为false
    expose = false
    # 编解码器。可选:json | proto。默认为proto
    codec = "json"
    # RPC调用超时时间,支持单位:纳秒(ns)、微秒(us | µs)、毫秒(ms)、秒(s)、分(m)、小时(h)、天(d)。默认为3s
    timeout = "3s"
    # 节点权重,用于节点无状态路由消息的加权轮询策略,权重值必需大于0才生效。默认为1
    weight = 1
    # 实例元数据
    [cluster.node.metadata]
        # 键值对,且均为字符串类型。由于注册中心的元数据参数限制,建议将键值对的数量控制在20个以内,键的字符长度控制在127个字符内,值得字符长度控制在512个字符内。
        key = "value"

[locate.redis]
    # 客户端连接地址
    addrs = ["127.0.0.1:6379"]
    # 数据库号
    db = 0
    # 用户名
    username = ""
    # 密码
    password = ""
    # 私钥文件
    keyFile = ""
    # 证书文件
    certFile = ""
    # CA证书文件
    caFile = ""
    # 最大重试次数
    maxRetries = 3
    # key前缀
    prefix = "due:locate"

[registry.consul]
    # 客户端连接地址,默认为127.0.0.1:8500
    addr = "127.0.0.1:8500"
    # 是否启用健康检查,默认为true
    healthCheck = true
    # 健康检查时间间隔(秒),仅在启用健康检查后生效,默认为10
    healthCheckInterval = 10
    # 健康检查超时时间(秒),仅在启用健康检查后生效,默认为5
    healthCheckTimeout = 5
    # 是否启用心跳检查,默认为true
    heartbeatCheck = true
    # 心跳检查时间间隔(秒),仅在启用心跳检查后生效,默认为10
    heartbeatCheckInterval = 10
    # 健康检测失败后自动注销服务时间(秒),默认为30
    deregisterCriticalServiceAfter = 30

[packet]
    # 字节序,默认为big。可选:little | big
    byteOrder = "big"
    # 路由字节数,默认为2字节
    routeBytes = 2
    # 序列号字节数,默认为2字节
    seqBytes = 2
    # 消息字节数,默认为5000字节
    bufferBytes = 5000
    # 是否携带服务器心跳时间
    heartbeatTime = false

[log]
    # 日志输出级别,可选:debug | info | warn | error | fatal | panic
    level = "info"
    # 堆栈的最低输出级别,可选:debug | info | warn | error | fatal | panic
    stackLevel = "error"
    # 时间格式,标准库时间格式
    timeFormat = "2006/01/02 15:04:05.000000"
    # 输出栈的跳过深度
    callSkip = 2
    # 是否启用调用文件全路径
    callFullPath = true
    # 日志输出终端
    terminals = ["console", "file"]
    # 控制台同步器配置
    [log.console]
        # 日志输出格式,可选:text | json
        format = "text"
    # 文件同步器配置
    [log.file]
        # 输出文件路径
        path = "./log/due.log"
        # 日志输出格式,可选:text | json
        format = "text"
        # 文件最大留存时间,d:天、h:时、m:分、s:秒
        maxAge = "7d"
        # 文件最大尺寸限制,支持单位: B | K | KB | M | MB | G | GB | T | TB | P | PB | E | EB | Z | ZB,默认为100M
        maxSize = "100M"
        # 文件翻转方式,可选:none | year | month | week | day | hour,默认为none
        rotate = "none"
        # 文件翻转时是否对文件进行压缩
        compress = false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
  1. 路由定义

文件位置:actor-example/app/route/route.go (opens new window)

package route

// 路由号
const (
	Login  = 1 // 登录
	Create = 2 // 创建
	Join   = 3 // 加入
	Play   = 4 // 游戏
)
1
2
3
4
5
6
7
8
9
  1. 公共基础响应

文件位置:actor-example/app/base/base.go (opens new window)

package base

// 基础响应
type Res struct {
	Code int `json:"code"`
}
1
2
3
4
5
6
  1. 授权检测中间件

文件位置:actor-example/app/middleware/auth.go (opens new window)

package middleware

import (
	"actor-example/app/base"

	"github.com/dobyte/due/v2/cluster/node"
	"github.com/dobyte/due/v2/codes"
	"github.com/dobyte/due/v2/log"
)

// 认证中间件
func Auth(middleware *node.Middleware, ctx node.Context) {
	if ctx.UID() == 0 {
		if err := ctx.Response(&base.Res{Code: codes.Unauthorized.Code()}); err != nil {
			log.Errorf("response message failed, err: %v", err)
		}
	} else {
		middleware.Next(ctx)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  1. 核心逻辑

文件位置:actor-example/app/logic/core.go (opens new window)

package logic

import (
	"actor-example/app/base"
	"actor-example/app/middleware"
	"actor-example/app/route"
	"hash/fnv"

	"github.com/dobyte/due/v2/cluster/node"
	"github.com/dobyte/due/v2/codes"
	"github.com/dobyte/due/v2/errors"
	"github.com/dobyte/due/v2/log"
	"github.com/dobyte/due/v2/utils/xconv"
)

const defaultPassword = "123456"

const roomActor = "room"

type core struct {
	proxy *node.Proxy
	rooms map[int64]*room
}

func NewCore(proxy *node.Proxy) *core {
	return &core{proxy: proxy}
}

func (c *core) Init() {
	c.proxy.Router().Group(func(group *node.RouterGroup) {
		// 登录
		group.AddRouteHandler(route.Login, c.login)
		// 设置认证中间件
		group.Middleware(middleware.Auth)
		// 创建
		group.AddRouteHandler(route.Create, c.create, node.AuthorizedRoute)
		// 加入
		group.AddRouteHandler(route.Join, c.join, node.AuthorizedRoute)
		// 游戏
		group.AddRouteHandler(route.Play, c.play, node.StatefulRoute)
	})
}

// 登录
func (c *core) login(ctx node.Context) {
	ctx.Task(func(ctx node.Context) {
		req := &loginReq{}
		res := &loginRes{}
		ctx.Defer(func() {
			if err := ctx.Response(res); err != nil {
				log.Errorf("response message failed: %v", err)
			}
		})

		if err := ctx.Parse(req); err != nil {
			log.Errorf("parse request message failed: %v", err)
			res.Code = codes.InternalError.Code()
			return
		}

		// 执行登录操作
		uid, err := c.doLogin(req)
		if err != nil {
			res.Code = codes.Convert(err).Code()
			return
		}

		// 绑定网关
		if err = ctx.BindGate(uid); err != nil {
			log.Errorf("bind gate failed: %v", err)
			res.Code = codes.InternalError.Code()
			return
		}

		res.Code = codes.OK.Code()
	})
}

// 执行登录操作
func (c *core) doLogin(req *loginReq) (int64, error) {
	if req.Password != defaultPassword {
		return 0, errors.NewError(codes.InvalidArgument)
	}

	fnv := fnv.New64a()
	fnv.Write([]byte(req.Account))
	uid := fnv.Sum64()

	return int64(uid), nil
}

// 创建
func (c *core) create(ctx node.Context) {
	req := &createReq{}
	res := &createRes{}
	ctx.Defer(func() {
		if err := ctx.Response(res); err != nil {
			log.Errorf("response message failed: %v", err)
		}
	})

	if err := ctx.Parse(req); err != nil {
		log.Errorf("parse request message failed: %v", err)
		res.Code = codes.InternalError.Code()
		return
	}

	// 执行创建操作
	r, err := c.doCreate(ctx, req)
	if err != nil {
		res.Code = codes.Convert(err).Code()
		return
	}

	res.Code = codes.OK.Code()
	res.Data = &createResData{RoomID: r.id}
}

// 执行创建操作
func (c *core) doCreate(ctx node.Context, req *createReq) (*room, error) {
	var (
		err   error
		actor *node.Actor
	)

	r := newRoom(int64(len(c.rooms)+1), req.Name, ctx.UID())

	if actor, err = c.proxy.Spawn(newRoomProcessor, node.WithActorID(xconv.String(r.id)), node.WithActorArgs(r), node.WithActorKind(roomActor)); err != nil {
		log.Errorf("spawn actor faile: %v", err)
		return nil, errors.NewError(err, codes.InternalError)
	}

	defer func() {
		if err != nil {
			actor.Destroy()
		}
	}()

	if err = ctx.BindActor(actor.Kind(), actor.ID()); err != nil {
		log.Errorf("bind actor failed: %v", err)
		return nil, errors.NewError(err, codes.InternalError)
	}

	defer func() {
		if err != nil {
			ctx.UnbindActor(actor.Kind())
		}
	}()

	if err = ctx.BindNode(); err != nil {
		log.Errorf("bind node failed: %v", err)
		return nil, errors.NewError(err, codes.InternalError)
	}

	c.rooms[r.id] = r

	return r, nil
}

// 加入
// 多节点场景需要借助共享存储(如Redis)来存储房间信息,以便能够通过房间ID找到对应房间所在的节点,并将消息转发至对应节点服进行处理,此处细节不再赘述。
func (c *core) join(ctx node.Context) {
	req := &joinReq{}
	res := &joinRes{}
	ctx.Defer(func() {
		if err := ctx.Response(res); err != nil {
			log.Errorf("response message failed: %v", err)
		}
	})

	if err := ctx.Parse(req); err != nil {
		log.Errorf("parse request message failed: %v", err)
		res.Code = codes.InternalError.Code()
		return
	}

	actor, ok := ctx.Actor(roomActor, xconv.String(req.RoomID))
	if !ok {
		res.Code = codes.NotFound.Code()
		return
	}

	actor.Next(ctx)
}

// 游戏
func (c *core) play(ctx node.Context) {
	res := &base.Res{}
	ctx.Defer(func() {
		if err := ctx.Response(res); err != nil {
			log.Errorf("response message failed: %v", err)
		}
	})

	if err := ctx.Next(); err != nil {
		log.Errorf("request next failed: uid = %v route = %v err = %v", ctx.UID(), ctx.Route(), err)
		res.Code = codes.IllegalRequest.Code()
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
  1. 房间类

文件位置:actor-example/app/logic/room.go (opens new window)

package logic

import (
	"github.com/dobyte/due/v2/cluster/node"
	"github.com/dobyte/due/v2/codes"
	"github.com/dobyte/due/v2/errors"
	"github.com/dobyte/due/v2/log"
	"github.com/dobyte/due/v2/utils/xconv"
)

type room struct {
	id      int64
	name    string
	creator int64
	members map[int64]struct{}
}

func newRoom(id int64, name string, creator int64) *room {
	r := &room{}
	r.id = id
	r.name = name
	r.creator = creator
	r.members = make(map[int64]struct{})
	r.members[r.creator] = struct{}{}

	return r
}

// 加入
func (r *room) doJoin(ctx node.Context) (err error) {
	uid := ctx.UID()

	if _, ok := r.members[uid]; ok {
		return errors.NewError(codes.IllegalRequest)
	}

	if err = ctx.BindNode(); err != nil {
		return errors.NewError(codes.InternalError, err)
	}

	defer func() {
		if err != nil {
			ctx.UnbindNode()
		}
	}()

	if err = ctx.BindActor(roomActor, xconv.String(r.id)); err != nil {
		return errors.NewError(codes.InternalError, err)
	}

	r.members[uid] = struct{}{}

	return nil
}

// 游戏
func (r *room) doPlay(ctx node.Context, req *playReq) (err error) {
	uid := ctx.UID()

	if _, ok := r.members[uid]; !ok {
		return errors.NewError(codes.IllegalRequest)
	}

	if req.Action != "move" {
		return errors.NewError(codes.InvalidArgument)
	}

	log.Infof("play, uid: %v, action: %v", uid, req.Action)

	return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
  1. 房间actor消息处理器类

文件位置:actor-example/app/logic/room_processor.go (opens new window)

package logic

import (
	"actor-example/app/route"

	"github.com/dobyte/due/v2/cluster/node"
	"github.com/dobyte/due/v2/codes"
	"github.com/dobyte/due/v2/log"
)

type roomProcessor struct {
	node.BaseProcessor
	actor *node.Actor
	room  *room
}

func newRoomProcessor(actor *node.Actor, args ...any) node.Processor {
	return &roomProcessor{
		actor: actor,
		room:  args[0].(*room),
	}
}

// Init 初始化处理器
func (p *roomProcessor) Init() {
	// 加入
	p.actor.AddRouteHandler(route.Join, p.join)
	// 游戏
	p.actor.AddRouteHandler(route.Play, p.play)
}

// 加入
func (p *roomProcessor) join(ctx node.Context) {
	res := &joinRes{}
	ctx.Defer(func() {
		if err := ctx.Response(res); err != nil {
			log.Errorf("response message failed: %v", err)
		}
	})

	// 执行加入操作
	if err := p.room.doJoin(ctx); err != nil {
		res.Code = codes.Convert(err).Code()
		return
	}

	res.Code = codes.OK.Code()
}

// 游戏
func (p *roomProcessor) play(ctx node.Context) {
	req := &playReq{}
	res := &playRes{}
	ctx.Defer(func() {
		if err := ctx.Response(res); err != nil {
			log.Errorf("response message failed: %v", err)
		}
	})

	if err := ctx.Parse(req); err != nil {
		log.Errorf("parse request message failed: %v", err)
		res.Code = codes.InternalError.Code()
		return
	}

	// 执行游戏操作
	if err := p.room.doPlay(ctx, req); err != nil {
		res.Code = codes.Convert(err).Code()
		return
	}

	res.Code = codes.OK.Code()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
  1. 运行示例
$ cd actor-example
$ go run main.go
                    ____  __  ________
                   / __ \/ / / / ____/
                  / / / / / / / __/
                 / /_/ / /_/ / /___
                /_____/\____/_____/
┌──────────────────────────────────────────────────────┐
| [Website] https://github.com/dobyte/due              |
| [Version] v2.4.2                                     |
└──────────────────────────────────────────────────────┘
┌────────────────────────Global────────────────────────┐
| PID: 28608                                           |
| Mode: debug                                          |
| Time: 2025-10-30 10:27:37.7758377 +0800 CST          |
└──────────────────────────────────────────────────────┘
┌─────────────────────────Node─────────────────────────┐
| ID: 00619b9e-b538-11f0-a216-f4f19e1f0070             |
| Name: node                                           |
| Link: 192.168.2.202:57582                            |
| Codec: json                                          |
| Locator: redis                                       |
| Registry: consul                                     |
| Encryptor: -                                         |
| Transporter: -                                       |
└──────────────────────────────────────────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26