ds

package
v0.0.43
Published: Jan 17, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

const DefaultRingSize = 1024

Variables

var (
	// ErrTopicNotFound indicates the given topic does not exist.
	ErrTopicNotFound = errors.New("topic not found")

	// ErrTopicAlreadyExists indicates the topic already exists.
	ErrTopicAlreadyExists = errors.New("topic already exists")

	// ErrTopicQueueFull indicates the topic queue is full.
	ErrTopicQueueFull = errors.New("topic queue is full")
)
var ErrConnPoolClosed = fmt.Errorf("conn pool is closed")
var (
	ErrHubClosed = errors.New("hub is closed")
)

ErrHubClosed is returned when operations are attempted on a closed Hub.

Functions

func NewConnPool added in v0.0.35

func NewConnPool[T io.Closer](
	factory func() (T, error),
	residence int,
	opts ...ConnPoolOption[T],
) *connPool[T]

NewConnPool returns a new connection pool.

Types

type ArrayStack

type ArrayStack[T any] struct {
	// contains filtered or unexported fields
}

func NewArrayStack

func NewArrayStack[T any]() *ArrayStack[T]

func (*ArrayStack[T]) Empty

func (m *ArrayStack[T]) Empty() bool

func (*ArrayStack[T]) Peek

func (m *ArrayStack[T]) Peek() (value T, exist bool)

func (*ArrayStack[T]) Pop

func (m *ArrayStack[T]) Pop() (value T, exist bool)

func (*ArrayStack[T]) Push

func (m *ArrayStack[T]) Push(value T)

func (*ArrayStack[T]) Size

func (m *ArrayStack[T]) Size() int

func (*ArrayStack[T]) String

func (m *ArrayStack[T]) String() string

type BroadcastBus added in v0.0.31

type BroadcastBus[T any] struct {
	// contains filtered or unexported fields
}

BroadcastBus is a multi-topic, multi-subscriber pub-sub bus.

Each topic is a FastHub (fan-out):
  - When you publish a message to a topic, all subscribers receive a copy.
  - If a topic has no subscribers, messages are discarded.

Typical use case: event broadcast, notifications, state updates.

func NewBroadcastBus added in v0.0.31

func NewBroadcastBus[T any]() *BroadcastBus[T]

NewBroadcastBus creates a new BroadcastBus. The constructor takes no arguments; a buffer size is supplied per topic through AddTopic.

func (*BroadcastBus[T]) AddTopic added in v0.0.32

func (u *BroadcastBus[T]) AddTopic(topic string, bufSize int) error

AddTopic creates a new topic with its own buffered channel. Returns an error if the topic already exists.

func (*BroadcastBus[T]) Close added in v0.0.31

func (u *BroadcastBus[T]) Close() error

Close closes all topics and marks the bus as closed. Any further operations on the bus will return an error.

func (*BroadcastBus[T]) Publish added in v0.0.31

func (u *BroadcastBus[T]) Publish(topic string, msg T) error

Publish broadcasts a message to all subscribers of the given topic.

If the topic does not exist, returns ErrTopicNotFound. If the bus is closed, returns an error.

func (*BroadcastBus[T]) RemoveTopic added in v0.0.32

func (u *BroadcastBus[T]) RemoveTopic(topic string) error

RemoveTopic closes and removes the topic's channel. Messages in the buffer are dropped.

func (*BroadcastBus[T]) Subscribe added in v0.0.31

func (u *BroadcastBus[T]) Subscribe(topic string) (*Subscription[T], error)

Subscribe subscribes to the given topic, creating the topic if needed.

Returns a new Subscription which has a buffered channel for receiving messages.

If the bus is closed, returns an error.

func (*BroadcastBus[T]) Unsubscribe added in v0.0.31

func (u *BroadcastBus[T]) Unsubscribe(topic string, sub *Subscription[T]) error

Unsubscribe removes a subscriber from the given topic. If no subscribers remain for that topic, the topic is closed and removed.

Returns an error if the bus is closed or the topic does not exist.

type Cache

type Cache[T any] struct {
	// contains filtered or unexported fields
}

func NewCache

func NewCache[T any](expire, cleanup time.Duration) *Cache[T]

NewCache creates a new cache. An expire of 0 means entries never expire; a cleanup of 0 disables periodic cleanup.

func (*Cache[T]) Delete

func (c *Cache[T]) Delete(key string)

func (*Cache[T]) Get

func (c *Cache[T]) Get(key string) (value T, loaded bool)

func (*Cache[T]) Release added in v0.0.32

func (c *Cache[T]) Release() error

Release stops the goroutine that performs scheduled cleanup.

func (*Cache[T]) Set

func (c *Cache[T]) Set(key string, value T)

func (*Cache[T]) SetWithExpire

func (c *Cache[T]) SetWithExpire(key string, value T, expire time.Duration)

type ConnPoolOption added in v0.0.35

type ConnPoolOption[T io.Closer] func(*connPool[T])

func WithCheck added in v0.0.35

func WithCheck[T io.Closer](check func(T) bool) ConnPoolOption[T]

type FastHub added in v0.0.31

type FastHub[T any] struct {
	// contains filtered or unexported fields
}

FastHub is a simple in-memory publish-subscribe hub. It broadcasts each published message to all active subscribers. FastHub does not guarantee message ordering or reliable delivery. If a subscriber's channel buffer is full, the message is dropped.
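The drop-on-full behavior described above is commonly built on a non-blocking channel send. The following is a minimal sketch of that technique, not FastHub's actual code; `miniHub`, `subscribe`, and `publish` are hypothetical names, and the sketch is not safe for concurrent use.

```go
package main

import "fmt"

// miniHub is a hypothetical fan-out hub used only to illustrate drop-on-full delivery.
type miniHub[T any] struct {
	bufSize int
	subs    []chan T
}

// subscribe registers a new buffered channel and returns its receive side.
func (h *miniHub[T]) subscribe() <-chan T {
	ch := make(chan T, h.bufSize)
	h.subs = append(h.subs, ch)
	return ch
}

// publish offers v to every subscriber and reports how many copies were dropped.
func (h *miniHub[T]) publish(v T) (dropped int) {
	for _, ch := range h.subs {
		select {
		case ch <- v: // buffer has room: delivered
		default: // buffer full: drop instead of blocking the publisher
			dropped++
		}
	}
	return dropped
}

func main() {
	h := &miniHub[string]{bufSize: 1}
	a, b := h.subscribe(), h.subscribe()
	fmt.Println(h.publish("first")) // prints: 0 (both buffers have room)
	<-a // only subscriber a drains its buffer
	fmt.Println(h.publish("second")) // prints: 1 (b is still full)
	fmt.Println(<-a, <-b) // prints: second first
}
```

A slow subscriber therefore loses messages rather than slowing down the publisher, which is why the buffer size passed at construction matters.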

func NewFastHub added in v0.0.31

func NewFastHub[T any](bufSize int) *FastHub[T]

NewFastHub creates a new FastHub with the given buffer size for each subscriber.

func (*FastHub[T]) Close added in v0.0.31

func (b *FastHub[T]) Close() error

Close closes the bus and all active subscriber channels. After closing, Publish and Subscribe will return ErrHubClosed.

func (*FastHub) Increment added in v0.0.31

func (g *FastHub) Increment() uint64

Increment atomically increments and returns a new unique ID.

func (*FastHub[T]) Publish added in v0.0.31

func (b *FastHub[T]) Publish(v T) error

Publish broadcasts the given value to all active subscribers. If a subscriber's channel is full, the message is dropped. If the bus is closed, it returns ErrHubClosed.

func (*FastHub) SetPublishCallback added in v0.0.39

func (h *FastHub) SetPublishCallback(f func(id uint64, v T))

func (*FastHub[T]) Subscribe added in v0.0.31

func (b *FastHub[T]) Subscribe() (*Subscription[T], error)

Subscribe registers a new subscriber and returns its Subscription. If the bus is closed, it returns ErrHubClosed.

func (*FastHub[T]) Unsubscribe added in v0.0.31

func (b *FastHub[T]) Unsubscribe(s *Subscription[T])

Unsubscribe removes a subscriber from the bus and closes its channel.

type HashSet

type HashSet[T comparable] map[T]struct{}

func NewHashSet

func NewHashSet[T comparable](items ...T) HashSet[T]

NewHashSet creates a new HashSet with element type T, optionally populated with items.

func (HashSet[T]) Append

func (s HashSet[T]) Append(items ...T)

func (HashSet[T]) Contains

func (s HashSet[T]) Contains(value T) (exist bool)

func (HashSet[T]) Difference

func (s HashSet[T]) Difference(s1 Set[T]) Set[T]

Difference returns the difference of the two sets: the elements of s that are not in s1.

func (HashSet[T]) Intersection

func (s HashSet[T]) Intersection(s1 Set[T]) Set[T]

Intersection returns the intersection of the two sets.

func (HashSet[T]) Len

func (s HashSet[T]) Len() int

func (HashSet[T]) Remove

func (s HashSet[T]) Remove(values ...T) bool

func (HashSet[T]) String

func (s HashSet[T]) String() string

func (HashSet[T]) SymmetricDifference

func (s HashSet[T]) SymmetricDifference(s1 Set[T]) Set[T]

SymmetricDifference returns the symmetric difference of the two sets: the elements that are in exactly one of them.

func (HashSet[T]) Union

func (s HashSet[T]) Union(s1 Set[T]) Set[T]

Union returns the union of the two sets.

func (HashSet[T]) Values

func (s HashSet[T]) Values() (values []T)
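Because difference and symmetric difference are easy to confuse, here is a short sketch of both on the same `map[T]struct{}` representation that HashSet uses. The `set` type and the helper functions are hypothetical stand-ins written for this illustration, not the package's methods.

```go
package main

import "fmt"

// set mirrors the map[T]struct{} representation shown for HashSet; the helpers are hypothetical.
type set[T comparable] map[T]struct{}

func newSet[T comparable](items ...T) set[T] {
	s := make(set[T], len(items))
	for _, it := range items {
		s[it] = struct{}{}
	}
	return s
}

// difference returns the elements of a that are not in b.
func difference[T comparable](a, b set[T]) set[T] {
	out := newSet[T]()
	for v := range a {
		if _, ok := b[v]; !ok {
			out[v] = struct{}{}
		}
	}
	return out
}

// symmetricDifference returns the elements that are in exactly one of a and b.
func symmetricDifference[T comparable](a, b set[T]) set[T] {
	out := difference(a, b)
	for v := range difference(b, a) {
		out[v] = struct{}{}
	}
	return out
}

func main() {
	a, b := newSet(1, 2, 3), newSet(3, 4)
	// difference is {1, 2}; symmetric difference is {1, 2, 4}.
	fmt.Println(len(difference(a, b)), len(symmetricDifference(a, b))) // prints: 2 3
}
```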

type Hub added in v0.0.31

type Hub[T any] interface {
	// Subscribe registers a new subscriber and returns its Subscription.
	// If the Hub is closed, it returns ErrHubClosed.
	Subscribe() (*Subscription[T], error)

	// Publish sends the given value to all active subscribers.
	// If the Hub is closed, it returns ErrHubClosed.
	Publish(v T) error

	// Unsubscribe removes the given Subscription from the Hub.
	// After unsubscription, the subscriber will no longer receive messages.
	Unsubscribe(*Subscription[T])

	// SetPublishCallback sets a custom publish callback function.
	// callback is called asynchronously when a message cannot be delivered to a subscriber due to a full buffer.
	// The callback must be non-blocking and must NOT call back into Hub.
	SetPublishCallback(callback func(id uint64, v T))

	// Close closes the Hub and all active subscriptions.
	// After closing, Subscribe and Publish will return ErrHubClosed.
	Close() error
}

Hub defines a simple publish-subscribe broadcast center. All subscribers receive every published message. It does not guarantee message ordering or delivery reliability.

type LinkStack

type LinkStack[T any] struct {
	// contains filtered or unexported fields
}

func NewLinkStack

func NewLinkStack[T any]() *LinkStack[T]

func (*LinkStack[T]) Empty

func (m *LinkStack[T]) Empty() bool

func (*LinkStack[T]) Peek

func (m *LinkStack[T]) Peek() (value T, exist bool)

func (*LinkStack[T]) Pop

func (m *LinkStack[T]) Pop() (value T, exist bool)

func (*LinkStack[T]) Push

func (m *LinkStack[T]) Push(value T)

func (*LinkStack[T]) Size

func (m *LinkStack[T]) Size() int

type Map

type Map[K comparable, V any] interface {
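	// Store stores a value.
	Store(K, V)
	// Load reads a value.
	Load(K) (V, bool)
	// Delete removes a key.
	Delete(K) bool
	// Swap swaps in a new value.
	Swap(key K, newValue V) (old V, loaded bool)
	// Range iterates over the map; iteration stops when iterator returns false.
	Range(iterator func(K, V) bool)
	// LoadOrStore reads the value, or stores newValue.
	LoadOrStore(key K, newValue V) (value V, loaded bool)
	// LoadAndDelete reads the value and deletes the key.
	LoadAndDelete(K) (value V, loaded bool)
	// CompareAndSwap compares and swaps:
	// if the value currently stored for key equals old, it is replaced with newValue.
	CompareAndSwap(key K, old, newValue V) bool
	// CompareAndDelete compares and deletes:
	// if the value currently stored for key equals value, the entry is deleted.
	CompareAndDelete(key K, value V) bool
	// CompareFnAndSwap compares and swaps using a custom comparison function.
	CompareFnAndSwap(key K, fn func(V, V) bool, old, newValue V) bool
	// CompareFnAndDelete compares and deletes using a custom comparison function.
	CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool
	// Keys returns the keys.
	Keys() []K
	// Values returns the values.
	Values() []V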
	// Filter
	Filter(filter func(K, V) bool) Map[K, V]
}
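The load-or-store and compare-and-swap semantics listed in the interface can be sketched with a mutex-guarded map. `lockedMap` and its lowercase methods are hypothetical and written only for this illustration; in particular, calling the comparison function as `equal(current, old)` is an assumption, since the argument order is not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedMap is a hypothetical mutex-guarded map used to illustrate the semantics.
type lockedMap[K comparable, V any] struct {
	mu sync.Mutex
	m  map[K]V
}

func newLockedMap[K comparable, V any]() *lockedMap[K, V] {
	return &lockedMap[K, V]{m: make(map[K]V)}
}

// loadOrStore returns the existing value if present; otherwise it stores newValue.
func (l *lockedMap[K, V]) loadOrStore(key K, newValue V) (V, bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if v, ok := l.m[key]; ok {
		return v, true
	}
	l.m[key] = newValue
	return newValue, false
}

// compareFnAndSwap replaces the value only if key exists and equal(current, old) holds.
func (l *lockedMap[K, V]) compareFnAndSwap(key K, equal func(V, V) bool, old, newValue V) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	cur, ok := l.m[key]
	if !ok || !equal(cur, old) {
		return false
	}
	l.m[key] = newValue
	return true
}

func main() {
	m := newLockedMap[string, int]()
	eq := func(a, b int) bool { return a == b }
	fmt.Println(m.loadOrStore("n", 1))             // prints: 1 false
	fmt.Println(m.compareFnAndSwap("n", eq, 5, 9)) // prints: false
	fmt.Println(m.compareFnAndSwap("n", eq, 1, 2)) // prints: true
	fmt.Println(m.loadOrStore("n", 7))             // prints: 2 true
}
```

Holding the lock across both the comparison and the write is what makes the operation atomic; a separate Load followed by Store would leave a window for another goroutine to change the value in between.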

type MutexMap

type MutexMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func NewMutexMap

func NewMutexMap[K comparable, V any]() *MutexMap[K, V]

func (*MutexMap[K, V]) CompareAndDelete

func (m *MutexMap[K, V]) CompareAndDelete(key K, old V) bool

func (*MutexMap[K, V]) CompareAndSwap

func (m *MutexMap[K, V]) CompareAndSwap(key K, old, new V) bool

func (*MutexMap[K, V]) CompareFnAndDelete

func (m *MutexMap[K, V]) CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool

func (*MutexMap[K, V]) CompareFnAndSwap

func (m *MutexMap[K, V]) CompareFnAndSwap(key K, fn func(V, V) bool, old, new V) bool

func (*MutexMap[K, V]) Delete

func (m *MutexMap[K, V]) Delete(key K) bool

func (*MutexMap[K, V]) Filter added in v0.0.32

func (m *MutexMap[K, V]) Filter(filter func(K, V) bool) Map[K, V]

func (*MutexMap[K, V]) Keys added in v0.0.32

func (m *MutexMap[K, V]) Keys() []K

func (*MutexMap[K, V]) Len added in v0.0.35

func (m *MutexMap[K, V]) Len() int

func (*MutexMap[K, V]) Load

func (m *MutexMap[K, V]) Load(key K) (value V, ok bool)

func (*MutexMap[K, V]) LoadAndDelete

func (m *MutexMap[K, V]) LoadAndDelete(key K) (value V, loaded bool)

func (*MutexMap[K, V]) LoadOrStore

func (m *MutexMap[K, V]) LoadOrStore(key K, new V) (actual V, loaded bool)

func (*MutexMap[K, V]) Range

func (m *MutexMap[K, V]) Range(iterator func(key K, value V) bool)

func (*MutexMap[K, V]) Store

func (m *MutexMap[K, V]) Store(key K, value V)

func (*MutexMap[K, V]) Swap

func (m *MutexMap[K, V]) Swap(key K, value V) (pre V, loaded bool)

func (*MutexMap[K, V]) Values added in v0.0.32

func (m *MutexMap[K, V]) Values() []V

type OrderHub added in v0.0.31

type OrderHub[T any] struct {
	// contains filtered or unexported fields
}

OrderHub is a simple in-memory publish-subscribe hub that delivers messages to subscribers in the order they subscribed. Unlike FastHub, OrderHub maintains a slice to preserve subscription order.

func NewOrderHub added in v0.0.31

func NewOrderHub[T any](bufSize int) *OrderHub[T]

NewOrderHub creates a new OrderHub with the given buffer size for each subscriber.

func (*OrderHub[T]) Close added in v0.0.31

func (b *OrderHub[T]) Close() error

Close closes the bus and all subscriber channels. After closing, Subscribe and Publish will return ErrHubClosed.

func (*OrderHub) Increment added in v0.0.39

func (g *OrderHub) Increment() uint64

Increment atomically increments and returns a new unique ID.

func (*OrderHub[T]) Publish added in v0.0.31

func (b *OrderHub[T]) Publish(v T) error

Publish sends the given value to all subscribers in subscription order. If a subscriber's channel is full, the message is dropped for that subscriber. If the bus is closed, it returns ErrHubClosed.

func (*OrderHub) SetPublishCallback added in v0.0.39

func (h *OrderHub) SetPublishCallback(f func(id uint64, v T))

func (*OrderHub[T]) Subscribe added in v0.0.31

func (b *OrderHub[T]) Subscribe() (*Subscription[T], error)

Subscribe registers a new subscriber and returns its Subscription. Messages will be delivered in the order subscribers were added. If the bus is closed, it returns ErrHubClosed.

func (*OrderHub[T]) Unsubscribe added in v0.0.31

func (b *OrderHub[T]) Unsubscribe(s *Subscription[T])

Unsubscribe removes a subscriber from the bus and closes its channel. After removal, the subscriber will no longer receive messages.

type Pool

type Pool[K, V any] struct {
	New        poolNewFunc[K, V]
	Identifier poolIDFunc[K]
	Destroy    poolDestroyFunc[V]
	// contains filtered or unexported fields
}

func (*Pool[K, V]) Get

func (p *Pool[K, V]) Get(ctx context.Context, key K) (value V, err error)

func (*Pool[K, V]) GetWithCtx

func (p *Pool[K, V]) GetWithCtx(ctx context.Context, key K) (value V, err error)

func (*Pool[K, V]) Put

func (p *Pool[K, V]) Put(key K) (err error)

func (*Pool[K, V]) PutWithCtx

func (p *Pool[K, V]) PutWithCtx(ctx context.Context, key K) (err error)

type RingBuffer added in v0.0.31

type RingBuffer[T any] struct {
	// contains filtered or unexported fields
}

RingBuffer is a ring buffer that is not safe for concurrent use.

func NewRingBuffer added in v0.0.31

func NewRingBuffer[T any]() *RingBuffer[T]

func NewRingBufferWithSize added in v0.0.31

func NewRingBufferWithSize[T any](size int) *RingBuffer[T]

func (*RingBuffer[T]) Cap added in v0.0.31

func (r *RingBuffer[T]) Cap() int

func (*RingBuffer[T]) Iterator added in v0.0.31

func (r *RingBuffer[T]) Iterator() iter.Seq[T]

func (*RingBuffer[T]) Len added in v0.0.31

func (r *RingBuffer[T]) Len() int

func (*RingBuffer[T]) Push added in v0.0.31

func (r *RingBuffer[T]) Push(values ...T)

func (*RingBuffer[T]) Values added in v0.0.31

func (r *RingBuffer[T]) Values() (result []T)
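The page does not state what Push does once the buffer is full. The sketch below shows one common ring-buffer policy, overwriting the oldest element, purely as an assumption; `ring`, `newRing`, `push`, and `values` are hypothetical names and may not match how RingBuffer behaves.

```go
package main

import "fmt"

// ring is a hypothetical fixed-capacity ring; it is not safe for concurrent use.
type ring[T any] struct {
	buf   []T
	start int // index of the oldest element
	n     int // number of stored elements
}

// newRing allocates a ring; size must be positive.
func newRing[T any](size int) *ring[T] {
	return &ring[T]{buf: make([]T, size)}
}

// push appends values; once the ring is full, each new value overwrites the oldest one.
func (r *ring[T]) push(values ...T) {
	for _, v := range values {
		if r.n < len(r.buf) {
			r.buf[(r.start+r.n)%len(r.buf)] = v
			r.n++
			continue
		}
		r.buf[r.start] = v
		r.start = (r.start + 1) % len(r.buf)
	}
}

// values returns the stored elements from oldest to newest.
func (r *ring[T]) values() []T {
	out := make([]T, 0, r.n)
	for i := 0; i < r.n; i++ {
		out = append(out, r.buf[(r.start+i)%len(r.buf)])
	}
	return out
}

func main() {
	r := newRing[int](3)
	r.push(1, 2, 3, 4, 5)
	fmt.Println(r.values()) // prints: [3 4 5]
}
```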

type Set

type Set[T comparable] interface {
	fmt.Stringer
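	// Len returns the number of elements in the set.
	Len() int
	// Append adds elements.
	Append(...T)
	// Remove removes elements.
	Remove(...T) bool
	// Contains reports whether an element exists.
	Contains(T) bool
	// Values returns the set's elements as a slice.
	Values() []T
	// Union returns the union.
	Union(Set[T]) Set[T]
	// Intersection returns the intersection.
	Intersection(Set[T]) Set[T]
	// Difference returns the difference.
	Difference(Set[T]) Set[T]
	// SymmetricDifference returns the symmetric difference.
	SymmetricDifference(Set[T]) Set[T]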
}

type Stack

type Stack[T any] interface {
	Push(T)
	Pop() (T, bool)
	Peek() (T, bool)
	Empty() bool
	Size() int
}
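To show how the (value, bool) return pairs are meant to be used, here is a minimal slice-backed type that satisfies the same method set. The interface is repeated so the sketch compiles on its own; `sliceStack` is a hypothetical type and is not the package's ArrayStack or LinkStack.

```go
package main

import "fmt"

// Stack repeats the interface from the documentation so the sketch compiles on its own.
type Stack[T any] interface {
	Push(T)
	Pop() (T, bool)
	Peek() (T, bool)
	Empty() bool
	Size() int
}

// sliceStack is a hypothetical slice-backed implementation.
type sliceStack[T any] struct{ items []T }

func (s *sliceStack[T]) Push(v T)    { s.items = append(s.items, v) }
func (s *sliceStack[T]) Empty() bool { return len(s.items) == 0 }
func (s *sliceStack[T]) Size() int   { return len(s.items) }

// Peek returns the top element without removing it; the bool is false on an empty stack.
func (s *sliceStack[T]) Peek() (T, bool) {
	if len(s.items) == 0 {
		var zero T
		return zero, false
	}
	return s.items[len(s.items)-1], true
}

// Pop removes and returns the top element; the bool is false on an empty stack.
func (s *sliceStack[T]) Pop() (T, bool) {
	v, ok := s.Peek()
	if ok {
		s.items = s.items[:len(s.items)-1]
	}
	return v, ok
}

func main() {
	var st Stack[string] = &sliceStack[string]{}
	st.Push("a")
	st.Push("b")
	top, _ := st.Pop()
	fmt.Println(top, st.Size()) // prints: b 1
}
```

Returning a bool instead of panicking lets callers drain a stack with a plain loop such as `for v, ok := st.Pop(); ok; v, ok = st.Pop() { ... }`.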

type Subscription added in v0.0.31

type Subscription[T any] struct {
	// ID is the unique identifier for this subscriber.
	ID uint64
	// contains filtered or unexported fields
}

Subscription represents a single subscriber to a Hub. It holds the unique subscription ID and the receive-only channel that delivers published messages.

func (*Subscription[T]) Channel added in v0.0.39

func (s *Subscription[T]) Channel() <-chan T

type SyncMap

type SyncMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

SyncMap is a generic wrapper around sync.Map.

func NewSyncMap

func NewSyncMap[K comparable, V any]() *SyncMap[K, V]

func (*SyncMap[K, V]) CompareAndDelete

func (m *SyncMap[K, V]) CompareAndDelete(key K, old V) bool

func (*SyncMap[K, V]) CompareAndSwap

func (m *SyncMap[K, V]) CompareAndSwap(key K, old, new V) bool

func (*SyncMap[K, V]) CompareFnAndDelete

func (m *SyncMap[K, V]) CompareFnAndDelete(key K, fn func(V, V) bool, old V) bool

func (*SyncMap[K, V]) CompareFnAndSwap

func (m *SyncMap[K, V]) CompareFnAndSwap(key K, fn func(V, V) bool, old, new V) bool

func (*SyncMap[K, V]) Delete

func (m *SyncMap[K, V]) Delete(key K) bool

func (*SyncMap[K, V]) Filter added in v0.0.32

func (m *SyncMap[K, V]) Filter(filter func(K, V) bool) Map[K, V]

func (*SyncMap[K, V]) Keys added in v0.0.32

func (m *SyncMap[K, V]) Keys() []K

func (*SyncMap[K, V]) Load

func (m *SyncMap[K, V]) Load(key K) (value V, ok bool)

func (*SyncMap[K, V]) LoadAndDelete

func (m *SyncMap[K, V]) LoadAndDelete(key K) (value V, loaded bool)

func (*SyncMap[K, V]) LoadOrStore

func (m *SyncMap[K, V]) LoadOrStore(key K, new V) (actual V, loaded bool)

func (*SyncMap[K, V]) Range

func (m *SyncMap[K, V]) Range(iterator func(key K, value V) bool)

func (*SyncMap[K, V]) Store

func (m *SyncMap[K, V]) Store(key K, value V)

func (*SyncMap[K, V]) Swap

func (m *SyncMap[K, V]) Swap(key K, value V) (pre V, loaded bool)

func (*SyncMap[K, V]) Values added in v0.0.32

func (m *SyncMap[K, V]) Values() []V

type SyncSet

type SyncSet[T comparable] struct {
	// contains filtered or unexported fields
}

SyncSet is a thread-safe set implementation for comparable types.

func NewSyncSet

func NewSyncSet[T comparable](items ...T) *SyncSet[T]

NewSyncSet creates a new SyncSet and optionally adds initial items.

func (*SyncSet[T]) Append

func (s *SyncSet[T]) Append(values ...T)

Append adds one or more elements to the set. Duplicates are ignored.

func (*SyncSet[T]) Contains

func (s *SyncSet[T]) Contains(v T) bool

Contains reports whether the set contains the specified element.

func (*SyncSet[T]) Difference

func (s *SyncSet[T]) Difference(s1 Set[T]) Set[T]

Difference returns a new set containing elements in the current set but not in the other.

func (*SyncSet[T]) Intersection

func (s *SyncSet[T]) Intersection(s1 Set[T]) Set[T]

Intersection returns a new set containing elements present in both sets.

func (*SyncSet[T]) Len

func (s *SyncSet[T]) Len() int

Len returns the number of elements in the set.

func (*SyncSet[T]) Remove

func (s *SyncSet[T]) Remove(values ...T) bool

Remove deletes one or more elements from the set. Non-existent elements are ignored.

func (*SyncSet[T]) String

func (s *SyncSet[T]) String() string

String returns the string representation of the set's elements.

func (*SyncSet[T]) SymmetricDifference

func (s *SyncSet[T]) SymmetricDifference(s1 Set[T]) Set[T]

SymmetricDifference returns a new set containing elements present in either of the sets but not both.

func (*SyncSet[T]) Union

func (s *SyncSet[T]) Union(s1 Set[T]) Set[T]

Union returns a new set containing all elements from the current set and another set.

func (*SyncSet[T]) Values

func (s *SyncSet[T]) Values() []T

Values returns a slice containing all elements in the set (unordered).

type WorkQueue added in v0.0.31

type WorkQueue[T any] struct {
	// contains filtered or unexported fields
}

WorkQueue is a simple pub-sub style queue where each topic has exactly one subscriber (worker). A published message goes to exactly one consumer.

Typical use case: task queues, job workers.

T is the type of the message.

func NewWorkQueue added in v0.0.31

func NewWorkQueue[T any]() *WorkQueue[T]

NewWorkQueue creates a new WorkQueue. The constructor takes no arguments; a channel buffer size is supplied per topic through AddTopic.

func (*WorkQueue[T]) AddTopic added in v0.0.31

func (u *WorkQueue[T]) AddTopic(topic string, bufSize int) error

AddTopic creates a new topic with its own buffered channel. Returns an error if the topic already exists.

func (*WorkQueue[T]) Close added in v0.0.31

func (u *WorkQueue[T]) Close()

Close closes all topic channels. Topics are not removed from the map.

func (*WorkQueue[T]) Publish added in v0.0.31

func (u *WorkQueue[T]) Publish(topic string, msg T) error

Publish sends a message to the topic's channel. If the channel buffer is full, the message is dropped silently. Returns an error if the topic does not exist.

func (*WorkQueue[T]) RemoveTopic added in v0.0.31

func (u *WorkQueue[T]) RemoveTopic(topic string) error

RemoveTopic closes and removes the topic's channel. Messages in the buffer are dropped.

func (*WorkQueue[T]) Subscribe added in v0.0.31

func (u *WorkQueue[T]) Subscribe(topic string) (<-chan T, error)

Subscribe returns the channel for the given topic. Only one subscriber should consume from the channel. Returns an error if the topic does not exist.
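The AddTopic, Publish, and Subscribe flow can be pictured as one buffered channel per topic. The sketch below is a simplified, single-goroutine illustration under that assumption; `miniQueue`, its methods, and `errNoTopic` are hypothetical and do not reproduce the package's locking or error values.

```go
package main

import (
	"errors"
	"fmt"
)

var errNoTopic = errors.New("topic not found") // hypothetical sentinel for this sketch

// miniQueue is a hypothetical one-channel-per-topic queue; it is not goroutine-safe.
type miniQueue[T any] struct {
	topics map[string]chan T
}

func newMiniQueue[T any]() *miniQueue[T] {
	return &miniQueue[T]{topics: make(map[string]chan T)}
}

// addTopic creates the topic's buffered channel, failing if the topic already exists.
func (q *miniQueue[T]) addTopic(topic string, bufSize int) error {
	if _, ok := q.topics[topic]; ok {
		return errors.New("topic already exists")
	}
	q.topics[topic] = make(chan T, bufSize)
	return nil
}

// publish enqueues msg without blocking; a full buffer drops msg and still returns nil.
func (q *miniQueue[T]) publish(topic string, msg T) error {
	ch, ok := q.topics[topic]
	if !ok {
		return errNoTopic
	}
	select {
	case ch <- msg:
	default: // buffer full: dropped silently, as the Publish description states
	}
	return nil
}

// subscribe hands out the topic's channel; only one worker should read from it.
func (q *miniQueue[T]) subscribe(topic string) (<-chan T, error) {
	ch, ok := q.topics[topic]
	if !ok {
		return nil, errNoTopic
	}
	return ch, nil
}

func main() {
	q := newMiniQueue[string]()
	_ = q.addTopic("jobs", 2)
	for _, job := range []string{"a", "b", "c"} { // "c" exceeds the buffer and is dropped
		_ = q.publish("jobs", job)
	}
	jobs, _ := q.subscribe("jobs")
	first, second := <-jobs, <-jobs
	fmt.Println(first, second, len(jobs)) // prints: a b 0
}
```

Because every caller of subscribe receives the same channel, two readers would split the messages between them rather than each seeing all of them, which is why a single consumer per topic is recommended.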
