Documentation ¶
Index ¶
- Constants
- Variables
- func NewConnPool[T io.Closer](factory func() (T, error), residence int, opts ...ConnPoolOption[T]) *connPool[T]
- type ArrayStack
- type BroadcastBus
- func (u *BroadcastBus[T]) AddTopic(topic string, bufSize int) error
- func (u *BroadcastBus[T]) Close() error
- func (u *BroadcastBus[T]) Publish(topic string, msg T) error
- func (u *BroadcastBus[T]) RemoveTopic(topic string) error
- func (u *BroadcastBus[T]) Subscribe(topic string) (*Subscription[T], error)
- func (u *BroadcastBus[T]) Unsubscribe(topic string, sub *Subscription[T]) error
- type Cache
- type ConnPoolOption
- type FastHub
- type HashSet
- func (s HashSet[T]) Append(items ...T)
- func (s HashSet[T]) Contains(value T) (exist bool)
- func (s HashSet[T]) Difference(s1 Set[T]) Set[T]
- func (s HashSet[T]) Intersection(s1 Set[T]) Set[T]
- func (s HashSet[T]) Len() int
- func (s HashSet[T]) Remove(values ...T) bool
- func (s HashSet[T]) String() string
- func (s HashSet[T]) SymmetricDifference(s1 Set[T]) Set[T]
- func (s HashSet[T]) Union(s1 Set[T]) Set[T]
- func (s HashSet[T]) Values() (values []T)
- type Hub
- type LinkStack
- type Map
- type MutexMap
- func (m *MutexMap[K, V]) CompareAndDelete(key K, old V) bool
- func (m *MutexMap[K, V]) CompareAndSwap(key K, old, new V) bool
- func (m *MutexMap[K, V]) CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool
- func (m *MutexMap[K, V]) CompareFnAndSwap(key K, fn func(V, V) bool, old, new V) bool
- func (m *MutexMap[K, V]) Delete(key K) bool
- func (m *MutexMap[K, V]) Filter(filter func(K, V) bool) Map[K, V]
- func (m *MutexMap[K, V]) Keys() []K
- func (m *MutexMap[K, V]) Len() int
- func (m *MutexMap[K, V]) Load(key K) (value V, ok bool)
- func (m *MutexMap[K, V]) LoadAndDelete(key K) (value V, loaded bool)
- func (m *MutexMap[K, V]) LoadOrStore(key K, new V) (actual V, loaded bool)
- func (m *MutexMap[K, V]) Range(iterator func(key K, value V) bool)
- func (m *MutexMap[K, V]) Store(key K, value V)
- func (m *MutexMap[K, V]) Swap(key K, value V) (pre V, loaded bool)
- func (m *MutexMap[K, V]) Values() []V
- type OrderHub
- type Pool
- type RingBuffer
- type Set
- type Stack
- type Subscription
- type SyncMap
- func (m *SyncMap[K, V]) CompareAndDelete(key K, old V) bool
- func (m *SyncMap[K, V]) CompareAndSwap(key K, old, new V) bool
- func (m *SyncMap[K, V]) CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool
- func (m *SyncMap[K, V]) CompareFnAndSwap(key K, fn func(V, V) bool, old, new V) bool
- func (m *SyncMap[K, V]) Delete(key K) bool
- func (m *SyncMap[K, V]) Filter(filter func(K, V) bool) Map[K, V]
- func (m *SyncMap[K, V]) Keys() []K
- func (m *SyncMap[K, V]) Load(key K) (value V, ok bool)
- func (m *SyncMap[K, V]) LoadAndDelete(key K) (value V, loaded bool)
- func (m *SyncMap[K, V]) LoadOrStore(key K, new V) (actual V, loaded bool)
- func (m *SyncMap[K, V]) Range(iterator func(key K, value V) bool)
- func (m *SyncMap[K, V]) Store(key K, value V)
- func (m *SyncMap[K, V]) Swap(key K, value V) (pre V, loaded bool)
- func (m *SyncMap[K, V]) Values() []V
- type SyncSet
- func (s *SyncSet[T]) Append(values ...T)
- func (s *SyncSet[T]) Contains(v T) bool
- func (s *SyncSet[T]) Difference(s1 Set[T]) Set[T]
- func (s *SyncSet[T]) Intersection(s1 Set[T]) Set[T]
- func (s *SyncSet[T]) Len() int
- func (s *SyncSet[T]) Remove(values ...T) bool
- func (s *SyncSet[T]) String() string
- func (s *SyncSet[T]) SymmetricDifference(s1 Set[T]) Set[T]
- func (s *SyncSet[T]) Union(s1 Set[T]) Set[T]
- func (s *SyncSet[T]) Values() []T
- type WorkQueue
Constants ¶
const DefaultRingSize = 1024
Variables ¶
var (
	// ErrTopicNotFound indicates the given topic does not exist.
	ErrTopicNotFound = errors.New("topic not found")
	// ErrTopicAlreadyExists indicates the topic already exists.
	ErrTopicAlreadyExists = errors.New("topic already exists")
	// ErrTopicQueueFull indicates the topic queue is full.
	ErrTopicQueueFull = errors.New("topic queue is full")
)
var ErrConnPoolClosed = fmt.Errorf("conn pool is closed")
var (
ErrHubClosed = errors.New("hub is closed")
)
ErrHubClosed is returned when operations are attempted on a closed Hub.
Functions ¶
Types ¶
type ArrayStack ¶
type ArrayStack[T any] struct {
	// contains filtered or unexported fields
}
func NewArrayStack ¶
func NewArrayStack[T any]() *ArrayStack[T]
func (*ArrayStack[T]) Empty ¶
func (m *ArrayStack[T]) Empty() bool
func (*ArrayStack[T]) Peek ¶
func (m *ArrayStack[T]) Peek() (value T, exist bool)
func (*ArrayStack[T]) Pop ¶
func (m *ArrayStack[T]) Pop() (value T, exist bool)
func (*ArrayStack[T]) Push ¶
func (m *ArrayStack[T]) Push(value T)
func (*ArrayStack[T]) Size ¶
func (m *ArrayStack[T]) Size() int
func (*ArrayStack[T]) String ¶
func (m *ArrayStack[T]) String() string
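The method set above follows the usual LIFO contract, with Peek and Pop reporting whether a value existed instead of panicking on an empty stack. A minimal sketch of that contract, assuming a slice-backed layout (sliceStack is a hypothetical type, not the package's implementation):

```go
package main

import "fmt"

// sliceStack is a hypothetical slice-backed LIFO stack.
type sliceStack[T any] struct{ items []T }

// Push places value on top of the stack.
func (s *sliceStack[T]) Push(value T) { s.items = append(s.items, value) }

// Peek returns the top value without removing it.
// exist is false when the stack is empty.
func (s *sliceStack[T]) Peek() (value T, exist bool) {
	if len(s.items) == 0 {
		return value, false
	}
	return s.items[len(s.items)-1], true
}

// Pop removes and returns the top value.
func (s *sliceStack[T]) Pop() (value T, exist bool) {
	value, exist = s.Peek()
	if exist {
		s.items = s.items[:len(s.items)-1]
	}
	return value, exist
}

// Size returns the number of stored values.
func (s *sliceStack[T]) Size() int { return len(s.items) }

// Empty reports whether the stack holds no values.
func (s *sliceStack[T]) Empty() bool { return len(s.items) == 0 }

func main() {
	s := &sliceStack[int]{}
	s.Push(1)
	s.Push(2)
	top, _ := s.Pop()
	fmt.Println(top, s.Size()) // 2 1
}
```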
type BroadcastBus ¶ added in v0.0.31
type BroadcastBus[T any] struct {
	// contains filtered or unexported fields
}
BroadcastBus is a multi-topic, multi-subscriber pub-sub bus.
Each topic is a FastHub (fan-out):
- When you publish a message to a topic, all subscribers receive a copy.
- If a topic has no subscribers, messages are discarded.
Typical use case: event broadcast, notifications, state updates.
func NewBroadcastBus ¶ added in v0.0.31
func NewBroadcastBus[T any]() *BroadcastBus[T]
NewBroadcastBus creates a new BroadcastBus. The constructor takes no arguments; the buffer size is given per topic through AddTopic.
func (*BroadcastBus[T]) AddTopic ¶ added in v0.0.32
func (u *BroadcastBus[T]) AddTopic(topic string, bufSize int) error
AddTopic creates a new topic with its own buffered channel. Returns an error if the topic already exists.
func (*BroadcastBus[T]) Close ¶ added in v0.0.31
func (u *BroadcastBus[T]) Close() error
Close closes all topics and marks the bus as closed. Any further operations on the bus will return an error.
func (*BroadcastBus[T]) Publish ¶ added in v0.0.31
func (u *BroadcastBus[T]) Publish(topic string, msg T) error
Publish broadcasts a message to all subscribers of the given topic.
If the topic does not exist, returns ErrTopicNotFound. If the bus is closed, returns an error.
func (*BroadcastBus[T]) RemoveTopic ¶ added in v0.0.32
func (u *BroadcastBus[T]) RemoveTopic(topic string) error
RemoveTopic closes and removes the topic's channel. Messages in the buffer are dropped.
func (*BroadcastBus[T]) Subscribe ¶ added in v0.0.31
func (u *BroadcastBus[T]) Subscribe(topic string) (*Subscription[T], error)
Subscribe subscribes to the given topic, creating the topic if needed.
Returns a new Subscription which has a buffered channel for receiving messages.
If the bus is closed, returns an error.
func (*BroadcastBus[T]) Unsubscribe ¶ added in v0.0.31
func (u *BroadcastBus[T]) Unsubscribe(topic string, sub *Subscription[T]) error
Unsubscribe removes a subscriber from the given topic. If no subscribers remain for that topic, the topic is closed and removed.
Returns an error if the bus is closed or the topic does not exist.
type Cache ¶
type Cache[T any] struct {
	// contains filtered or unexported fields
}
type ConnPoolOption ¶ added in v0.0.35
type FastHub ¶ added in v0.0.31
type FastHub[T any] struct {
	// contains filtered or unexported fields
}
FastHub is a simple in-memory publish-subscribe hub. It broadcasts each published message to all active subscribers. FastHub does not guarantee message ordering or reliable delivery. If a subscriber's channel buffer is full, the message is dropped.
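The drop-on-full behaviour described above is commonly built on a non-blocking channel send. The following is a simplified sketch of that technique, not FastHub's actual code: fanOut, newFanOut, and the dropped-count return value are all hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut is a hypothetical, simplified broadcast hub.
type fanOut[T any] struct {
	mu   sync.Mutex
	next uint64
	buf  int
	subs map[uint64]chan T
}

func newFanOut[T any](buf int) *fanOut[T] {
	return &fanOut[T]{buf: buf, subs: make(map[uint64]chan T)}
}

// Subscribe registers a subscriber and returns its ID and receive channel.
func (h *fanOut[T]) Subscribe() (uint64, <-chan T) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.next++
	ch := make(chan T, h.buf)
	h.subs[h.next] = ch
	return h.next, ch
}

// Publish offers v to every subscriber without blocking and
// returns how many subscribers missed it because their buffer was full.
func (h *fanOut[T]) Publish(v T) (dropped int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, ch := range h.subs {
		select {
		case ch <- v:
		default: // buffer full: drop for this subscriber
			dropped++
		}
	}
	return dropped
}

func main() {
	h := newFanOut[string](1)
	_, a := h.Subscribe()
	_, b := h.Subscribe()
	h.Publish("first")
	dropped := h.Publish("second") // both one-slot buffers are already full
	fmt.Println(<-a, <-b, dropped) // first first 2
}
```

Map iteration order is random in Go, which is consistent with a hub that makes no ordering promise across subscribers.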
func NewFastHub ¶ added in v0.0.31
NewFastHub creates a new FastHub with the given buffer size for each subscriber.
func (*FastHub[T]) Close ¶ added in v0.0.31
Close closes the bus and all active subscriber channels. After closing, Publish and Subscribe will return ErrHubClosed.
func (*FastHub) Increment ¶ added in v0.0.31
func (g *FastHub) Increment() uint64
Increment atomically increments and returns a new unique ID.
func (*FastHub[T]) Publish ¶ added in v0.0.31
Publish broadcasts the given value to all active subscribers. If a subscriber's channel is full, the message is dropped. If the bus is closed, it returns ErrHubClosed.
func (*FastHub) SetPublishCallback ¶ added in v0.0.39
func (h *FastHub) SetPublishCallback(f func(id uint64, v T))
func (*FastHub[T]) Subscribe ¶ added in v0.0.31
func (b *FastHub[T]) Subscribe() (*Subscription[T], error)
Subscribe registers a new subscriber and returns its Subscription. If the bus is closed, it returns ErrHubClosed.
func (*FastHub[T]) Unsubscribe ¶ added in v0.0.31
func (b *FastHub[T]) Unsubscribe(s *Subscription[T])
Unsubscribe removes a subscriber from the bus and closes its channel.
type HashSet ¶
type HashSet[T comparable] map[T]struct{}
func NewHashSet ¶
func NewHashSet[T comparable](items ...T) HashSet[T]
NewHashSet creates a new HashSet with element type T, optionally populated with the given items.
func (HashSet[T]) Difference ¶
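Difference returns the difference of the two sets.
func (HashSet[T]) Intersection ¶
Intersection returns the intersection of the two sets.
func (HashSet[T]) SymmetricDifference ¶
SymmetricDifference returns the symmetric difference of the two sets.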
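To make the distinction between the three operations concrete, here is a minimal sketch over a map-backed set. The set type and its lower-case methods are hypothetical stand-ins, assuming the conventional definitions (difference keeps elements of the receiver that are absent from the argument).

```go
package main

import "fmt"

// set is a hypothetical map-backed set, similar in shape to HashSet.
type set[T comparable] map[T]struct{}

func newSet[T comparable](items ...T) set[T] {
	s := make(set[T], len(items))
	for _, it := range items {
		s[it] = struct{}{}
	}
	return s
}

// difference returns the elements of s that are not in o.
func (s set[T]) difference(o set[T]) set[T] {
	out := newSet[T]()
	for k := range s {
		if _, ok := o[k]; !ok {
			out[k] = struct{}{}
		}
	}
	return out
}

// intersection returns the elements present in both s and o.
func (s set[T]) intersection(o set[T]) set[T] {
	out := newSet[T]()
	for k := range s {
		if _, ok := o[k]; ok {
			out[k] = struct{}{}
		}
	}
	return out
}

// symmetricDifference returns the elements present in exactly one of s and o.
func (s set[T]) symmetricDifference(o set[T]) set[T] {
	out := s.difference(o)
	for k := range o.difference(s) {
		out[k] = struct{}{}
	}
	return out
}

func main() {
	a := newSet(1, 2, 3)
	b := newSet(3, 4)
	fmt.Println(len(a.difference(b)))          // 2: {1, 2}
	fmt.Println(len(a.intersection(b)))        // 1: {3}
	fmt.Println(len(a.symmetricDifference(b))) // 3: {1, 2, 4}
}
```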
type Hub ¶ added in v0.0.31
type Hub[T any] interface {
	// Subscribe registers a new subscriber and returns its Subscription.
	// If the Hub is closed, it returns ErrHubClosed.
	Subscribe() (*Subscription[T], error)

	// Publish sends the given value to all active subscribers.
	// If the Hub is closed, it returns ErrHubClosed.
	Publish(v T) error

	// Unsubscribe removes the given Subscription from the Hub.
	// After unsubscription, the subscriber will no longer receive messages.
	Unsubscribe(*Subscription[T])

	// SetPublishCallback sets a custom publish callback function.
	// callback is called asynchronously when a message cannot be delivered
	// to a subscriber due to a full buffer.
	// The callback must be non-blocking and must NOT call back into Hub.
	SetPublishCallback(callback func(id uint64, v T))

	// Close closes the Hub and all active subscriptions.
	// After closing, Subscribe and Publish will return ErrHubClosed.
	Close() error
}
Hub defines a simple publish-subscribe broadcast center. All subscribers receive every published message. It does not guarantee message ordering or delivery reliability.
type LinkStack ¶
type LinkStack[T any] struct {
	// contains filtered or unexported fields
}
func NewLinkStack ¶
type Map ¶
type Map[K comparable, V any] interface {
	// Store
	Store(K, V)
	// Load
	Load(K) (V, bool)
	// Delete
	Delete(K) bool
	// Swap
	Swap(key K, newValue V) (old V, loaded bool)
	// Range iterates over the map; iteration stops when iterator returns false.
	Range(iterator func(K, V) bool)
	// Load, or store the value
	LoadOrStore(key K, newValue V) (value V, loaded bool)
	// Load and delete
	LoadAndDelete(K) (value V, loaded bool)
	// Compare and swap.
	// Compares the value currently stored for key with old; if they are equal,
	// replaces the value for key with newValue.
	CompareAndSwap(key K, old, newValue V) bool
	// Compare and delete.
	// Compares the value currently stored for key with value; if they are equal,
	// deletes the entry.
	CompareAndDelete(key K, value V) bool
	// Compare and swap (custom comparison function)
	CompareFnAndSwap(key K, fn func(V, V) bool, old, newValue V) bool
	// Compare and delete (custom comparison function)
	CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool
	// Keys
	Keys() []K
	// Values
	Values() []V
	// Filter
	Filter(filter func(K, V) bool) Map[K, V]
}
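Since V is constrained only by any, values such as slices cannot be compared with ==, which is presumably why the interface offers the CompareFn variants. The sketch below illustrates the compare-and-swap idea with a caller-supplied comparison under a mutex. lockedMap is a hypothetical type, and the argument order passed to fn (current value first, old second) is an assumption, not something the source states.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedMap is a hypothetical mutex-guarded map.
type lockedMap[K comparable, V any] struct {
	mu sync.Mutex
	m  map[K]V
}

func newLockedMap[K comparable, V any]() *lockedMap[K, V] {
	return &lockedMap[K, V]{m: make(map[K]V)}
}

// Store sets the value for key.
func (l *lockedMap[K, V]) Store(key K, value V) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.m[key] = value
}

// CompareFnAndSwap replaces the value for key with newValue only when the
// key exists and fn(current, old) reports true. The check and the write
// happen under one lock, so no other writer can slip in between.
func (l *lockedMap[K, V]) CompareFnAndSwap(key K, fn func(V, V) bool, old, newValue V) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	cur, ok := l.m[key]
	if !ok || !fn(cur, old) {
		return false
	}
	l.m[key] = newValue
	return true
}

func main() {
	m := newLockedMap[string, []int]()
	m.Store("k", []int{1, 2})
	sameLen := func(a, b []int) bool { return len(a) == len(b) }

	fmt.Println(m.CompareFnAndSwap("k", sameLen, []int{9, 9}, []int{3})) // true
	fmt.Println(m.CompareFnAndSwap("k", sameLen, []int{9, 9}, []int{4})) // false
}
```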
type MutexMap ¶
type MutexMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}
func NewMutexMap ¶
func NewMutexMap[K comparable, V any]() *MutexMap[K, V]
func (*MutexMap[K, V]) CompareAndDelete ¶
func (*MutexMap[K, V]) CompareAndSwap ¶
func (*MutexMap[K, V]) CompareFnAndDelete ¶
func (*MutexMap[K, V]) CompareFnAndSwap ¶
func (*MutexMap[K, V]) LoadAndDelete ¶
func (*MutexMap[K, V]) LoadOrStore ¶
type OrderHub ¶ added in v0.0.31
type OrderHub[T any] struct {
	// contains filtered or unexported fields
}
OrderHub is a simple in-memory publish-subscribe hub that delivers messages to subscribers in the order they subscribed. Unlike FastHub, OrderHub maintains a slice to preserve subscription order.
func NewOrderHub ¶ added in v0.0.31
NewOrderHub creates a new OrderHub with the given buffer size for each subscriber.
func (*OrderHub[T]) Close ¶ added in v0.0.31
Close closes the bus and all subscriber channels. After closing, Subscribe and Publish will return ErrHubClosed.
func (*OrderHub) Increment ¶ added in v0.0.39
func (g *OrderHub) Increment() uint64
Increment atomically increments and returns a new unique ID.
func (*OrderHub[T]) Publish ¶ added in v0.0.31
Publish sends the given value to all subscribers in subscription order. If a subscriber's channel is full, the message is dropped for that subscriber. If the bus is closed, it returns ErrHubClosed.
func (*OrderHub) SetPublishCallback ¶ added in v0.0.39
func (h *OrderHub) SetPublishCallback(f func(id uint64, v T))
func (*OrderHub[T]) Subscribe ¶ added in v0.0.31
func (b *OrderHub[T]) Subscribe() (*Subscription[T], error)
Subscribe registers a new subscriber and returns its Subscription. Messages will be delivered in the order subscribers were added. If the bus is closed, it returns ErrHubClosed.
func (*OrderHub[T]) Unsubscribe ¶ added in v0.0.31
func (b *OrderHub[T]) Unsubscribe(s *Subscription[T])
Unsubscribe removes a subscriber from the bus and closes its channel. After removal, the subscriber will no longer receive messages.
type Pool ¶
type Pool[K, V any] struct {
	New        poolNewFunc[K, V]
	Identifier poolIDFunc[K]
	Destroy    poolDestroyFunc[V]
	// contains filtered or unexported fields
}
func (*Pool[K, V]) GetWithCtx ¶
type RingBuffer ¶ added in v0.0.31
type RingBuffer[T any] struct {
	// contains filtered or unexported fields
}
RingBuffer is a ring buffer that is not safe for concurrent use.
func NewRingBuffer ¶ added in v0.0.31
func NewRingBuffer[T any]() *RingBuffer[T]
func NewRingBufferWithSize ¶ added in v0.0.31
func NewRingBufferWithSize[T any](size int) *RingBuffer[T]
func (*RingBuffer[T]) Cap ¶ added in v0.0.31
func (r *RingBuffer[T]) Cap() int
func (*RingBuffer[T]) Iterator ¶ added in v0.0.31
func (r *RingBuffer[T]) Iterator() iter.Seq[T]
func (*RingBuffer[T]) Len ¶ added in v0.0.31
func (r *RingBuffer[T]) Len() int
func (*RingBuffer[T]) Push ¶ added in v0.0.31
func (r *RingBuffer[T]) Push(values ...T)
func (*RingBuffer[T]) Values ¶ added in v0.0.31
func (r *RingBuffer[T]) Values() (result []T)
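The source does not say what Push does once the buffer is full. A common choice for ring buffers is to overwrite the oldest element, and the sketch below assumes exactly that. The ring type and newRing constructor are hypothetical, and size is assumed to be greater than zero.

```go
package main

import "fmt"

// ring is a hypothetical fixed-capacity ring buffer that overwrites
// the oldest element once full (an assumed policy).
type ring[T any] struct {
	buf   []T
	start int // index of the oldest element
	n     int // number of stored elements
}

// newRing allocates a ring with the given capacity; size must be > 0.
func newRing[T any](size int) *ring[T] {
	return &ring[T]{buf: make([]T, size)}
}

// Push appends values, evicting the oldest ones when capacity is reached.
func (r *ring[T]) Push(values ...T) {
	for _, v := range values {
		if r.n < len(r.buf) {
			r.buf[(r.start+r.n)%len(r.buf)] = v
			r.n++
			continue
		}
		// Full: overwrite the oldest slot and advance the start index.
		r.buf[r.start] = v
		r.start = (r.start + 1) % len(r.buf)
	}
}

// Values returns the stored elements from oldest to newest.
func (r *ring[T]) Values() []T {
	out := make([]T, 0, r.n)
	for i := 0; i < r.n; i++ {
		out = append(out, r.buf[(r.start+i)%len(r.buf)])
	}
	return out
}

// Len returns the number of stored elements.
func (r *ring[T]) Len() int { return r.n }

// Cap returns the fixed capacity.
func (r *ring[T]) Cap() int { return len(r.buf) }

func main() {
	r := newRing[int](3)
	r.Push(1, 2, 3, 4, 5)
	fmt.Println(r.Values(), r.Len(), r.Cap()) // [3 4 5] 3 3
}
```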
type Set ¶
type Set[T comparable] interface {
	fmt.Stringer
	// Number of elements in the set
	Len() int
	// Add elements
	Append(...T)
	// Remove elements
	Remove(...T) bool
	// Report whether an element exists
	Contains(T) bool
	// Slice of the set's elements
	Values() []T
	// Union
	Union(Set[T]) Set[T]
	// Intersection
	Intersection(Set[T]) Set[T]
	// Difference
	Difference(Set[T]) Set[T]
	// Symmetric difference
	SymmetricDifference(Set[T]) Set[T]
}
type Subscription ¶ added in v0.0.31
type Subscription[T any] struct {
	// ID is the unique identifier for this subscriber.
	ID uint64
	// contains filtered or unexported fields
}
Subscription represents a single subscriber to a Hub. It holds the unique subscription ID and the receive-only channel that delivers published messages.
func (*Subscription[T]) Channel ¶ added in v0.0.39
func (s *Subscription[T]) Channel() <-chan T
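Exposing the channel as receive-only lets a consumer range over it until the hub closes it, while preventing the consumer from sending or closing. A minimal sketch of that pattern, using a hypothetical local subscription type and a drain helper that are not part of this package:

```go
package main

import "fmt"

// subscription is a hypothetical stand-in with the same shape as Subscription.
type subscription[T any] struct {
	ID uint64
	ch chan T
}

// Channel exposes the internal channel as receive-only.
func (s *subscription[T]) Channel() <-chan T { return s.ch }

// drain collects messages until the channel is closed.
func drain[T any](s *subscription[T]) []T {
	var out []T
	for msg := range s.Channel() {
		out = append(out, msg)
	}
	return out
}

func main() {
	s := &subscription[string]{ID: 1, ch: make(chan string, 2)}
	s.ch <- "created"
	s.ch <- "updated"
	close(s.ch) // stands in for the hub closing the subscription

	fmt.Println(drain(s)) // [created updated]
}
```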
type SyncMap ¶
type SyncMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}
SyncMap is a generic wrapper around sync.Map.
func NewSyncMap ¶
func NewSyncMap[K comparable, V any]() *SyncMap[K, V]
func (*SyncMap[K, V]) CompareAndDelete ¶
func (*SyncMap[K, V]) CompareAndSwap ¶
func (*SyncMap[K, V]) CompareFnAndDelete ¶
func (*SyncMap[K, V]) CompareFnAndSwap ¶
func (*SyncMap[K, V]) LoadAndDelete ¶
func (*SyncMap[K, V]) LoadOrStore ¶
type SyncSet ¶
type SyncSet[T comparable] struct {
	// contains filtered or unexported fields
}
SyncSet is a thread-safe set implementation for comparable types.
func NewSyncSet ¶
func NewSyncSet[T comparable](items ...T) *SyncSet[T]
NewSyncSet creates a new SyncSet and optionally adds initial items.
func (*SyncSet[T]) Append ¶
func (s *SyncSet[T]) Append(values ...T)
Append adds one or more elements to the set. Duplicates are ignored.
func (*SyncSet[T]) Difference ¶
Difference returns a new set containing elements in the current set but not in the other.
func (*SyncSet[T]) Intersection ¶
Intersection returns a new set containing elements present in both sets.
func (*SyncSet[T]) Remove ¶
Remove deletes one or more elements from the set. Non-existent elements are ignored.
func (*SyncSet[T]) SymmetricDifference ¶
SymmetricDifference returns a new set containing elements present in either of the sets but not both.
type WorkQueue ¶ added in v0.0.31
type WorkQueue[T any] struct {
	// contains filtered or unexported fields
}
WorkQueue is a simple pub-sub style queue where each topic has exactly one subscriber (worker). A published message goes to exactly one consumer.
Typical use case: task queues, job workers.
T is the type of the message.
func NewWorkQueue ¶ added in v0.0.31
NewWorkQueue creates a new WorkQueue with a given channel buffer size.
func (*WorkQueue[T]) AddTopic ¶ added in v0.0.31
AddTopic creates a new topic with its own buffered channel. Returns an error if the topic already exists.
func (*WorkQueue[T]) Close ¶ added in v0.0.31
func (u *WorkQueue[T]) Close()
Close closes all topic channels. Topics are not removed from the map.
func (*WorkQueue[T]) Publish ¶ added in v0.0.31
Publish sends a message to the topic's channel. If the channel buffer is full, the message is dropped silently. Returns an error if the topic does not exist.
func (*WorkQueue[T]) RemoveTopic ¶ added in v0.0.31
RemoveTopic closes and removes the topic's channel. Messages in the buffer are dropped.