增加 怎么使用 Golang 的 Channel 做广播.
Signed-off-by: chen.yang <chen.yang@yuzhen-iot.com>
This commit is contained in:
parent
075ca1c6e9
commit
765ef02a7c
|
@ -0,0 +1,250 @@
|
|||
# [怎么使用 Golang 的 Channel 做广播](https://blog.csdn.net/shenshouer/article/details/53401553)
|
||||
|
||||
- [怎么使用 Golang 的 Channel 做广播](#怎么使用-golang-的-channel-做广播)
|
||||
- [1. 当监听者数量已知时](#1-当监听者数量已知时)
|
||||
- [2. 当监听者数量为未知时](#2-当监听者数量为未知时)
|
||||
- [3. 最后一个问题:绝对不要让一个 goroutine 挂起](#3-最后一个问题绝对不要让一个-goroutine-挂起)
|
||||
|
||||
使用 golang 中的 channel 做广播需要使用到 golang 并发模式中的扇出模式,也就是说多个接入点监听一个输入源。这种模式的结果是,只要输入源输入一个消息,任何一个监听者都能获取到这个消息。这里仅有一个例外就是 channel 关闭。这个关闭将所有监听者都关闭,这就是扇出模式。删除模式简单定义为:多个函数可以从同一个 channel 读取数据,直到这个 channel 关闭。
|
||||
|
||||
## 1. 当监听者数量已知时
|
||||
|
||||
让每个 worker 监听专有的广播 channel, 并且从主 channel 中派发消息到每一个专有的广播 channel 中。
|
||||
|
||||
```go
|
||||
type worker struct {
|
||||
name string
|
||||
source chan interface{}
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
func (w *worker) Start() {
|
||||
w.source = make(chan interface{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case msg := <-w.source:
|
||||
fmt.Println("==========>> ", w.name, msg)
|
||||
case <-w.quit: // 后面解释
|
||||
fmt.Println(w.name, " quit!")
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
```
|
||||
|
||||
此时定义两个 worker:
|
||||
|
||||
```go
|
||||
workers := []*worker{&worker{}, &worker{}}
|
||||
for _, w := range workers {
|
||||
w.Start()
|
||||
}
|
||||
```
|
||||
|
||||
派发消息:
|
||||
|
||||
```go
|
||||
go func() {
|
||||
msg := "test"
|
||||
count := 0
|
||||
var sendMsg string
|
||||
for {
|
||||
select {
|
||||
case <-globalQuit:
|
||||
fmt.Println("Stop send message")
|
||||
return
|
||||
case <-time.Tick(500 * time.Millisecond):
|
||||
count++
|
||||
sendMsg = fmt.Sprintf("%s-%d", msg, count)
|
||||
fmt.Println("Send message is ", sendMsg)
|
||||
for _, wk := range workers {
|
||||
wk.source <- sendMsg
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
```
|
||||
|
||||
## 2. 当监听者数量为未知时
|
||||
|
||||
在这种情况下,上述解决办法依然可行。唯一不同的地方在于,无论什么时候需要一个新的 worker 时,仅仅只需要新建一个 worker,并开启它,然后 push 到 workers 的 slice 中。但这种方式需要一个线程安全的 slice,需要一个同步锁。其实现如下:
|
||||
|
||||
```go
|
||||
type threadSafeSlice struct {
|
||||
sync.Mutex
|
||||
workers []*worker
|
||||
}
|
||||
|
||||
func (slice *threadSafeSlice) Push(w *worker) {
|
||||
slice.Lock()
|
||||
defer slice.Unlock()
|
||||
|
||||
slice.workers = append(slice.workers, w)
|
||||
}
|
||||
|
||||
func (slice *threadSafeSlice) Iter(routine func(*worker)) {
|
||||
slice.Lock()
|
||||
defer slice.Unlock()
|
||||
|
||||
for _, worker := range slice.workers {
|
||||
routine(worker)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
任何时候,需要一个新的 worker 时:
|
||||
|
||||
```go
|
||||
w := &worker{}
|
||||
w.Start()
|
||||
threadSafeSlice.Push(w)
|
||||
```
|
||||
|
||||
然后派发消息修改如下伪代码所示:
|
||||
|
||||
```go
|
||||
go func() {
|
||||
for {
|
||||
msg := <- ch
|
||||
threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
|
||||
}
|
||||
}()
|
||||
```
|
||||
|
||||
## 3. 最后一个问题:绝对不要让一个 goroutine 挂起
|
||||
|
||||
一个好的实践是:绝对不要让一个 goroutine 挂起。所以当完成监听后,必须关闭所有激活的 goroutine。这将通过 worker 中的 quitchannel 进行:
|
||||
|
||||
首先,创建一个全局的 quit 信号 channel:
|
||||
|
||||
```go
|
||||
globalQuit := make(chan struct{})
|
||||
```
|
||||
|
||||
并且在任何一个新建的 worker 时,将 globalQuit 分配给这个 worker 的 quit channel
|
||||
|
||||
```go
|
||||
worker.quit = globalQuit
|
||||
```
|
||||
|
||||
然后当需要关闭所有的 worker 时,仅仅只需要这么做:
|
||||
|
||||
```go
|
||||
close(globalQuit)
|
||||
```
|
||||
|
||||
因此 close 将被所有监听的 goroutine 所接受。所有的 goroutine 将被返回。
|
||||
|
||||
最后完善后的代码如下所示:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
type threadSafeSlice struct {
|
||||
sync.Mutex
|
||||
workers []*worker
|
||||
}
|
||||
|
||||
func (slice *threadSafeSlice) Push(w *worker) {
|
||||
slice.Lock()
|
||||
defer slice.Unlock()
|
||||
|
||||
slice.workers = append(slice.workers, w)
|
||||
}
|
||||
|
||||
func (slice *threadSafeSlice) Iter(routine func(*worker)) {
|
||||
slice.Lock()
|
||||
defer slice.Unlock()
|
||||
|
||||
for _, worker := range slice.workers {
|
||||
routine(worker)
|
||||
}
|
||||
}
|
||||
|
||||
type worker struct {
|
||||
name string
|
||||
source chan interface{}
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
func (w *worker) Start() {
|
||||
w.source = make(chan interface{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case msg := <-w.source:
|
||||
fmt.Println("==========>> ", w.name, msg)
|
||||
case <-w.quit: // 后面解释
|
||||
fmt.Println(w.name, " quit!")
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func main() {
|
||||
globalQuit := make(chan struct{})
|
||||
|
||||
tss := &threadSafeSlice{}
|
||||
|
||||
// 1 秒钟添加一个新的 worker 至 slice 中
|
||||
go func() {
|
||||
name := "worker"
|
||||
for i := 0; i < 10; i++ {
|
||||
time.Sleep(1 * time.Second)
|
||||
w := &worker{
|
||||
name: fmt.Sprintf("%s%d", name, i),
|
||||
quit: globalQuit,
|
||||
}
|
||||
w.Start()
|
||||
tss.Push(w)
|
||||
}
|
||||
}()
|
||||
|
||||
// 派发消息
|
||||
go func() {
|
||||
msg := "test"
|
||||
count := 0
|
||||
var sendMsg string
|
||||
for {
|
||||
select {
|
||||
case <-globalQuit:
|
||||
fmt.Println("Stop send message")
|
||||
return
|
||||
case <-time.Tick(500 * time.Millisecond):
|
||||
count++
|
||||
sendMsg = fmt.Sprintf("%s-%d", msg, count)
|
||||
fmt.Println("Send message is ", sendMsg)
|
||||
tss.Iter(func(w *worker) { w.source <- sendMsg })
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// 截获退出信号
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
for sig := range c {
|
||||
switch sig {
|
||||
case syscall.SIGINT, syscall.SIGTERM: // 获取退出信号时,关闭 globalQuit, 让所有监听者退出
|
||||
close(globalQuit)
|
||||
time.Sleep(1 * time.Second)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
Loading…
Reference in New Issue