# 4.4 事件总线
# 4.4.1 基础介绍
事件总线是一种通信机制,用于在系统内部的发布者与订阅者之间传递事件,实现松耦合的异步通信。
# 4.4.2 何时使用
不同组件、服务、进程之间需要解耦通信的场景。
# 4.4.3 如何使用
以下以nats事件总线为例展示如何使用,其他事件总线的使用方法类似。
- 安装依赖
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/nats/v2@e5cd009
- 导入依赖
import (
"github.com/dobyte/due/v2/eventbus"
"github.com/dobyte/due/eventbus/nats/v2"
)
- 设置事件总线
// 在全局设置事件总线
eventbus.SetEventbus(nats.NewEventbus())
- 使用事件总线
// 订阅事件
eventbus.Subscribe(context.Background(), "topic", func(evt *eventbus.Event) {
// TODO: 处理事件
// ...
})
// 发布事件
eventbus.Publish(context.Background(), "topic", []byte("hello"))
- 接口文档
https://pkg.go.dev/github.com/dobyte/due/v2/eventbus (opens new window)
- 全部实现
- nats (opens new window)
- redis (opens new window)
- kafka (opens new window)
- process (opens new window)
# 4.4.4 nats事件总线示例
以下完整示例详见:eventbus-nats-example (opens new window)
- 创建项目
$ mkdir eventbus-nats-example
- 安装依赖
$ cd eventbus-nats-example
$ go mod init eventbus-nats-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/nats/v2@e5cd009
- 启动配置
文件位置:eventbus-nats-example/etc/etc.toml (opens new window)
[eventbus.nats]
# 客户端连接地址,默认为nats://127.0.0.1:4222
url = "nats://127.0.0.1:4222"
# 客户端连接超时时间,支持单位:纳秒(ns)、微秒(us | µs)、毫秒(ms)、秒(s)、分(m)、小时(h)、天(d)。默认为2s
timeout = "2s"
# key前缀
prefix = "due:eventbus"
- 编写示例
文件位置:eventbus-nats-example/main.go (opens new window)
package main
import (
"context"
"time"
"github.com/dobyte/due/eventbus/nats/v2"
"github.com/dobyte/due/v2/eventbus"
"github.com/dobyte/due/v2/log"
)
// Payload is the event body used by this example. Its only field, Message,
// is tagged with the JSON key "message".
type Payload struct {
Message string `json:"message"`
}
// main installs the NATS event bus as the global event bus, subscribes a
// handler to the "message" topic, and then publishes a Payload to that topic
// on every one-second tick. The publish loop never exits on its own.
func main() {
	// Register the event bus globally before subscribing or publishing.
	eventbus.SetEventbus(nats.NewEventbus())

	ctx := context.Background()
	topic := "message"

	// onEvent decodes the event payload into a Payload and logs it.
	onEvent := func(evt *eventbus.Event) {
		var payload Payload
		if err := evt.Payload.Scan(&payload); err != nil {
			log.Errorf("scan payload failed: %v", err)
			return
		}

		log.Infof("evt id: %s, topic: %s, payload message: %s", evt.ID, evt.Topic, payload.Message)
	}

	// Subscribe first; without a subscription there is nothing to demonstrate.
	if err := eventbus.Subscribe(ctx, topic, onEvent); err != nil {
		log.Errorf("subscribe event failed: %v", err)
		return
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// Publish one event per tick; a failed publish is logged and the loop
	// carries on with the next tick.
	for range ticker.C {
		msg := &Payload{Message: "hello world"}
		if err := eventbus.Publish(ctx, topic, msg); err != nil {
			log.Errorf("publish event failed: %v", err)
		}
	}
}
- 运行示例
$ cd eventbus-nats-example
$ go run main.go
INFO[2025/10/25 18:58:57.774823] main.go:34 evt id: 9b033e9b-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:58:58.755565] main.go:34 evt id: 9b9bea3f-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:58:59.755049] main.go:34 evt id: 9c346739-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:59:00.756371] main.go:34 evt id: 9cccf95b-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world
# 4.4.5 redis事件总线示例
以下完整示例详见:eventbus-redis-example (opens new window)
- 创建项目
$ mkdir eventbus-redis-example
- 安装依赖
$ cd eventbus-redis-example
$ go mod init eventbus-redis-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/redis/v2@e5cd009
- 启动配置
文件位置:eventbus-redis-example/etc/etc.toml (opens new window)
[eventbus.redis]
# 客户端连接地址
addrs = ["127.0.0.1:6379"]
# 数据库号
db = 0
# 用户名
username = ""
# 密码
password = ""
# 私钥文件
keyFile = ""
# 证书文件
certFile = ""
# CA证书文件
caFile = ""
# 最大重试次数
maxRetries = 3
# key前缀
prefix = "due:eventbus"
- 编写示例
文件位置:eventbus-redis-example/main.go (opens new window)
package main
import (
"context"
"time"
"github.com/dobyte/due/eventbus/redis/v2"
"github.com/dobyte/due/v2/eventbus"
"github.com/dobyte/due/v2/log"
)
// Payload is the event body used by this example. Its only field, Message,
// is tagged with the JSON key "message".
type Payload struct {
Message string `json:"message"`
}
// main installs the Redis event bus as the global event bus, subscribes a
// handler to the "message" topic, and then publishes a Payload to that topic
// on every one-second tick. The publish loop never exits on its own.
func main() {
	// Register the event bus globally before subscribing or publishing.
	eventbus.SetEventbus(redis.NewEventbus())

	ctx := context.Background()
	topic := "message"

	// onEvent decodes the event payload into a Payload and logs it.
	onEvent := func(evt *eventbus.Event) {
		var payload Payload
		if err := evt.Payload.Scan(&payload); err != nil {
			log.Errorf("scan payload failed: %v", err)
			return
		}

		log.Infof("evt id: %s, topic: %s, payload message: %s", evt.ID, evt.Topic, payload.Message)
	}

	// Subscribe first; without a subscription there is nothing to demonstrate.
	if err := eventbus.Subscribe(ctx, topic, onEvent); err != nil {
		log.Errorf("subscribe event failed: %v", err)
		return
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// Publish one event per tick; a failed publish is logged and the loop
	// carries on with the next tick.
	for range ticker.C {
		msg := &Payload{Message: "hello world"}
		if err := eventbus.Publish(ctx, topic, msg); err != nil {
			log.Errorf("publish event failed: %v", err)
		}
	}
}
- 运行示例
$ cd eventbus-redis-example
$ go run main.go
INFO[2025/10/25 18:34:58.299463] main.go:34 evt id: 4104fd7a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:34:59.281495] main.go:34 evt id: 419d953c-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:00.280929] main.go:34 evt id: 4236264d-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:01.281501] main.go:34 evt id: 42cebf5a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
# 4.4.6 kafka事件总线示例
以下完整示例详见:eventbus-kafka-example (opens new window)
- 创建项目
$ mkdir eventbus-kafka-example
- 安装依赖
$ cd eventbus-kafka-example
$ go mod init eventbus-kafka-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/kafka/v2@e5cd009
- 启动配置
文件位置:eventbus-kafka-example/etc/etc.toml (opens new window)
[eventbus.kafka]
# 客户端连接地址
addrs = ["127.0.0.1:9092"]
# Kafka版本,默认为无版本
version = ""
# key前缀
prefix = "due:eventbus"
# 是否自动创建topic,默认为false
autoCreateTopic = true
- 编写示例
文件位置:eventbus-kafka-example/main.go (opens new window)
package main
import (
"context"
"time"
"github.com/dobyte/due/eventbus/kafka/v2"
"github.com/dobyte/due/v2/eventbus"
"github.com/dobyte/due/v2/log"
)
// Payload is the event body used by this example. Its only field, Message,
// is tagged with the JSON key "message".
type Payload struct {
Message string `json:"message"`
}
// main installs the Kafka event bus as the global event bus, subscribes a
// handler to the "message" topic, and then publishes a Payload to that topic
// on every one-second tick. The publish loop never exits on its own.
func main() {
	// Register the event bus globally before subscribing or publishing.
	eventbus.SetEventbus(kafka.NewEventbus())

	ctx := context.Background()
	topic := "message"

	// onEvent decodes the event payload into a Payload and logs it.
	onEvent := func(evt *eventbus.Event) {
		var payload Payload
		if err := evt.Payload.Scan(&payload); err != nil {
			log.Errorf("scan payload failed: %v", err)
			return
		}

		log.Infof("evt id: %s, topic: %s, payload message: %s", evt.ID, evt.Topic, payload.Message)
	}

	// Subscribe first; without a subscription there is nothing to demonstrate.
	if err := eventbus.Subscribe(ctx, topic, onEvent); err != nil {
		log.Errorf("subscribe event failed: %v", err)
		return
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// Publish one event per tick; a failed publish is logged and the loop
	// carries on with the next tick.
	for range ticker.C {
		msg := &Payload{Message: "hello world"}
		if err := eventbus.Publish(ctx, topic, msg); err != nil {
			log.Errorf("publish event failed: %v", err)
		}
	}
}
- 运行示例
$ cd eventbus-kafka-example
$ go run main.go
INFO[2025/10/25 18:34:58.299463] main.go:34 evt id: 4104fd7a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:34:59.281495] main.go:34 evt id: 419d953c-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:00.280929] main.go:34 evt id: 4236264d-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:01.281501] main.go:34 evt id: 42cebf5a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world