目录

GO第三方库-melody

melody 可以轻松帮我们实现 websocket 的广播.

动机

  • 最近在看 golang 有没有什么关于 websocket 的第三方库,看到最多的就是 github.com/gorilla/websocket 这个库,但是这个库只提供看最基础的一些封装,如果要实现更高级一点的功能,就需要自己封装,感觉太麻烦了。

  • 然后后面发现了 github.com/olahol/melody 这个库,这个库在 gorilla/websocket 的基础上又做了一些封装,广播很方便。

  • 不过,我的场景是希望可以分组广播,如果用它来做也不是不行,就是需要自己封装,也挺费劲的,后面又发现了一个库,“github.com/maoqide/ws-notifier” ,这个库,就已经帮我们实现了我们想要的功能。

接下来我们就简单看看 melody 和 ws-notifier 的简单用法。

melody

  • melody 官方有 2 个样例,一个是简单的 chat 聊天模式,一个是类似共享画板的样例,这个样例提供了更多高级的特性。

  • chat 的模式,所有连接上来的会话,都会收到相同的广播信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
    "net/http"

    "github.com/olahol/melody"
)

func main() {
    m := melody.New()

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        http.ServeFile(w, r, "index.html")
    })

    http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        m.HandleRequest(w, r)
    })

    m.HandleMessage(func(s *melody.Session, msg []byte) {
        m.Broadcast(msg)
    })

    http.ListenAndServe(":5000", nil)
}

ws-notifier

  • ws-notifier 是在 melody 的基础上,自己做了一些封装,使得消息可以分组发送,比如,所有连接到同一个 uri 的会话,就是一个组,只有这个组的会话能收到广播信息。

  • 官方的例子是,设置一个定时任务,模拟给连接上来的同个组的会话进行消息广播。我们这里给他改造一下,模拟用户连接上来后,在其他地方做了某些操作,然后给指定 groupID 内的成员发送广播。

  • var n = notifier.Default() ,作为全局变量,我们这里是 demo 用,如果变量要给其他包调用,可以改成首字母大写,或通过函数给他返回出去。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "strings"
    "time"

    notifier "github.com/maoqide/ws-notifier"
)

var n = notifier.Default()

func main() {
    fmt.Println("hello")

    http.HandleFunc("/mon", handleNotifierMon)
    http.HandleFunc("/", handleWsFunc)

    // Simulate sending messages periodically
    go func() {
        ticker := time.NewTicker(time.Second * 2)
        for {
            <-ticker.C
        sendMsg("ticker_ws")
        }
    }()

    http.ListenAndServe(":5000", nil)
}

func handleNotifierMon(w http.ResponseWriter, r *http.Request) {
    ret, _ := json.MarshalIndent(DebugInfo(), "", "\t")
    w.Write(ret)
}

func handleWsFunc(w http.ResponseWriter, r *http.Request) {
    prefix := "ticker_"

    group := strings.Trim(r.RequestURI, "/")
    // should be random generated
    sessionID := "123456"

    groupID := prefix + group
    n.Notify(groupID, tickerWorker, time.Hour*24)
    n.HandleRequestWithKeys(w, r, map[string]interface{}{"group": groupID, "id": groupID + "_" + sessionID})
}

func tickerWorker(groupID string, sigChan chan int8, n *notifier.Notifier) error {
    worker := fmt.Sprintf("ticker_worker_%s_%d", groupID, time.Now().Unix())
    fmt.Printf("worker: %s\n", worker)

    defer func() {
        select {
        case sigChan <- 0:
            log.Printf("ticker worker: %s exit", worker)
        case <-time.After(time.Second * 3):
            log.Printf("ticker worker: %s exit after 3s delaying", worker)
        }
    }()

    signal := <-sigChan
    log.Printf("receice stop signal %d for ticker worker: %s", signal, worker)
    return nil
}

func sendMsg(groupId string) {
    err := n.GroupBroadcast([]byte(fmt.Sprintf("%s", groupId)), groupId)
    if err != nil {
        log.Printf("err: %v", err)
    }
}

// NotifierState describe notifier states
type NotifierState struct {
    Sessions map[string][]string `json:"sessions"`
    Workers  []string            `json:"workers"`
}

// DebugInfo return debug info for websocket notifier
func DebugInfo() *NotifierState {
    state := NotifierState{}
    n := notifier.Default()
    state.Sessions = n.SessionManager.ShowSessions()
    state.Workers = n.ShowWorkers()
    return &state
}

总结

  1. 别人已经封装好了,就优先使用别人的库,不然自己去弄太麻烦了。

  2. 按分组发送消息的功能在某些场景还是挺好用的,比如实时显示任务执行日志,uri 用的任务 id ,同个 uri 就是同个任务,所有连上来的会话都可以收到实时的任务日志。