Documentation
¶
Overview ¶
Package broker implements an extensible MQTT broker.
Example ¶
// Launch the TCP server that the engine will accept connections from.
server, err := transport.Launch("tcp://localhost:8080")
if err != nil {
panic(err)
}
// done is closed by the logger below once a LostConnection event is logged.
done := make(chan struct{})
backend := NewMemoryBackend()
// Print every log event with the most specific detail available: the error
// first, then the message, then the packet, and finally the bare event.
backend.Logger = func(e LogEvent, c *Client, pkt packet.Generic, msg *packet.Message, err error) {
if err != nil {
fmt.Printf("B [%s] %s\n", e, err.Error())
} else if msg != nil {
fmt.Printf("B [%s] %s\n", e, msg.String())
} else if pkt != nil {
fmt.Printf("B [%s] %s\n", e, pkt.String())
} else {
fmt.Printf("B [%s]\n", e)
}
// Signal the main flow that the connection is gone so it can shut down.
if e == LostConnection {
close(done)
}
}
// Create the engine around the backend and hand it the launched server.
engine := NewEngine(backend)
engine.Accept(server)
c := client.New()
// wait is closed by the client callback when a message has been received.
wait := make(chan struct{})
c.Callback = func(msg *packet.Message, err error) error {
if err != nil {
panic(err)
}
fmt.Printf("C [message] %s\n", msg.String())
close(wait)
return nil
}
// Connect to the broker; Wait is called with a ten second timeout.
cf, err := c.Connect(client.NewConfig("tcp://localhost:8080"))
if err != nil {
panic(err)
}
err = cf.Wait(10 * time.Second)
if err != nil {
panic(err)
}
// Subscribe to the "test" topic with QOS 0.
sf, err := c.Subscribe("test", 0)
if err != nil {
panic(err)
}
err = sf.Wait(10 * time.Second)
if err != nil {
panic(err)
}
// Publish a non-retained QOS 0 message to the topic subscribed to above.
pf, err := c.Publish("test", []byte("test"), 0, false)
if err != nil {
panic(err)
}
err = pf.Wait(10 * time.Second)
if err != nil {
panic(err)
}
// Block until the callback has printed the received message.
<-wait
err = c.Disconnect()
if err != nil {
panic(err)
}
// Block until the logger has seen the LostConnection event.
<-done
err = server.Close()
if err != nil {
panic(err)
}
engine.Close()
Output:
B [new connection]
B [packet received] <Connect ClientID="" KeepAlive=30 Username="" Password="" CleanSession=true Will=nil Version=4>
B [packet sent] <Connack SessionPresent=false ReturnCode=0>
B [packet received] <Subscribe ID=1 Subscriptions=["test"=>0]>
B [packet sent] <Suback ID=1 ReturnCodes=[0]>
B [packet received] <Publish ID=0 Message=<Message Topic="test" QOS=0 Retain=false Payload=74657374> Dup=false>
B [message published] <Message Topic="test" QOS=0 Retain=false Payload=74657374>
B [message dequeued] <Message Topic="test" QOS=0 Retain=false Payload=74657374>
B [packet sent] <Publish ID=0 Message=<Message Topic="test" QOS=0 Retain=false Payload=74657374> Dup=false>
B [message forwarded] <Message Topic="test" QOS=0 Retain=false Payload=74657374>
C [message] <Message Topic="test" QOS=0 Retain=false Payload=74657374>
B [packet received] <Disconnect>
B [client disconnected]
B [lost connection]
Index ¶
- Variables
- func Run(engine *Engine, protocol string) (string, chan struct{}, chan struct{})
- type Ack
- type Backend
- type Client
- type Engine
- type LogEvent
- type MemoryBackend
- func (m *MemoryBackend) Authenticate(_ *Client, user, password string) (bool, error)
- func (m *MemoryBackend) Close(timeout time.Duration) bool
- func (m *MemoryBackend) Dequeue(client *Client) (*packet.Message, Ack, error)
- func (m *MemoryBackend) Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, ...)
- func (m *MemoryBackend) Publish(client *Client, msg *packet.Message, ack Ack) error
- func (m *MemoryBackend) Restore(*Client) error
- func (m *MemoryBackend) Setup(client *Client, id string, clean bool) (Session, bool, error)
- func (m *MemoryBackend) Subscribe(client *Client, subs []packet.Subscription, ack Ack) error
- func (m *MemoryBackend) Terminate(client *Client) error
- func (m *MemoryBackend) Unsubscribe(client *Client, topics []string, ack Ack) error
- type Session
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClientClosed = errors.New("client closed")
ErrClientClosed is returned if a client is being closed by the broker.
var ErrClientDisconnected = errors.New("client disconnected")
ErrClientDisconnected is returned if a client disconnects cleanly.
var ErrClosing = errors.New("closing")
ErrClosing is returned to a client if the backend is closing.
var ErrKillTimeout = errors.New("kill timeout")
ErrKillTimeout is returned to a client if the existing client does not close in time.
var ErrMissingSession = errors.New("missing session")
ErrMissingSession is returned if the backend does not return a session.
var ErrNotAuthorized = errors.New("not authorized")
ErrNotAuthorized is returned when a client is not authorized.
var ErrQueueFull = errors.New("queue full")
ErrQueueFull is returned to a client that attempts to write to its own full queue, which would result in a deadlock.
var ErrTokenTimeout = errors.New("token timeout")
ErrTokenTimeout is returned if the client reaches the token timeout.
var ErrUnexpectedPacket = errors.New("unexpected packet")
ErrUnexpectedPacket is returned when an unexpected packet is received.
Functions ¶
Types ¶
type Ack ¶ added in v0.6.0
type Ack func()
Ack is executed by the Backend or Client to signal one of two things: that a message will be delivered under the selected QoS level and is therefore safe to be deleted from either queue, or that subscriptions have been handled successfully.
type Backend ¶
// A Backend provides the effective brokering functionality to its clients.
type Backend interface {
// Authenticate should authenticate the client using the user and password
// values and return true if the client is eligible to continue or false
// when the broker should terminate the connection.
Authenticate(client *Client, user, password string) (ok bool, err error)
// Setup is called when a new client comes online and is successfully
// authenticated. Setup should return the already stored session for the
// supplied id or create and return a new one if it is missing or a clean
// session is requested. If the supplied id has a zero length, a new
// temporary session should be returned that is not stored further. The
// backend should also close any existing clients that use the same id.
//
// Note: In this call the Backend may also allocate other resources and
// set up the client for further usage as the broker will acknowledge the
// connection when the call returns. The Terminate function is called for
// every client that Setup has been called for.
Setup(client *Client, id string, clean bool) (a Session, resumed bool, err error)
// Restore is called after the client has restored packets from the session.
//
// The Backend should resubscribe stored subscriptions and begin queueing
// missed offline messages. When all offline messages have been queued, the
// client may receive online messages. Depending on the implementation, this
// may not be required as Dequeue will already pick up offline messages.
Restore(client *Client) error
// Subscribe should subscribe the passed client to the specified topics and
// store the subscription in the session. If an Ack is provided, the
// subscription will be acknowledged when called during or after the call to
// Subscribe.
//
// Incoming messages that match the supplied subscription should be added to
// a temporary or persistent queue that is drained when Dequeue is called.
//
// Retained messages that match the supplied subscription should be added to
// a temporary queue that is also drained when Dequeue is called. The messages
// must be delivered with the retained flag set to true.
Subscribe(client *Client, subs []packet.Subscription, ack Ack) error
// Unsubscribe should unsubscribe the passed client from the specified topics
// and remove the subscriptions from the session. If an Ack is provided, the
// unsubscription will be acknowledged when called during or after the call
// to Unsubscribe.
Unsubscribe(client *Client, topics []string, ack Ack) error
// Publish should forward the passed message to all other clients that hold
// a subscription that matches the message's topic. It should also add the
// message to all sessions that have a matching offline subscription. The
// latter may only apply to messages with a QOS greater than 0. If an Ack is
// provided, the message will be acknowledged when called during or after
// the call to Publish.
//
// If the retained flag is set, messages with a payload should replace the
// currently retained message. Otherwise, the currently retained message
// should be removed. The flag should be cleared before publishing the
// message to other subscribed clients.
Publish(client *Client, msg *packet.Message, ack Ack) error
// Dequeue is called by the Client to obtain the next message from the queue
// and must return either a message or an error. The backend must only return
// no message and no error if the client's Closing channel has been closed.
//
// The Backend may return an Ack to receive a signal that the message is being
// delivered under the selected QOS level and is therefore safe to be deleted
// from the queue. The Ack will be called before Dequeue is called again.
//
// The returned message must have a QOS set that respects the QOS set by
// the matching subscription.
Dequeue(client *Client) (*packet.Message, Ack, error)
// Terminate is called when the client goes offline. Terminate should
// unsubscribe the passed client from all previously subscribed topics. The
// backend may also convert a client's subscriptions to offline subscriptions.
//
// Note: The Backend may also clean up previously allocated resources for
// that client as the broker will close the connection when the call
// returns.
Terminate(client *Client) error
// Log is called multiple times during the lifecycle of a client; see
// LogEvent for a list of all events.
Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error)
}
A Backend provides the effective brokering functionality to its clients.
type Client ¶
// A Client represents a remote client that is connected to the broker.
type Client struct {
// MaximumKeepAlive may be set during Setup to enforce a maximum keep alive
// for this client. Missing or higher intervals will be set to the specified
// value.
//
// Defaults to 5 minutes.
MaximumKeepAlive time.Duration
// ParallelPublishes may be set during Setup to control the number of
// parallel calls to Publish a client can perform. This setting also affects
// how many incoming packets are stored in the client's session.
//
// Defaults to 10.
ParallelPublishes int
// ParallelSubscribes may be set during Setup to control the number of
// parallel calls to Subscribe and Unsubscribe a client can perform.
//
// Defaults to 10.
ParallelSubscribes int
// InflightMessages may be set during Setup to control the number of
// inflight messages from the broker to the client. This also defines how
// many outgoing packets are stored in the client's session.
//
// Defaults to 10.
InflightMessages int
// TokenTimeout sets the timeout after which the client should fail when
// obtaining publish, subscribe and dequeue tokens in order to prevent
// potential deadlocks.
//
// Defaults to 30 seconds.
TokenTimeout time.Duration
// PacketCallback can be set to inspect packets before processing and
// apply rate limits. To guarantee the connection lifecycle, Connect and
// Disconnect packets are not provided to the callback.
PacketCallback func(packet.Generic) error
// Ref can be used by the backend to attach a custom object to the client.
Ref interface{}
// contains filtered or unexported fields
}
A Client represents a remote client that is connected to the broker.
func (*Client) Closed ¶ added in v0.6.0
func (c *Client) Closed() <-chan struct{}
Closed returns a channel that is closed when the client is closed.
func (*Client) Closing ¶ added in v0.6.0
func (c *Client) Closing() <-chan struct{}
Closing returns a channel that is closed when the client is closing.
func (*Client) Conn ¶ added in v0.6.0
Conn returns the client's underlying connection. Calls to SetReadLimit, LocalAddr and RemoteAddr are safe.
type Engine ¶
// The Engine handles incoming connections and connects them to the backend.
type Engine struct {
// The Backend that will be passed to accepted clients.
Backend Backend
// ReadLimit defines the initial read limit.
ReadLimit int64
// MaxWriteDelay defines the initial max write delay.
MaxWriteDelay time.Duration
// ConnectTimeout defines the timeout to receive the first packet.
ConnectTimeout time.Duration
// OnError can be used to receive errors from the engine. If an error is
// received, the server should be restarted.
OnError func(error)
// contains filtered or unexported fields
}
The Engine handles incoming connections and connects them to the backend.
type LogEvent ¶
type LogEvent string
LogEvent denotes the class of an event passed to the logger.
const (
// NewConnection is emitted when a client comes online.
NewConnection LogEvent = "new connection"
// PacketReceived is emitted when a packet has been received.
PacketReceived LogEvent = "packet received"
// MessagePublished is emitted after a message has been published.
MessagePublished LogEvent = "message published"
// MessageAcknowledged is emitted after a message has been acknowledged.
MessageAcknowledged LogEvent = "message acknowledged"
// MessageDequeued is emitted after a message has been dequeued.
MessageDequeued LogEvent = "message dequeued"
// MessageForwarded is emitted after a message has been forwarded.
MessageForwarded LogEvent = "message forwarded"
// PacketSent is emitted when a packet has been sent.
PacketSent LogEvent = "packet sent"
// ClientDisconnected is emitted when a client disconnects cleanly.
ClientDisconnected LogEvent = "client disconnected"
// TransportError is emitted when an underlying transport error occurs.
TransportError LogEvent = "transport error"
// SessionError is emitted when a call to the session fails.
SessionError LogEvent = "session error"
// BackendError is emitted when a call to the backend fails.
BackendError LogEvent = "backend error"
// ClientError is emitted when the client violates the protocol.
ClientError LogEvent = "client error"
// LostConnection is emitted when the connection has been terminated.
LostConnection LogEvent = "lost connection"
)
type MemoryBackend ¶
// A MemoryBackend stores everything in memory.
type MemoryBackend struct {
// The size of the session queue.
SessionQueueSize int
// The time after which an error is returned while waiting for a killed
// existing client to exit.
KillTimeout time.Duration
// Client configuration options. See broker.Client for details.
ClientMaximumKeepAlive time.Duration
ClientParallelPublishes int
ClientParallelSubscribes int
ClientInflightMessages int
ClientTokenTimeout time.Duration
// A map of usernames to passwords that grant read and write access.
Credentials map[string]string
// The Logger callback handles incoming log events.
Logger func(LogEvent, *Client, packet.Generic, *packet.Message, error)
// contains filtered or unexported fields
}
A MemoryBackend stores everything in memory.
func NewMemoryBackend ¶
func NewMemoryBackend() *MemoryBackend
NewMemoryBackend returns a new MemoryBackend.
func (*MemoryBackend) Authenticate ¶
func (m *MemoryBackend) Authenticate(_ *Client, user, password string) (bool, error)
Authenticate will authenticate a client's credentials.
func (*MemoryBackend) Close ¶ added in v0.6.0
func (m *MemoryBackend) Close(timeout time.Duration) bool
Close will close all active clients and close the backend. The return value denotes if the timeout has been reached.
func (*MemoryBackend) Dequeue ¶ added in v0.6.0
Dequeue will get the next message from the temporary or stored queue.
func (*MemoryBackend) Log ¶ added in v0.7.2
func (m *MemoryBackend) Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error)
Log will call the associated logger.
func (*MemoryBackend) Publish ¶
Publish will handle retained messages and add the message to the session queues.
func (*MemoryBackend) Restore ¶ added in v0.7.0
func (m *MemoryBackend) Restore(*Client) error
Restore is not needed at the moment.
func (*MemoryBackend) Subscribe ¶
func (m *MemoryBackend) Subscribe(client *Client, subs []packet.Subscription, ack Ack) error
Subscribe will store the subscription and queue retained messages.
func (*MemoryBackend) Terminate ¶
func (m *MemoryBackend) Terminate(client *Client) error
Terminate will disassociate the session from the client.
func (*MemoryBackend) Unsubscribe ¶
func (m *MemoryBackend) Unsubscribe(client *Client, topics []string, ack Ack) error
Unsubscribe will delete the subscription.
type Session ¶
// A Session is used to get packet ids and persist incoming/outgoing packets.
type Session interface {
// NextID should return the next id for outgoing packets.
NextID() packet.ID
// SavePacket should store a packet in the session. Any existing packet
// with the same id should be silently overwritten.
SavePacket(session.Direction, packet.Generic) error
// LookupPacket should retrieve a packet from the session using the packet id.
LookupPacket(session.Direction, packet.ID) (packet.Generic, error)
// DeletePacket should remove a packet from the session. The method should
// not return an error if no packet with the specified id exists.
DeletePacket(session.Direction, packet.ID) error
// AllPackets should return all packets currently saved in the session.
AllPackets(session.Direction) ([]packet.Generic, error)
}
A Session is used to get packet ids and persist incoming/outgoing packets.