事件总线

基础介绍

事件总线是一种通信机制,用于在系统内部的发布者与订阅者之间传递事件,实现松耦合的异步通信。

何时使用

不同组件、服务、进程之间需要解耦通信的场景。

如何使用

以下以 nats 事件总线为例展示如何使用,其他事件总线的使用方法类似。

  1. 安装依赖
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/nats/v2@e5cd009
  1. 导入依赖
import (
	"github.com/dobyte/due/v2/eventbus"
	"github.com/dobyte/due/eventbus/nats/v2"
)
  1. 设置事件总线
// 在全局设置事件总线
eventbus.SetEventbus(nats.NewEventbus())
  1. 使用事件总线
// 订阅事件
eventbus.Subscribe(context.Background(), "topic", func(evt *eventbus.Event) {
    // TODO: 处理事件
    // ...
})
 
// 发布事件
eventbus.Publish(context.Background(), "topic", []byte("hello"))
  1. 接口文档

https://pkg.go.dev/github.com/dobyte/due/v2/eventbus

  1. 全部实现

nats 事件总线示例

以下完整示例详见: eventbus-nats-example

  1. 创建项目
$ mkdir eventbus-nats-example
  1. 安装依赖
$ cd eventbus-nats-example
$ go mod init eventbus-nats-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/nats/v2@e5cd009
  1. 启动配置

文件位置: eventbus-nats-example/etc/etc.toml

[eventbus.nats]
    # 客户端连接地址,默认为nats://127.0.0.1:4222
    url = "nats://127.0.0.1:4222"
    # 客户端连接超时时间,支持单位:纳秒(ns)、微秒(us | µs)、毫秒(ms)、秒(s)、分(m)、小时(h)、天(d)。默认为2s
    timeout = "2s"
    # key前缀
    prefix = "due:eventbus"
  1. 编写示例

文件位置: eventbus-nats-example/main.go

package main
 
import (
	"context"
	"time"
 
	"github.com/dobyte/due/eventbus/nats/v2"
	"github.com/dobyte/due/v2/eventbus"
	"github.com/dobyte/due/v2/log"
)
 
type Payload struct {
	Message string `json:"message"`
}
 
func main() {
	// 设置事件总线
	eventbus.SetEventbus(nats.NewEventbus())
 
	var (
		ctx   = context.Background()
		topic = "message"
	)
 
	// 订阅事件
	if err := eventbus.Subscribe(ctx, topic, func(evt *eventbus.Event) {
		payload := &Payload{}
 
		if err := evt.Payload.Scan(payload); err != nil {
			log.Errorf("scan payload failed: %v", err)
			return
		}
 
		log.Infof("evt id: %s, topic: %s, payload message: %s", evt.ID, evt.Topic, payload.Message)
	}); err != nil {
		log.Errorf("subscribe event failed: %v", err)
		return
	}
 
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
 
	for {
		<-ticker.C
 
		// 发布事件
		if err := eventbus.Publish(ctx, topic, &Payload{
			Message: "hello world",
		}); err != nil {
			log.Errorf("publish event failed: %v", err)
		}
	}
}
  1. 运行示例
$ cd eventbus-nats-example
$ go run main.go
INFO[2025/10/25 18:58:57.774823] main.go:34 evt id: 9b033e9b-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:58:58.755565] main.go:34 evt id: 9b9bea3f-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:58:59.755049] main.go:34 evt id: 9c346739-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:59:00.756371] main.go:34 evt id: 9cccf95b-b191-11f0-b0b7-f4f19e1f0070, topic: message, payload message: hello world

redis 事件总线示例

以下完整示例详见: eventbus-redis-example

  1. 创建项目
$ mkdir eventbus-redis-example
  1. 安装依赖
$ cd eventbus-redis-example
$ go mod init eventbus-redis-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/redis/v2@e5cd009
  1. 启动配置

文件位置: eventbus-redis-example/etc/etc.toml

[eventbus.redis]
    # 客户端连接地址
    addrs = ["127.0.0.1:6379"]
    # 数据库号
    db = 0
    # 用户名
    username = ""
    # 密码
    password = ""
    # 私钥文件
    keyFile = ""
    # 证书文件
    certFile = ""
    # CA证书文件
    caFile = ""
    # 最大重试次数
    maxRetries = 3
    # key前缀
    prefix = "due:eventbus"
  1. 编写示例

