# 2.8 多协程模型
# 2.8.1 基础介绍
在due (opens new window)框架中,多协程模型就是将原本在单线程中的任务通过ctx.Task投递到协程池中进行处理,任务处理顺序也由原本的有序变为无序,并发处理需要使用同步原语。
# 2.8.2 何时使用多协程模型
不考虑执行顺序、阻塞IO密集型等场景。例如:数据库操作、远程服务调用等。
# 2.8.3 示例代码
以下完整示例详见:multiple-coroutines-example (opens new window)
- 创建项目
$ mkdir multiple-coroutines-example
1
- 安装依赖
$ cd multiple-coroutines-example
$ go mod init multiple-coroutines-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/locate/redis/v2@e5cd009
$ go get github.com/dobyte/due/registry/consul/v2@e5cd009
1
2
3
4
5
2
3
4
5
- 启动配置
文件位置:multiple-coroutines-example/etc/etc.toml (opens new window)
# 进程号
pid = "./run/due.pid"
# 开发模式。支持模式:debug、test、release(设置优先级:配置文件 < 环境变量 < 运行参数 < mode.SetMode())
mode = "debug"
# 统一时区设置。项目中的时间获取请使用xtime.Now()
timezone = "Local"
# 容器关闭最大等待时间。支持单位:纳秒(ns)、微秒(us | µs)、毫秒(ms)、秒(s)、分(m)、小时(h)、天(d)。默认为0
shutdownMaxWaitTime = "0s"
[cluster.node]
# 实例ID,集群中唯一。不填写默认自动生成唯一的实例ID
id = ""
# 实例名称
name = "node"
# 内建RPC服务器监听地址。不填写默认随机监听
addr = ":0"
# 是否将内部通信地址暴露到公网。默认为false
expose = false
# 编解码器。可选:json | proto。默认为proto
codec = "json"
# RPC调用超时时间,支持单位:纳秒(ns)、微秒(us | µs)、毫秒(ms)、秒(s)、分(m)、小时(h)、天(d)。默认为3s
timeout = "3s"
# 节点权重,用于节点无状态路由消息的加权轮询策略,权重值必需大于0才生效。默认为1
weight = 1
# 实例元数据
[cluster.node.metadata]
# 键值对,且均为字符串类型。由于注册中心的元数据参数限制,建议将键值对的数量控制在20个以内,键的字符长度控制在127个字符内,值得字符长度控制在512个字符内。
key = "value"
[locate.redis]
# 客户端连接地址
addrs = ["127.0.0.1:6379"]
# 数据库号
db = 0
# 用户名
username = ""
# 密码
password = ""
# 私钥文件
keyFile = ""
# 证书文件
certFile = ""
# CA证书文件
caFile = ""
# 最大重试次数
maxRetries = 3
# key前缀
prefix = "due:locate"
[registry.consul]
# 客户端连接地址,默认为127.0.0.1:8500
addr = "127.0.0.1:8500"
# 是否启用健康检查,默认为true
healthCheck = true
# 健康检查时间间隔(秒),仅在启用健康检查后生效,默认为10
healthCheckInterval = 10
# 健康检查超时时间(秒),仅在启用健康检查后生效,默认为5
healthCheckTimeout = 5
# 是否启用心跳检查,默认为true
heartbeatCheck = true
# 心跳检查时间间隔(秒),仅在启用心跳检查后生效,默认为10
heartbeatCheckInterval = 10
# 健康检测失败后自动注销服务时间(秒),默认为30
deregisterCriticalServiceAfter = 30
[packet]
# 字节序,默认为big。可选:little | big
byteOrder = "big"
# 路由字节数,默认为2字节
routeBytes = 2
# 序列号字节数,默认为2字节
seqBytes = 2
# 消息字节数,默认为5000字节
bufferBytes = 5000
# 是否携带服务器心跳时间
heartbeatTime = false
[log]
# 日志输出级别,可选:debug | info | warn | error | fatal | panic
level = "info"
# 堆栈的最低输出级别,可选:debug | info | warn | error | fatal | panic
stackLevel = "error"
# 时间格式,标准库时间格式
timeFormat = "2006/01/02 15:04:05.000000"
# 输出栈的跳过深度
callSkip = 2
# 是否启用调用文件全路径
callFullPath = true
# 日志输出终端
terminals = ["console", "file"]
# 控制台同步器配置
[log.console]
# 日志输出格式,可选:text | json
format = "text"
# 文件同步器配置
[log.file]
# 输出文件路径
path = "./log/due.log"
# 日志输出格式,可选:text | json
format = "text"
# 文件最大留存时间,d:天、h:时、m:分、s:秒
maxAge = "7d"
# 文件最大尺寸限制,支持单位: B | K | KB | M | MB | G | GB | T | TB | P | PB | E | EB | Z | ZB,默认为100M
maxSize = "100M"
# 文件翻转方式,可选:none | year | month | week | day | hour,默认为none
rotate = "none"
# 文件翻转时是否对文件进行压缩
compress = false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
- 编写示例
文件位置:multiple-coroutines-example/main.go (opens new window)
package main
import (
"fmt"
"github.com/dobyte/due/locate/redis/v2"
"github.com/dobyte/due/registry/consul/v2"
"github.com/dobyte/due/v2"
"github.com/dobyte/due/v2/cluster/node"
"github.com/dobyte/due/v2/codes"
"github.com/dobyte/due/v2/log"
"github.com/dobyte/due/v2/utils/xtime"
)
// 路由号
const greet = 1
func main() {
// 创建容器
container := due.NewContainer()
// 创建用户定位器
locator := redis.NewLocator()
// 创建服务发现
registry := consul.NewRegistry()
// 创建节点组件
component := node.NewNode(
node.WithLocator(locator),
node.WithRegistry(registry),
)
// 初始化应用
initApp(component.Proxy())
// 添加节点组件
container.Add(component)
// 启动容器
container.Serve()
}
// 初始化应用
func initApp(proxy *node.Proxy) {
proxy.Router().AddRouteHandler(greet, greetHandler)
}
// 请求
type greetReq struct {
Message string `json:"message"`
}
// 响应
type greetRes struct {
Code int `json:"code"`
Message string `json:"message"`
}
// 路由处理器
// 将原本在单线程中的任务通过ctx.Task投递到协程池中进行处理,任务处理顺序也由原本的有序变为无序,并发处理需要使用同步原语。
func greetHandler(ctx node.Context) {
ctx.Task(func(ctx node.Context) {
req := &greetReq{}
res := &greetRes{}
ctx.Defer(func() {
if err := ctx.Response(res); err != nil {
log.Errorf("response message failed: %v", err)
}
})
if err := ctx.Parse(req); err != nil {
log.Errorf("parse request message failed: %v", err)
res.Code = codes.InternalError.Code()
return
}
log.Info(req.Message)
res.Code = codes.OK.Code()
res.Message = fmt.Sprintf("I'm tcp server, and the current time is: %s", xtime.Now().Format(xtime.DateTime))
})
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
- 运行示例
$ cd multiple-coroutines-example
$ go run main.go
____ __ ________
/ __ \/ / / / ____/
/ / / / / / / __/
/ /_/ / /_/ / /___
/_____/\____/_____/
┌──────────────────────────────────────────────────────┐
| [Website] https://github.com/dobyte/due |
| [Version] v2.4.2 |
└──────────────────────────────────────────────────────┘
┌────────────────────────Global────────────────────────┐
| PID: 38216 |
| Mode: debug |
| Time: 2025-10-30 10:23:05.3178508 +0800 CST |
└──────────────────────────────────────────────────────┘
┌─────────────────────────Node─────────────────────────┐
| ID: 5dfbb498-b537-11f0-adcd-f4f19e1f0070 |
| Name: node |
| Link: 192.168.2.202:55923 |
| Codec: json |
| Locator: redis |
| Registry: consul |
| Encryptor: - |
| Transporter: - |
└──────────────────────────────────────────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
← 2.7 单线程模型 2.9 Actor模型 →