Documentation
¶
Overview ¶
Package journal contains all runtime components for Gazette journals, including Fragment & Spool for journal content, Head (serving replications), Tail (reads), and Broker (for brokering new writes).
Code generated by mockery v1.0.0 ¶
Code generated by mockery v1.0.0 ¶
Code generated by mockery v1.0.0 ¶
Code generated by mockery v1.0.0 ¶
Code generated by mockery v1.0.0
Index ¶
- Constants
- Variables
- func ErrorFromResponse(response *http.Response) error
- func NewWalkFuncAdapter(callback func(Fragment) error, rewrites ...string) filepath.WalkFunc
- func StatusCodeForError(err error) int
- type AppendArgs
- type AppendOp
- type AppendResult
- type AsyncAppend
- type Broker
- type BrokerConfig
- type Client
- type Creator
- type Doer
- type Fragment
- func (f Fragment) AsDirectURL(cfs cloudstore.FileSystem, duration time.Duration) (*url.URL, error)
- func (f Fragment) ContentName() string
- func (f *Fragment) ContentPath() string
- func (f Fragment) IsLocal() bool
- func (f Fragment) ReaderFromOffset(offset int64, cfs cloudstore.FileSystem) (io.ReadCloser, error)
- func (f Fragment) Size() int64
- type FragmentFile
- type FragmentPersister
- type FragmentSet
- type Getter
- type Head
- type Header
- type IndexWatcher
- type Mark
- type MarkedReader
- type MemoryBroker
- func (j *MemoryBroker) Create(journal Name) error
- func (j *MemoryBroker) Flush()
- func (j *MemoryBroker) Get(args ReadArgs) (ReadResult, io.ReadCloser)
- func (j *MemoryBroker) Head(args ReadArgs) (ReadResult, *url.URL)
- func (j *MemoryBroker) ReadFrom(name Name, r io.Reader) (*AsyncAppend, error)
- func (j *MemoryBroker) Write(name Name, b []byte) (*AsyncAppend, error)
- type MockDoer
- type MockFragmentFile
- func (_m *MockFragmentFile) Close() error
- func (_m *MockFragmentFile) Fd() uintptr
- func (_m *MockFragmentFile) Read(p []byte) (int, error)
- func (_m *MockFragmentFile) ReadAt(p []byte, off int64) (int, error)
- func (_m *MockFragmentFile) Seek(offset int64, whence int) (int64, error)
- func (_m *MockFragmentFile) Write(p []byte) (int, error)
- type MockGetter
- type MockHeader
- type MockWriter
- type Name
- type ReadArgs
- type ReadOp
- type ReadResult
- type Replica
- type ReplicateArgs
- type ReplicateOp
- type ReplicateResult
- type Replicator
- type RetryReader
- type RouteToken
- type Spool
- type Tail
- type WriteCommitter
- type Writer
Constants ¶
const (
AppendOpBufferSize = 100
)
const ReplicateOpBufferSize = 10
Variables ¶
var ( ErrExists = errors.New("journal exists") ErrNotBroker = errors.New("not journal broker") ErrNotFound = errors.New("journal not found") ErrNotReplica = errors.New("not journal replica") ErrNotYetAvailable = errors.New("offset not yet available") ErrReplicationFailed = errors.New("replication failed") ErrWrongRouteToken = errors.New("wrong route token") ErrWrongWriteHead = errors.New("wrong write head") )
var ErrInvalidDelta = errors.New("invalid delta")
Functions ¶
func ErrorFromResponse ¶
Maps a HTTP status code into a correponding Journal protocol error, or nil. Unknown status codes are converted into an error.
func NewWalkFuncAdapter ¶
NewWalkFuncAdapter returns a filepath.WalkFunc which parses encountered files as Fragments, and passes each to the provided |callback|. Prefix |rewrites| may be included, as pairs of "from", "to" prefixes which are applied in order. For example, NewWalkFuncAdapter(cb, "/from/", "/foo/to/", "/foo/", "/") would rewrite path "/from/bar" => "/to/bar".
func StatusCodeForError ¶
Maps Journal protocol errors into a unique HTTP status code. Other errors are mapped into http.StatusInternalServerError.
Types ¶
type AppendArgs ¶
type AppendArgs struct {
Journal Name
// Content to be appended to |Journal|. The append will consume |Content|
// until io.EOF, and abort the append (without committing any content)
// if any other error is returned by |Content.Read()|.
Content io.Reader
// Context which may trace, cancel or supply a deadline for the operation.
Context context.Context
}
func (AppendArgs) String ¶
func (a AppendArgs) String() string
type AppendOp ¶
type AppendOp struct {
AppendArgs
// Channel by which broker returns operation status.
Result chan AppendResult `json:"-"`
}
type AppendResult ¶
type AppendResult struct {
// Any error that occurred during the append operation (PUT request.)
Error error
// Write head at the completion of the operation.
WriteHead int64
// RouteToken of the Journal. Set on ErrNotBroker.
RouteToken
}
func (AppendResult) String ¶
func (a AppendResult) String() string
type AsyncAppend ¶
type AsyncAppend struct {
// Read-only, and valid only after Ready is signaled.
AppendResult
// Signaled with the AppendOp has completed.
Ready chan struct{}
}
Represents an AppendOp which is being asynchronously executed.
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker is responsible for scattering journal writes to each replica, i.e., brokering transactions.
func (*Broker) StartServingOps ¶
StartServingOps starts a loop to consume config updates and serves appends. Updates are always handled before appends.
func (*Broker) Stop ¶
func (b *Broker) Stop()
Stop shuts down the broker. It blocks until all pending config updates and appends are handled.
func (*Broker) UpdateConfig ¶
func (b *Broker) UpdateConfig(config BrokerConfig)
type BrokerConfig ¶
type BrokerConfig struct {
// Replica instances which should be involved in brokered transactions.
Replicas []Replicator
// Token representing the Broker's view of the current replication topology.
// Sent with replication requests and verified for consensus by each remote
// replica: for a transaction to succeed, all replicas must agree on the
// current |RouteToken|.
RouteToken
// Next offset of the next brokered write transaction. Also sent with
// replication requests and verifed for consensus by each remote replica:
// for a transaction to succeed, all replicas must agree on the |WriteHead|.
WriteHead int64
// contains filtered or unexported fields
}
BrokerConfig is used to periodically update Broker with updated cluster topology and replication configuration.
type Doer ¶
Provides low-level routing and access to a Gazette service, suitable for proxying requests and modeled on http.Client. The client will perform journal-based routing to the appropriate Gazette instance. See gazette.Client.
type Fragment ¶
type Fragment struct {
Journal Name
Begin, End int64
Sum [sha1.Size]byte
// Backing file of the fragment, if present locally.
File FragmentFile
// If fragment is remote, the time of last modification.
// NOTE(joshk): Does not get set in Client use.
// TODO(johnny): Is this the appropriate factoring?
RemoteModTime time.Time
}
func LocalFragments ¶
LocalFragments returns fragments of |journal| under the local |directory|.
TODO(johnny): Collapse with NewWalkFuncAdapter above, or deprecate as part of a larger local-fragment change (Issues #30 & #31).
func (Fragment) AsDirectURL ¶
func (f Fragment) AsDirectURL(cfs cloudstore.FileSystem, duration time.Duration) (*url.URL, error)
func (Fragment) ContentName ¶
func (*Fragment) ContentPath ¶
func (Fragment) ReaderFromOffset ¶
func (f Fragment) ReaderFromOffset(offset int64, cfs cloudstore.FileSystem) (io.ReadCloser, error)
type FragmentFile ¶
type FragmentFile interface {
Close() error
Read(p []byte) (n int, err error)
ReadAt(p []byte, off int64) (n int, err error)
Seek(offset int64, whence int) (int64, error)
Fd() uintptr
Write(p []byte) (n int, err error)
}
Portions of os.File interface used by Fragment. An interface is used (rather than directly using *os.File) in support of test mocks.
type FragmentPersister ¶
type FragmentPersister interface {
Persist(Fragment)
}
FragmentPersister accepts completed local fragment spools, which should be persisted to long-term storage. See |gazette.Persister|.
type FragmentSet ¶
type FragmentSet []Fragment
Maintains fragments ordered on |Begin| and |End|, with the invariant that no fragment is fully overlapped by another fragment in the set (though it may be overlapped by a combination of other fragments). Larger fragments are preferred (and will replace spans of overlapped smaller fragments). An implication of this invariant is that no two fragments have the same |Begin| or |End| (as that would imply an overlap). Both are monotonically increasing in the set: set[0].Begin represents the minimum offset, and set[len(set)-1].End represents the maximum offset.
func (*FragmentSet) Add ¶
func (s *FragmentSet) Add(fragment Fragment) bool
func (*FragmentSet) BeginOffset ¶
func (s *FragmentSet) BeginOffset() int64
func (*FragmentSet) EndOffset ¶
func (s *FragmentSet) EndOffset() int64
func (*FragmentSet) LongestOverlappingFragment ¶
func (s *FragmentSet) LongestOverlappingFragment(offset int64) int
Finds and returns the fragment covering |offset|, which has the most content after |offset|. If no fragment covers |offset|, the first fragment beginning after |offset| is returned.
type Getter ¶
type Getter interface {
Get(args ReadArgs) (ReadResult, io.ReadCloser)
}
Performs a Gazette GET operation.
type Head ¶
type Head struct {
// contains filtered or unexported fields
}
func NewHead ¶
func NewHead(journal Name, directory string, persister FragmentPersister, updates chan<- Fragment) *Head
func (*Head) Replicate ¶
func (h *Head) Replicate(op ReplicateOp)
func (*Head) StartServingOps ¶
type Header ¶
type Header interface {
Head(args ReadArgs) (result ReadResult, fragmentLocation *url.URL)
}
Performs a Gazette HEAD operation.
type IndexWatcher ¶
type IndexWatcher struct {
// contains filtered or unexported fields
}
IndexWatcher monitors a journal's storage location in the cloud filesystem for new fragments, by performing periodic directory listings. When new fragment metadata arrives, it's published to the journal Tail via a shared channel, which indexes the fragment and makes it available for read requests.
func NewIndexWatcher ¶
func NewIndexWatcher(journal Name, cfs cloudstore.FileSystem, updates chan<- Fragment) *IndexWatcher
func (*IndexWatcher) StartWatchingIndex ¶
func (w *IndexWatcher) StartWatchingIndex() *IndexWatcher
func (*IndexWatcher) Stop ¶
func (w *IndexWatcher) Stop()
func (*IndexWatcher) WaitForInitialLoad ¶
func (w *IndexWatcher) WaitForInitialLoad()
type MarkedReader ¶
type MarkedReader struct {
Mark Mark
io.ReadCloser
}
A MarkedReader delegates reads to an underlying reader, and maintains |Mark| such that it always points to the next byte to be read.
func NewMarkedReader ¶
func NewMarkedReader(mark Mark, r io.ReadCloser) *MarkedReader
func (*MarkedReader) AdjustedMark ¶
func (r *MarkedReader) AdjustedMark(br *bufio.Reader) Mark
AdjustedMark returns the current Mark adjusted for content read by |br| (which must wrap this MarkedReader) but unconsumed from |br|'s buffer.
func (*MarkedReader) Close ¶
func (r *MarkedReader) Close() error
type MemoryBroker ¶
type MemoryBroker struct {
// DelayWrites indicates that writes should queue (and their promises not resolve) until:
// * The next explicit Flush, or
// * DelayWrites is set to false and another Write occurs.
DelayWrites bool
// Content written to each journal.
Content map[Name]*bytes.Buffer
// Pending content which will be written on the next Flush (or write, if !DelayWrites).
Pending map[Name]*bytes.Buffer
// contains filtered or unexported fields
}
MemoryBroker provides an in-memory implementation of the Client interface. The intended use is within unit tests which exercise components coordinating through the Client interface.
func NewMemoryBroker ¶
func NewMemoryBroker() *MemoryBroker
NewMemoryBroker returns an initialized, zero-value MemoryBroker.
func (*MemoryBroker) Create ¶
func (j *MemoryBroker) Create(journal Name) error
func (*MemoryBroker) Flush ¶
func (j *MemoryBroker) Flush()
Flush resolves all pending writes and wakes any blocked read operations.
func (*MemoryBroker) Get ¶
func (j *MemoryBroker) Get(args ReadArgs) (ReadResult, io.ReadCloser)
func (*MemoryBroker) Head ¶
func (j *MemoryBroker) Head(args ReadArgs) (ReadResult, *url.URL)
func (*MemoryBroker) ReadFrom ¶
func (j *MemoryBroker) ReadFrom(name Name, r io.Reader) (*AsyncAppend, error)
func (*MemoryBroker) Write ¶
func (j *MemoryBroker) Write(name Name, b []byte) (*AsyncAppend, error)
type MockFragmentFile ¶
MockFragmentFile is an autogenerated mock type for the FragmentFile type
func (*MockFragmentFile) Close ¶
func (_m *MockFragmentFile) Close() error
Close provides a mock function with given fields:
func (*MockFragmentFile) Fd ¶
func (_m *MockFragmentFile) Fd() uintptr
Fd provides a mock function with given fields:
func (*MockFragmentFile) Read ¶
func (_m *MockFragmentFile) Read(p []byte) (int, error)
Read provides a mock function with given fields: p
func (*MockFragmentFile) ReadAt ¶
func (_m *MockFragmentFile) ReadAt(p []byte, off int64) (int, error)
ReadAt provides a mock function with given fields: p, off
type MockGetter ¶
MockGetter is an autogenerated mock type for the Getter type
func (*MockGetter) Get ¶
func (_m *MockGetter) Get(args ReadArgs) (ReadResult, io.ReadCloser)
Get provides a mock function with given fields: args
type MockHeader ¶
MockHeader is an autogenerated mock type for the Header type
func (*MockHeader) Head ¶
func (_m *MockHeader) Head(args ReadArgs) (ReadResult, *url.URL)
Head provides a mock function with given fields: args
type MockWriter ¶
MockWriter is an autogenerated mock type for the Writer type
func (*MockWriter) ReadFrom ¶
func (_m *MockWriter) ReadFrom(journal Name, r io.Reader) (*AsyncAppend, error)
ReadFrom provides a mock function with given fields: journal, r
func (*MockWriter) Write ¶
func (_m *MockWriter) Write(journal Name, buffer []byte) (*AsyncAppend, error)
Write provides a mock function with given fields: journal, buffer
type Name ¶
type Name string
A typed journal name. By convention, journals are named using a forward- slash notation which captures their hierarchical relationships into organizations, topics and partitions. For example, a complete Name might be: "company-journals/interesting-topic/part-1234"
type ReadArgs ¶
type ReadArgs struct {
Journal Name
// Desired offset to begin reading from. Value -1 has special handling, where
// the read is performed from the current write head. All other positive
// values specify a desired exact byte offset to read from. If the offset is
// not available (eg, because it represents a portion of Journal which has
// been permantently deleted), the broker will return the next available
// offset. Callers should therefore always inspect the ReadResult Offset.
Offset int64
// Whether the operation should block until content becomes available.
// ErrNotYetAvailable is returned if a non-blocking read has no ready content.
Blocking bool
// Context which may trace, cancel or supply a deadline for the operation.
Context context.Context
// Deprecated: Server-side support for deadlines will be removed. Use
// context.WithDeadline instead.
// The time at which blocking will expire
Deadline time.Time
}
type ReadOp ¶
type ReadOp struct {
ReadArgs
// Channel by which replica returns a ReadResult.
Result chan ReadResult `json:"-"`
}
type ReadResult ¶
type ReadResult struct {
Error error
// The effective offset of the operation.
Offset int64
// Write head at the completion of the operation.
WriteHead int64
// RouteToken of the Journal. Set on ErrNotReplica.
RouteToken
// Result fragment, set iff |Error| is nil.
Fragment Fragment
}
func (ReadResult) String ¶
func (a ReadResult) String() string
type Replica ¶
type Replica struct {
// contains filtered or unexported fields
}
Replica manages journal components required to serve brokered writes, replications, and reads. A Replica instance is capable of switching roles at any time (and multiple times), from a pure replica which may serve replication requests only, to a broker of the journal.
func NewReplica ¶
func NewReplica(journal Name, localDir string, persister FragmentPersister, cfs cloudstore.FileSystem) *Replica
func (*Replica) Replicate ¶
func (r *Replica) Replicate(op ReplicateOp)
func (*Replica) StartBrokeringWithPeers ¶
func (r *Replica) StartBrokeringWithPeers(routeToken RouteToken, peers []Replicator)
Switch the Replica into broker mode. Appends will be brokered to |peers| with the topology captured by |routeToken|.
func (*Replica) StartReplicating ¶
func (r *Replica) StartReplicating(routeToken RouteToken)
Switch the Replica into pure-replica mode.
type ReplicateArgs ¶
type ReplicateArgs struct {
Journal Name
// WriteHead (eg, first byte) of the replicated transaction.
// Already known and verified by all journal replicas.
WriteHead int64
// RouteToken of the transaction, also known and verified by all replicas.
RouteToken
// Flags whether replicas should begin a new spool for this transaction.
NewSpool bool
// Context which may trace, cancel or supply a deadline for the operation.
Context context.Context
}
func (ReplicateArgs) String ¶
func (a ReplicateArgs) String() string
type ReplicateOp ¶
type ReplicateOp struct {
ReplicateArgs
// Channel by which replica returns a ReplicateResult.
Result chan ReplicateResult `json:"-"`
}
type ReplicateResult ¶
type ReplicateResult struct {
Error error
// Iff |Error| is ErrWrongWriteHead, then |ErrorWriteHead| is the replica's
// own, strictly greater write head.
ErrorWriteHead int64
// Set iff |Error| is nil.
Writer WriteCommitter
}
func (ReplicateResult) String ¶
func (a ReplicateResult) String() string
type Replicator ¶
type Replicator interface {
Replicate(op ReplicateOp)
}
A Replicator is able to serve a ReplicateOp. It may be backed by a local Spool, or by a remote Gazette process.
type RetryReader ¶
type RetryReader struct {
// MarkedReader manages the current reader and offset tracking.
MarkedReader
// Whether read operations should block (the default). If Blocking is false,
// than Read operations may return ErrNotYetAvailable.
Blocking bool
// LastResult retains the result of the last journal read operation.
// Callers may access it to inspect metadata returned by the broker.
// It may be invalidated on every Read call.
LastResult ReadResult
// Getter against which to perform read operations.
Getter Getter
// Context to use in read operations issued to Getter.
Context context.Context
}
RetryReader wraps a Getter and MarkedReader to provide callers with a long-lived journal reader. RetryReader transparently handles and retries errors, and will block as needed to await new journal content.
func NewRetryReader
deprecated
func NewRetryReader(mark Mark, getter Getter) *RetryReader
Deprecated: Use NewRetryReaderContext instead. NewRetryReader returns a RetryReader at |mark|, using the provided |getter| for all operations.
func NewRetryReaderContext ¶
func NewRetryReaderContext(ctx context.Context, mark Mark, getter Getter) *RetryReader
NewRetryReaderContext returns a RetryReader at |mark|, using the provided |getter| and |ctx| for all operations.
func (*RetryReader) AdjustedSeek ¶
AdjustedSeek sets the offset for the next Read, accounting for buffered data and updating the buffer as needed.
func (*RetryReader) Read ¶
func (rr *RetryReader) Read(p []byte) (n int, err error)
Read returns the next available bytes of journal content, retrying as required retry errors or await content to be written. Read will return a non-nil error in the following cases:
- If the RetryReader context is cancelled.
- If Blocking is false, and ErrNotYetAvailable is returned by the broker.
All other errors are retried.
type RouteToken ¶
type RouteToken string
Token which describes the ordered set of responsible servers for a Journal: the first acts as broker, and the rest serve replications and reads (only). Structured as '|'-separated URLs rooting the server's Journal hierarchy. Ex: "http://srv-2/a/root|https://srv-1|http://12.34.56.7:8080/other/root".
type Tail ¶
type Tail struct {
// contains filtered or unexported fields
}
func (*Tail) StartServingOps ¶
type WriteCommitter ¶
type WriteCommitter interface {
io.Writer
// Commits the first |count| bytes of previous Write([]byte) content.
Commit(count int64) error
}
A WriteCommitter extends Writer with a protocol for committing those writes.
type Writer ¶
type Writer interface {
// Appends |buffer| to |journal|. Either all of |buffer| is written, or none
// of it is. Returns a Promise which is resolved when the write has been
// fully committed.
Write(journal Name, buffer []byte) (*AsyncAppend, error)
// Appends |r|'s content to |journal|, by reading until io.EOF. Either all of
// |r| is written, or none of it is. Returns a Promise which is resolved when
// the write has been fully committed.
ReadFrom(journal Name, r io.Reader) (*AsyncAppend, error)
}
A Writer allows for append-only writes to a named journal.