文件位置: eventbus-redis-example/main.go

package main
 
import (
	"context"
	"time"
 
	"github.com/dobyte/due/eventbus/redis/v2"
	"github.com/dobyte/due/v2/eventbus"
	"github.com/dobyte/due/v2/log"
)
 
type Payload struct {
	Message string `json:"message"`
}
 
func main() {
	// 设置事件总线
	eventbus.SetEventbus(redis.NewEventbus())
 
	var (
		ctx   = context.Background()
		topic = "message"
	)
 
	// 订阅事件
	if err := eventbus.Subscribe(ctx, topic, func(evt *eventbus.Event) {
		payload := &Payload{}
 
		if err := evt.Payload.Scan(payload); err != nil {
			log.Errorf("scan payload failed: %v", err)
			return
		}
 
		log.Infof("evt id: %s, topic: %s, payload message: %s", evt.ID, evt.Topic, payload.Message)
	}); err != nil {
		log.Errorf("subscribe event failed: %v", err)
		return
	}
 
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
 
	for {
		<-ticker.C
 
		// 发布事件
		if err := eventbus.Publish(ctx, topic, &Payload{
			Message: "hello world",
		}); err != nil {
			log.Errorf("publish event failed: %v", err)
		}
	}
}
  1. 运行示例
$ cd eventbus-redis-example
$ go run main.go
INFO[2025/10/25 18:34:58.299463] main.go:34 evt id: 4104fd7a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:34:59.281495] main.go:34 evt id: 419d953c-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:00.280929] main.go:34 evt id: 4236264d-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:01.281501] main.go:34 evt id: 42cebf5a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world

kafka 事件总线示例

以下完整示例详见: eventbus-kafka-example

  1. 创建项目
$ mkdir eventbus-kafka-example
  1. 安装依赖
$ cd eventbus-kafka-example
$ go mod init eventbus-kafka-example
$ go get github.com/dobyte/due/v2@v2.4.2
$ go get github.com/dobyte/due/eventbus/kafka/v2@e5cd009
  1. 启动配置

文件位置: eventbus-kafka-example/etc/etc.toml

[eventbus.kafka]
    # 客户端连接地址
    addrs = ["127.0.0.1:9092"]
    # Kafka版本,默认为无版本
    version = ""
    # key前缀
    prefix = "due:eventbus"
    # 是否自动创建topic,默认为false
    autoCreateTopic = true
  1. 编写示例

文件位置: eventbus-kafka-example/main.go

package main
 
import (
	"context"
	"time"
 
	"github.com/dobyte/due/eventbus/kafka/v2"
	"github.com/dobyte/due/v2/eventbus"
	"github.com/dobyte/due/v2/log"
)
 
type Payload struct {
	Message string `json:"message"`
}
 
func main() {
	// 设置事件总线
	eventbus.SetEventbus(kafka.NewEventbus())
 
	var (
		ctx   = context.Background()
		topic = "message"
	)
 
	// 订阅事件
	if err := eventbus.Subscribe(ctx, topic, func(evt *eventbus.Event) {
		payload := &Payload{}
 
		if err := evt.Payload.Scan(payload); err != nil {
			log.Errorf("scan payload failed: %v", err)
			return
		}
 
		log.Infof("evt id: %s, topic: %s, payload message: %s", evt.ID, evt.Topic, payload.Message)
	}); err != nil {
		log.Errorf("subscribe event failed: %v", err)
		return
	}
 
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
 
	for {
		<-ticker.C
 
		// 发布事件
		if err := eventbus.Publish(ctx, topic, &Payload{
			Message: "hello world",
		}); err != nil {
			log.Errorf("publish event failed: %v", err)
		}
	}
}
  1. 运行示例
$ cd eventbus-redis-example
$ go run main.go
INFO[2025/10/25 18:34:58.299463] main.go:34 evt id: 4104fd7a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:34:59.281495] main.go:34 evt id: 419d953c-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:00.280929] main.go:34 evt id: 4236264d-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world
INFO[2025/10/25 18:35:01.281501] main.go:34 evt id: 42cebf5a-b18e-11f0-ae1c-f4f19e1f0070, topic: message, payload message: hello world