firetower

module
v0.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2026 License: MIT

README

firetower logo

Downloads Build Status Package Utilities Version license

Firetower

firetower是一个用golang开发的分布式推送(IM)服务

完全基于websocket封装,围绕topic进行sub/pub
自身实现订阅管理服务,无需依赖redis
聊天室demo体验地址: http://chat.ojbk.io

可用版本

go get github.com/OSMeteor/[email protected]

构成

基本服务由两点构成

  • topic管理服务

详见示例 example/topicService

该服务主要作为集群环境下唯一的topic管理节点
firetower一定要依赖这个管理节点才能正常工作
大型项目可以将该服务单独部署在一台独立的服务器上,小项目可以同连接层服务一起部署在一台机器上

  • 连接层服务(websocket服务)

详见示例 example/websocketService

websocket服务是用户基于firetower自定义开发的业务逻辑
可以通过firetower提供的回调方法来实现自己的业务逻辑
(web client 在 example/web 下)

架构图

beacontower

接入姿势

package main

import (
    "fmt"
    "github.com/gorilla/websocket"
    "github.com/OSMeteor/firetower/gateway"
    "github.com/holdno/snowFlakeByGo" // 这是一个分布式全局唯一id生成器
    "net/http"
    "strconv"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
} 

var GlobalIdWorker *snowFlakeByGo.Worker

func main() {
    GlobalIdWorker, _ = snowFlakeByGo.NewWorker(1)
    // 如果是集群环境  一定一定要给每个服务设置唯一的id
    // 取值范围 1-1024
    gateway.ClusterId = 1
    http.HandleFunc("/ws", Websocket)
    fmt.Println("websocket service start: 0.0.0.0:9999")
    http.ListenAndServe("0.0.0.0:9999", nil)
}

func Websocket(w http.ResponseWriter, r *http.Request) {
    // 做用户身份验证
    ...
    // 验证成功才升级连接
    ws, _ := upgrader.Upgrade(w, r, nil)
    // 生成一个全局唯一的clientid 正常业务下这个clientid应该由前端传入
    id := GlobalIdWorker.GetId()
    tower := gateway.BuildTower(ws, strconv.FormatInt(id, 10)) // 生成一个烽火台
    tower.Run()
}

目前支持的回调方法

  • ReadHandler 收到客户端发送的消息时触发
tower := gateway.BuildTower(ws, strconv.FormatInt(id, 10)) // 创建beacontower实例
tower.SetReadHandler(func(fire *gateway.FireInfo) bool { // 绑定ReadHandler回调方法
    // message.Data 为客户端传来的信息
    // message.Topic 为消息传递的topic
    // 用户可在此做发送验证
    // 判断发送方是否有权限向到达方发送内容
    // 通过 Publish 方法将内容推送到所有订阅 message.Topic 的连接
    tower.Publish(message)
    return true
})
  • ReadTimeoutHandler 客户端websocket请求超时处理(生产速度高于消费速度)
tower.SetReadTimeoutHandler(func(fire *gateway.FireInfo) {
    fmt.Println("read timeout:", fire.Message.Type, fire.Message.Topic, fire.Message.Data)
})
  • BeforeSubscribeHandler 客户端订阅某些topic时触发(这个时候topic还没有订阅,是before subscribe)
tower.SetBeforeSubscribeHandler(func(context *gateway.FireLife, topic []string) bool {
    // 这里用来判断当前用户是否允许订阅该topic
    return true
})
  • SubscribeHandler 客户端完成某些topic的订阅时触发(topic已经被topicService收录并管理)
tower.SetSubscribeHandler(func(context *gateway.FireLife, topic []string) bool {
    // 我们给出的聊天室示例是需要用到这个回调方法
    // 当某个聊天室(topic)有新的订阅者,则需要通知其他已经在聊天室内的成员当前在线人数+1
    for _, v := range topic {
        num := tower.GetConnectNum(v)
        // 继承订阅消息的context
        var pushmsg = gateway.NewFireInfo(tower, context)
        pushmsg.Message.Topic = v
        pushmsg.Message.Data = []byte(fmt.Sprintf("{\"type\":\"onSubscribe\",\"data\":%d}", num))
        tower.Publish(pushmsg)
    }
    return true
})
  • UnSubscribeHandler 客户端取消订阅某些topic完成时触发 (这个回调方法没有设置before方法,目前没有想到什么场景会使用到before unsubscribe,如果有请issue联系)
