# 4.4 事件总线

# 4.4.1 基础介绍

事件总线是一种通信机制,用于在系统内部的发布者与订阅者之间传递事件,实现松耦合的异步通信。

# 4.4.2 何时使用

适用于不同组件、服务、进程之间需要解耦通信的场景。

# 4.4.3 如何使用

以下以nats事件总线为例展示如何使用,其他事件总线的使用方法类似。

  1. 安装依赖
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/nats/v2@e5cd009
1
2
  2. 导入依赖
import (
	"github.com/dobyte/due/v2/eventbus"
	"github.com/dobyte/due/eventbus/nats/v2"
)
1
2
3
4
  3. 设置事件总线
// Set the event bus globally, here using the NATS implementation.
eventbus.SetEventbus(nats.NewEventbus())
1
2
  4. 使用事件总线
// Subscribe to events on "topic"; the callback is invoked with the received event.
// Subscribe returns an error, which is ignored here for brevity (the complete
// examples further down check it).
eventbus.Subscribe(context.Background(), "topic", func(evt *eventbus.Event) {
    // TODO: handle the event
    // ...
})

// Publish an event to "topic".
// Publish also returns an error, which is ignored here for brevity.
eventbus.Publish(context.Background(), "topic", []byte("hello"))
1
2
3
4
5
6
7
8
  5. 接口文档

https://pkg.go.dev/github.com/dobyte/due/v2/eventbus

  6. 全部实现

# 4.4.4 nats事件总线示例

以下完整示例详见:eventbus-nats-example

  1. 创建项目
$ mkdir eventbus-nats-example
1
  2. 安装依赖
$ cd eventbus-nats-example
$ go mod init eventbus-nats-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/nats/v2@e5cd009
1
2
3
4
  3. 启动配置

文件位置:eventbus-nats-example/etc/etc.toml

[eventbus.nats]
    # 客户端连接地址,默认为nats://127.0.0.1:4222
    url = "nats://127.0.0.1:4222"
    # 客户端连接超时时间,支持单位:纳秒(ns)、微秒(us | µs)、毫秒(ms)、秒(s)、分(m)、小时(h)、天(d)。默认为2s
    timeout = "2s"
    # key前缀
    prefix = "due:eventbus"
1
2
3
4
5
6
7
  4. 编写示例

文件位置:eventbus-nats-example/main.go

package main

import (
	"context"
	"time"

	"github.com/dobyte/due/eventbus/nats/v2"
	"github.com/dobyte/due/v2/eventbus"
	"github.com/dobyte/due/v2/log"
)

// Payload is the message body this example publishes to the event bus and
// scans back out of received events.
type Payload struct {
	// Message is the text carried by the event; the struct tag names it
	// "message" for JSON encoding.
	Message string `json:"message"`
}

// main registers the NATS event bus as the global bus, subscribes to the
// "message" topic, and then publishes one Payload to that topic on every tick
// of a one-second ticker. It returns early only if the subscription fails.
func main() {
	// Install the NATS implementation as the global event bus.
	eventbus.SetEventbus(nats.NewEventbus())

	ctx := context.Background()
	topic := "message"

	// onEvent scans the event payload into a Payload and logs it; a payload
	// that cannot be scanned is logged as an error and dropped.
	onEvent := func(evt *eventbus.Event) {
		var payload Payload
		if err := evt.Payload.Scan(&payload); err != nil {
			log.Errorf("scan payload failed: %v", err)
			return
		}

		log.Infof("evt id: %s, topic: %s, payload message: %s", evt.ID, evt.Topic, payload.Message)
	}

	// Subscribe first; there is nothing to demonstrate if this fails.
	if err := eventbus.Subscribe(ctx, topic, onEvent); err != nil {
		log.Errorf("subscribe event failed: %v", err)
		return
	}

	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	// The ticker channel is never closed, so this loop keeps publishing until
	// the process is terminated. Publish failures are logged, not fatal.
	for range tick.C {
		msg := &Payload{Message: "hello world"}
		if err := eventbus.Publish(ctx, topic, msg); err != nil {
			log.Errorf("publish event failed: %v", err)
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
  5. 运行示例
$ cd eventbus-nats-example
$ go run main.go
INFO[2025/10/25 18:58:57.774823] main.go:34 evt id: 9b033e9b-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:58:58.755565] main.go:34 evt id: 9b9bea3f-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:58:59.755049] main.go:34 evt id: 9c346739-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:59:00.756371] main.go:34 evt id: 9cccf95b-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world
1
2
3
4
5
6

# 4.4.5 redis事件总线示例

以下完整示例详见:eventbus-redis-example

  1. 创建项目
$ mkdir eventbus-redis-example
1
  2. 安装依赖
$ cd eventbus-redis-example
$ go mod init eventbus-redis-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/redis/v2@e5cd009
1
2
3
4
  3. 启动配置

文件位置:eventbus-redis-example/etc/etc.toml

[eventbus.redis]
    # 客户端连接地址
    addrs = ["127.0.0.1:6379"]
    # 数据库号
    db = 0
    # 用户名
    username = ""
    # 密码
    password = ""
    # 私钥文件
    keyFile = ""
    # 证书文件
    certFile = ""
    # CA证书文件
    caFile = ""
    # 最大重试次数
    maxRetries = 3
    # key前缀
    prefix = "due:eventbus"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  4. 编写示例

文件位置:eventbus-redis-example/main.go

package main

import (
	"context"
	"time"

	"github.com/dobyte/due/eventbus/redis/v2"
	"github.com/dobyte/due/v2/eventbus"
	"github.com/dobyte/due/v2/log"
)

// Payload is the message body this example publishes to the event bus and
// scans back out of received events.
type Payload struct {
	// Message is the text carried by the event; the struct tag names it
	// "message" for JSON encoding.
	Message string `json:"message"`
}

// main registers the Redis event bus as the global bus, subscribes to the
// "message" topic, and then publishes one Payload to that topic on every tick
// of a one-second ticker. It returns early only if the subscription fails.
func main() {
	// Install the Redis implementation as the global event bus.
	eventbus.SetEventbus(redis.NewEventbus())

	ctx := context.Background()
	topic := "message"

	// onEvent scans the event payload into a Payload and logs it; a payload
	// that cannot be scanned is logged as an error and dropped.
	onEvent := func(evt *eventbus.Event) {
		var payload Payload
		if err := evt.Payload.Scan(&payload); err != nil {
			log.Errorf("scan payload failed: %v", err)
			return
		}

		log.Infof("evt id: %s, topic: %s, payload message: %s", evt.ID, evt.Topic, payload.Message)
	}

	// Subscribe first; there is nothing to demonstrate if this fails.
	if err := eventbus.Subscribe(ctx, topic, onEvent); err != nil {
		log.Errorf("subscribe event failed: %v", err)
		return
	}

	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	// The ticker channel is never closed, so this loop keeps publishing until
	// the process is terminated. Publish failures are logged, not fatal.
	for range tick.C {
		msg := &Payload{Message: "hello world"}
		if err := eventbus.Publish(ctx, topic, msg); err != nil {
			log.Errorf("publish event failed: %v", err)
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
  5. 运行示例
$ cd eventbus-redis-example
$ go run main.go
INFO[2025/10/25 18:34:58.299463] main.go:34 evt id: 4104fd7a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:34:59.281495] main.go:34 evt id: 419d953c-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:00.280929] main.go:34 evt id: 4236264d-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:01.281501] main.go:34 evt id: 42cebf5a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
1
2
3
4
5
6

# 4.4.6 kafka事件总线示例

以下完整示例详见:eventbus-kafka-example

  1. 创建项目
$ mkdir eventbus-kafka-example
1
  2. 安装依赖
$ cd eventbus-kafka-example
$ go mod init eventbus-kafka-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/kafka/v2@e5cd009
1
2
3
4
  3. 启动配置

文件位置:eventbus-kafka-example/etc/etc.toml

[eventbus.kafka]
    # 客户端连接地址
    addrs = ["127.0.0.1:9092"]
    # Kafka版本,默认为无版本
    version = ""
    # key前缀
    prefix = "due:eventbus"
    # 是否自动创建topic,默认为false
    autoCreateTopic = true
1
2
3
4
5
6
7
8
9
  4. 编写示例

文件位置:eventbus-kafka-example/main.go

package main

import (
	"context"
	"time"

	"github.com/dobyte/due/eventbus/kafka/v2"
	"github.com/dobyte/due/v2/eventbus"
	"github.com/dobyte/due/v2/log"
)

// Payload is the message body this example publishes to the event bus and
// scans back out of received events.
type Payload struct {
	// Message is the text carried by the event; the struct tag names it
	// "message" for JSON encoding.
	Message string `json:"message"`
}

// main registers the Kafka event bus as the global bus, subscribes to the
// "message" topic, and then publishes one Payload to that topic on every tick
// of a one-second ticker. It returns early only if the subscription fails.
func main() {
	// Install the Kafka implementation as the global event bus.
	eventbus.SetEventbus(kafka.NewEventbus())

	ctx := context.Background()
	topic := "message"

	// onEvent scans the event payload into a Payload and logs it; a payload
	// that cannot be scanned is logged as an error and dropped.
	onEvent := func(evt *eventbus.Event) {
		var payload Payload
		if err := evt.Payload.Scan(&payload); err != nil {
			log.Errorf("scan payload failed: %v", err)
			return
		}

		log.Infof("evt id: %s, topic: %s, payload message: %s", evt.ID, evt.Topic, payload.Message)
	}

	// Subscribe first; there is nothing to demonstrate if this fails.
	if err := eventbus.Subscribe(ctx, topic, onEvent); err != nil {
		log.Errorf("subscribe event failed: %v", err)
		return
	}

	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	// The ticker channel is never closed, so this loop keeps publishing until
	// the process is terminated. Publish failures are logged, not fatal.
	for range tick.C {
		msg := &Payload{Message: "hello world"}
		if err := eventbus.Publish(ctx, topic, msg); err != nil {
			log.Errorf("publish event failed: %v", err)
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
  5. 运行示例
$ cd eventbus-kafka-example
$ go run main.go
INFO[2025/10/25 18:34:58.299463] main.go:34 evt id: 4104fd7a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:34:59.281495] main.go:34 evt id: 419d953c-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:00.280929] main.go:34 evt id: 4236264d-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:01.281501] main.go:34 evt id: 42cebf5a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
1
2
3
4
5
6