您好,欢迎访问一九零五行业门户网

用Go语言编写一个简单的WebSocket推送服务

推送服务实现
基本原理
server 启动以后会注册两个 handler。
websockethandler 用于提供浏览器端发送 upgrade 请求并升级为 websocket 连接。
pushhandler 用于提供外部推送端发送推送数据的请求。
浏览器首先连接 websockethandler (默认地址为 ws://ip:port/ws)升级请求为 websocket 连接,当连接建立之后需要发送注册信息进行注册。这里注册信息中包含一个 token 信息。
server 会对提供的 token 进行验证并获取到相应的 userid(通常来说,一个 userid 可能同时关联许多 token),并保存维护好 token, userid 和 conn(连接)之间的关系。
推送端发送推送数据的请求到 pushhandler(默认地址为 ws://ip:port/push),请求中包含了 userid 字段和 message 字段。server 会根据 userid 获取到所有此时连接到该 server 的 conn,然后将 message 一一进行推送。
由于推送服务的实时性,推送的数据并没有也不需要进行缓存。
代码详解
我在此处会稍微讲述一下代码的基本构成,也顺便说说 go 语言中一些常用的写法和模式(本人也是从其他语言转向 go 语言,毕竟 go 语言也相当年轻。所以有建议的话,敬请提出。)。
由于go语言的发明人和一些主要维护者大都来自于 c/c++ 语言,所以 go 语言的代码也更偏向于 c/c++ 系。
首先先看一下 server 的结构:
// server defines parameters for running websocket server.type server struct {    // address for server to listen on    addr string    // path for websocket request, default /ws.    wspath string    // path for push message, default /push.    pushpath string    // upgrader is for upgrade connection to websocket connection using    // github.com/gorilla/websocket.    //    // if upgrader is nil, default upgrader will be used. default upgrader is    // set readbuffersize and writebuffersize to 1024, and checkorigin always    // returns true.    upgrader *websocket.upgrader    // check token if it's valid and return userid. if token is valid, userid    // must be returned and ok should be true. otherwise ok should be false.    authtoken func(token string) (userid string, ok bool)    // authorize push request. message will be sent if it returns true,    // otherwise the request will be discarded. default nil and push request    // will always be accepted.    pushauth func(r *http.request) bool    wh *websockethandler    ph *pushhandler}
这里说一下 upgrader *websocket.upgrader,这是 gorilla/websocket 包的对象,它用来升级 http 请求。
如果一个结构体参数过多,通常不建议直接初始化,而是使用它提供的 new 方法。这里是:
// newserver creates a new server.func newserver(addr string) *server {    return &server{        addr:     addr,        wspath:   serverdefaultwspath,        pushpath: serverdefaultpushpath,    }}
这也是 go 语言对外提供初始化方法的一种常见用法。
然后 server 使用 listenandserve 方法启动并监听端口,与 http 包的使用类似:
// listenandserve listens on the tcp network address and handle websocket// request.func (s *server) listenandserve() error {    b := &binder{        userid2eventconnmap: make(map[string]*[]eventconn),        connid2useridmap:    make(map[string]string),    }    // websocket request handler    wh := websockethandler{        upgrader: defaultupgrader,        binder:   b,    }    if s.upgrader != nil {        wh.upgrader = s.upgrader    }    if s.authtoken != nil {        wh.calcuseridfunc = s.authtoken    }    s.wh = &wh    http.handle(s.wspath, s.wh)    // push request handler    ph := pushhandler{        binder: b,    }    if s.pushauth != nil {        ph.authfunc = s.pushauth    }    s.ph = &ph    http.handle(s.pushpath, s.ph)    return http.listenandserve(s.addr, nil)}
这里我们生成了两个 handler,分别为 websockethandler 和 pushhandler。websockethandler 负责与浏览器建立连接并传输数据,而 pushhandler 则处理推送端的请求。
可以看到,这里两个 handler 都封装了一个 binder 对象。这个 binder 用于维护 token <-> userid <-> conn 的关系:
// binder is defined to store the relation of userid and eventconntype binder struct {    mu sync.rwmutex    // map stores key: userid and value of related slice of eventconn    userid2eventconnmap map[string]*[]eventconn    // map stores key: connid and value: userid    connid2useridmap map[string]string}
websockethandler
具体看一下 websockethandler 的实现。
// websockethandler defines to handle websocket upgrade request.type websockethandler struct {    // upgrader is used to upgrade request.    upgrader *websocket.upgrader    // binder stores relations about websocket connection and userid.    binder *binder    // calcuseridfunc defines to calculate userid by token. the userid will    // be equal to token if this function is nil.    calcuseridfunc func(token string) (userid string, ok bool)}
很简单的结构。websockethandler 实现了 http.handler 接口:
// first try to upgrade connection to websocket. if success, connection will// be kept until client send close message or server drop them.func (wh *websockethandler) servehttp(w http.responsewriter, r *http.request) {    wsconn, err := wh.upgrader.upgrade(w, r, nil)    if err != nil {        return    }    defer wsconn.close()    // handle websocket request    conn := newconn(wsconn)    conn.afterreadfunc = func(messagetype int, r io.reader) {        var rm registermessage        decoder := json.newdecoder(r)        if err := decoder.decode(&rm); err != nil {            return        }        // calculate userid by token        userid := rm.token        if wh.calcuseridfunc != nil {            uid, ok := wh.calcuseridfunc(rm.token)            if !ok {                return            }            userid = uid        }        // bind        wh.binder.bind(userid, rm.event, conn)    }    conn.beforeclosefunc = func() {        // unbind        wh.binder.unbind(conn)    }    conn.listen()}
首先将传入的 http.request 转换为 websocket.conn,再将其分装为我们自定义的一个 wserver.conn(封装,或者说是组合,是 go 语言的典型用法。记住,go 语言没有继承,只有组合)。
然后设置了 conn 的 afterreadfunc 和 beforeclosefunc 方法,接着启动了 conn.listen()。afterreadfunc 意思是当 conn 读取到数据后,尝试验证并根据 token 计算 userid,然乎 bind 注册绑定。beforeclosefunc 则为 conn 关闭前进行解绑操作。
pushhandler
pushhandler 则容易理解。它解析请求然后推送数据:
// authorize if needed. then decode the request and push message to each// realted websocket connection.func (s *pushhandler) servehttp(w http.responsewriter, r *http.request) {    if r.method != http.methodpost {        w.writeheader(http.statusmethodnotallowed)        return    }    // authorize    if s.authfunc != nil {        if ok := s.authfunc(r); !ok {            w.writeheader(http.statusunauthorized)            return        }    }    // read request    var pm pushmessage    decoder := json.newdecoder(r.body)    if err := decoder.decode(&pm); err != nil {        w.writeheader(http.statusbadrequest)        w.write([]byte(errrequestillegal.error()))        return    }    // validate the data    if pm.userid ==  || pm.event ==  || pm.message ==  {        w.writeheader(http.statusbadrequest)        w.write([]byte(errrequestillegal.error()))        return    }    cnt, err := s.push(pm.userid, pm.event, pm.message)    if err != nil {        w.writeheader(http.statusinternalservererror)        w.write([]byte(err.error()))        return    }    result := strings.newreader(fmt.sprintf(message sent to %d clients, cnt))    io.copy(w, result)}connconn (此处指 wserver.conn) 为 websocket.conn 的包装。// conn wraps websocket.conn with conn. it defines to listen and read// data from conn.type conn struct {    conn *websocket.conn    afterreadfunc   func(messagetype int, r io.reader)    beforeclosefunc func()    once   sync.once    id     string    stopch chan struct{}}
最主要的方法为 listen():
// listen listens for receive data from websocket connection. it blocks// until websocket connection is closed.func (c *conn) listen() {    c.conn.setclosehandler(func(code int, text string) error {        if c.beforeclosefunc != nil {            c.beforeclosefunc()        }        if err := c.close(); err != nil {            log.println(err)        }        message := websocket.formatclosemessage(code, )        c.conn.writecontrol(websocket.closemessage, message, time.now().add(time.second))        return nil    })    // keeps reading from conn util get error.readloop:    for {        select {        case <-c.stopch:            break readloop        default:            messagetype, r, err := c.conn.nextreader()            if err != nil {                // todo: handle read error maybe                break readloop            }            if c.afterreadfunc != nil {                c.afterreadfunc(messagetype, r)            }        }    }}
主要设置了当 websocket 连接关闭时的处理和不停地读取数据。
推荐:golang教程
以上就是用go语言编写一个简单的websocket推送服务的详细内容。
其它类似信息

推荐信息