tower.SetUnSubscribeHandler(func(context *gateway.FireLife, topic []string) bool {
    for _, v := range topic {
        num := tower.GetConnectNum(v)
        // 继承订阅消息的context
        var pushmsg = gateway.NewFireInfo(tower, context)
        pushmsg.Message.Topic = v
        pushmsg.Message.Data = []byte(fmt.Sprintf("{\"type\":\"onUnsubscribe\",\"data\":%d}", num))
        tower.Publish(pushmsg)
    }
    return true
})

注意:当客户端断开websocket连接时firetower会将其在线时订阅的所有topic进行退订 会触发UnSubscirbeHandler

系统架构与无限扩展指南 (System Architecture & Scalability Guide)

Firetower 采用 Gateway (接入层) + TopicManager (逻辑控制层) 的分离架构设计。这种设计天生具备良好的扩展性。本指南将阐述如何从单机 Docker 部署演进到支撑百万级在线用户的分布式集群。

1. 核心组件

  • Gateway (Websocket Service):
    • 职责: 维护海量 WebSocket 长连接,处理协议封包/解包,执行消息广播。
    • 特性: 仅处理连接逻辑,几乎无状态(订阅关系同步给 TM),可无限水平扩展。针对慢连接实现了非阻塞广播和自动丢包保护。
  • TopicManager (Topic Service):
    • 职责: 管理 Topic -> Gateway 节点的映射关系,接收 Publish 请求并分发给持有该 Topic 订阅者的所有 Gateway。
    • 特性: 目前为单点状态节点 (Stateful),是扩展的瓶颈所在。

2. 演进路线图

阶段一:单机/小规模集群 (Current)

  • 适用场景: < 50,000 在线用户,业务量适中。
  • 部署:
    • 1个 TopicManager 实例。
    • N个 Gateway 实例 (N >= 2),通过 Nginx/SLB 做 4层或7层负载均衡。
    • Gateway 启动时通过配置指向唯一的 TopicManager IP。

阶段二:TopicManager 分片 (Sharding)

  • 适用场景: < 500,000 在线用户,Topic 数量巨大。
  • 改造方案:
    • 部署 M 个 TopicManager 节点。
    • Gateway 改造: 在连接 TM 时,不再连接单一节点,而是连接 TM 集群。
    • 路由算法: 采用一致性哈希 (Consistent Hashing) 或 Hash(Topic) % M 算法。
      • 当订阅 Topic_A 时,Gateway 计算 Hash 路由到 TM_Node_1 进行注册。
      • 当发布 Topic_A 时,Gateway 同样路由到 TM_Node_1 进行发布。
    • 效果: 将订阅关系管理的内存压力和匹配计算的 CPU 压力分散到集群中。

阶段三:无状态化与中间件集成 (Stateless & Middleware)

  • 适用场景: > 1,000,000 在线用户 (百万级并发)。
  • 核心痛点: 此时自研的 TopicManager 可能成为维护负担,且有状态服务的扩容迁移复杂。
  • 改造方案:
    • 移除 TopicManager:完全废弃自研的 TopicManager 服务。
    • 引入 Redis Pub/Sub (或 Kafka/NATS):使用成熟的消息中间件作为 Topic 路由中心。
    • Gateway 行为:
      • 用户订阅 Topic_A -> Gateway 直接向 Redis 订阅 Channel_A
      • 收到 Redis Channel_A 消息 -> Gateway 广播给本机所有订阅了 Topic_A 的 WebSocket 连接。
    • 优势: 彻底利用云厂商提供的 Redis 集群能力,Gateway 变为完全无状态的纯连接层,实现真正的无限水平扩展

阶段四:千万级超大规模 (Hierarchical Routing)

  • 适用场景: 各国头部 APP (如 WhatsApp, 微信)。
  • 改造方案:
    • Bucket 预分片: 引入 "Slot" 概念 (如 16384 个 Slot)。
    • 二级路由: 建立 Slot -> Gateway_IP_List 的全局映射表 (存储在 Etcd/ZooKeeper)。
    • 边缘计算: 消息先推送到 Slot 对应的“分发层”,再由分发层并行推送到具体的 Gateway 节点。

3. 稳定性保障 (已实装)

为支撑上述扩展,本项目在代码层面已实装以下企业级特性:

  • Panic Recovery: 关键协程全覆盖,单点故障不扩散。
  • Non-blocking Send: 防止慢消费者(弱网用户)拖死服务子系统。
  • Exponential Backoff: 指数退避重连,防止服务重启时的流量雪崩。
  • Zero-Copy Logic: 协议层优化,支撑高吞吐。

TODO

  • 运行时web看板
  • 提供推送相关http及grpc接口

License

MIT

Directories

Path Synopsis
topic_service command
grpc
service

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL