go-kitsune API Reference
Documents every exported operator and which StageOption features each one actually uses in its implementation.
Feature Key
| Column |
Meaning |
| Conc |
Concurrency(n): parallel workers |
| Ord |
Ordered(): preserve order in concurrent mode |
| Buf |
Buffer(n): output channel capacity |
| Name |
WithName(s): stage label for hooks/metrics |
| Err |
OnError(h): error handlers: Halt, Skip, Return, Retry, RetryThen |
| Sup |
Supervise(p): restart on error/panic: RestartOnError, RestartOnPanic, RestartAlways |
| TO |
Timeout(d): per-item deadline; cancels item context after d |
| Cache |
CacheBy(fn): memoize results; skip fn on cache hit |
| OvF |
Overflow(s): Block / DropNewest / DropOldest on full buffer |
| Clock |
WithClock(c): inject time source (for deterministic tests) |
| DS |
WithDedupSet(s): external deduplication backend |
| BT |
BatchTimeout(d): flush partial batch after d |
| FP |
Fast-path / fusion: internal optimization for serial, hook-free chains |
Cell values
| Symbol |
Meaning |
✓ |
Supported |
– |
Not applicable or not supported |
! |
Panics at construction time if used on this operator |
| Operator |
Signature |
Conc |
Ord |
Buf |
Name |
Err |
Sup |
TO |
Cache |
OvF |
Clock |
DS |
BT |
FP |
Map |
Map[I,O](p, fn, opts...) |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
– |
– |
– |
✓ |
FlatMap |
FlatMap[I,O](p, fn, opts...) |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
– |
✓ |
– |
– |
– |
✓ |
ConcatMap |
ConcatMap[I,O](p, fn, opts...) |
– |
– |
✓ |
✓ |
✓ |
✓ |
✓ |
– |
✓ |
– |
– |
– |
– |
Filter |
Filter[T](p, pred func(ctx,T)(bool,error), opts...) |
– |
– |
✓ |
✓ |
– |
✓ |
– |
– |
✓ |
– |
– |
– |
✓ |
Tap |
Tap[T](p, fn func(ctx,T)error, opts...) |
– |
– |
✓ |
✓ |
– |
✓ |
– |
– |
✓ |
– |
– |
– |
– |
TapError |
TapError[T](p, fn func(ctx,error)) |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Finally |
Finally[T](p, fn func(ctx,error)) |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
ExpandMap |
ExpandMap[T](p, fn func(ctx,T)*Pipeline[T], opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
✓ |
– |
– |
Reject |
Reject[T](p, pred func(ctx,T)(bool,error), opts...) |
– |
– |
✓ |
✓ |
– |
✓ |
– |
– |
✓ |
– |
– |
– |
✓ |
IgnoreElements |
IgnoreElements[T](p); also (p).IgnoreElements() |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
ForEach |
(p).ForEach(fn, opts...) → *ForEachRunner[T] |
✓ |
✓ |
– |
✓ |
✓ |
✓ |
✓ |
– |
– |
– |
– |
– |
✓ |
Drain |
(p).Drain() → *DrainRunner[T] |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Notes
- Filter, Tap, Reject support Supervise but not OnError; errors from their fn/pred propagate directly.
- TapError fires its callback only for non-context errors; context cancellation does not trigger the callback. It does not accept StageOption (implemented via Generate, like Catch).
- Finally fires for all exits (success, error, cancellation, early consumer stop). On early stop (e.g. downstream Take), fn receives nil. Does not accept StageOption.
- ExpandMap performs BFS expansion: items at depth N are all emitted before any item at depth N+1. fn may return nil for leaf nodes. Accepts WithName and Buffer. Use MaxDepth(n) to cap BFS depth (0 = roots only) and MaxItems(n) to cap total emissions; the stage closes normally when either cap is reached. Use VisitedBy(keyFn) to enable cycle detection (items whose key was already seen are skipped, along with their subtrees); combine with WithDedupSet to override the default MemoryDedupSet backend. WARNING: without MaxDepth, MaxItems, or a downstream Take, ExpandMap traverses the entire reachable graph and can exhaust memory on high-fanout inputs.
- ForEach returns a typed ForEachRunner[T]; call .Run(ctx) or .RunAsync(ctx). Supports Concurrency, Ordered, OnError, and Supervise.
- Drain returns a DrainRunner[T] (deprecated; use ForEach with a no-op). It satisfies Runnable, so it can be passed to MergeRunners directly; RunAsync is also available on it.
- Map → FlatMap → ForEach chains fuse into a single goroutine when the chain is serial, hook-free, and uses default overflow (FP column). See doc/tuning.md for exact eligibility conditions.
- Pipeline[T].IsOptimized(opts ...RunOption) []OptimizationReport reports fast-path and fusion eligibility for each stage. Pipeline[T].IsFastPath(opts ...RunOption) bool is a convenience wrapper. Both are non-destructive.
- WithContextMapper[T](fn func(T) context.Context) is accepted by Map, FlatMap, and ForEach. It extracts a per-item context from each item using fn, enabling per-item tracing or baggage propagation without requiring the item type to implement ContextCarrier. When set, it takes precedence over ContextCarrier. Setting WithContextMapper disqualifies the stage from the fast path (FP = – when set). See doc/options.md for full details.
2 · Higher-Order Maps
| Operator |
Signature |
Conc |
Ord |
Buf |
Name |
Err |
Sup |
TO |
Cache |
OvF |
Clock |
DS |
BT |
FP |
SwitchMap |
SwitchMap[I,O](p, fn, opts...) |
✓ |
– |
✓ |
✓ |
✓ |
✓ |
✓ |
– |
✓ |
– |
– |
– |
– |
ExhaustMap |
ExhaustMap[I,O](p, fn, opts...) |
✓ |
– |
✓ |
✓ |
✓ |
✓ |
✓ |
– |
✓ |
– |
– |
– |
– |
MapResult |
MapResult[I,O](p, fn, opts...) → (*Pipeline[O], *Pipeline[ErrItem[I]]) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
MapRecover |
MapRecover[I,O](p, fn, recover, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
MapPooled |
MapPooled[I,O](p, pool, fn func(ctx,I,*Pooled[O])error, opts...) |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
MapBatch (compat) |
MapBatch[I,O](p, size, fn, opts...) |
– |
– |
✓ |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
✓ |
– |
MapEvery |
MapEvery[I,O](p, n, fn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
MapIntersperse (compat) |
MapIntersperse[T,O](p, sep, fn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Notes
- SwitchMap cancels the active inner pipeline when a new upstream item arrives. ExhaustMap ignores new items while an inner pipeline is active.
- Timeout on SwitchMap/ExhaustMap threads a per-item deadline into the inner fn context.
- MapBatch (compat) delegates to Batch + FlatMap; it only threads Buffer, Name, Err, and BatchTimeout to the internal Batch stage.
- MapPooled: fn receives *Pooled[O] (pointer). ReleaseAll takes []*Pooled[T]. Pooled[T].Release() has a pointer receiver.
- MapResult both branches must be consumed before calling Run.
| Operator |
Signature |
Conc |
Ord |
Buf |
Name |
Err |
Sup |
TO |
Cache |
OvF |
Clock |
DS |
BT |
FP |
MapWith |
MapWith[I,O,S](p, key, fn, opts...) |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
! |
! |
✓ |
– |
– |
– |
– |
FlatMapWith |
FlatMapWith[I,O,S](p, key, fn, opts...) |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
! |
– |
✓ |
– |
– |
– |
– |
MapWithKey |
MapWithKey[I,O,S](p, key, itemKeyFn, fn, opts...) |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
! |
! |
✓ |
– |
– |
– |
– |
FlatMapWithKey |
FlatMapWithKey[I,O,S](p, key, itemKeyFn, fn, opts...) |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
! |
– |
✓ |
– |
– |
– |
– |
Notes
- !: Timeout and CacheBy panic at construction; they are not meaningful for stateful loops.
- Concurrency semantics differ by operator:
- MapWith / FlatMapWith: each of n workers gets its own independent Ref[S] (worker-local state).
- MapWithKey / FlatMapWithKey: the key space is sharded across n workers via hash(key) % n. Same-key items always reach the same worker; lock-free in the hot path.
- Supervise wraps the stage loop; the Ref (or keyed ref map) is initialised outside the inner fn and is preserved across restarts.
- State TTL: NewKey("name", initial, StateTTL(d)). Ref.Get returns the zero value and resets the slot when the TTL has elapsed.
- Per-key eviction (MapWithKey / FlatMapWithKey only): WithKeyTTL(d) evicts map entries that have been inactive for longer than d; the next item for that key starts from the initial value. Eviction is lazy (checked on next access; no background goroutine). Independent of StateTTL. Use WithDefaultKeyTTL(d) as a run-level default; per-stage WithKeyTTL(0) overrides it to disable eviction for that stage.
4 · Batching & Windowing
| Operator |
Signature |
Conc |
Ord |
Buf |
Name |
Err |
Sup |
TO |
Cache |
OvF |
Clock |
DS |
BT |
FP |
Batch |
Batch[T](p, opts...) ✱ |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
✓ |
– |
✓ |
– |
Within |
Within[T,O](p, stage, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
BufferWith |
BufferWith[T,S](p, closingSelector, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Unbatch |
Unbatch[T](p, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
SlidingWindow |
SlidingWindow[T](p, size, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
SessionWindow |
SessionWindow[T](p, gap, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
✓ |
– |
– |
– |
ChunkBy |
ChunkBy[T,K](p, keyFn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
ChunkWhile |
ChunkWhile[T](p, pred func(prev,curr T)bool, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Notes
- ✱ Batch requires at least one flush trigger (BatchCount, BatchMeasure, or BatchTimeout); panics at construction if none are provided.
- Batch supports WithClock only when BatchTimeout is also set (the clock powers the flush ticker).
- DropPartial discards the final partial batch when the source closes; only full batches are emitted. Use BufferWith(p, Ticker(d)) for fixed-duration time bucketing.
- BufferWith takes a second pipeline (closingSelector) as its flush trigger; each signal from that pipeline emits the current buffer. When the selector closes, any remaining items are flushed. Named BufferWith to avoid collision with the Buffer(n) stage option.
- ChunkBy emits a group when the key changes. ChunkWhile emits a group when the predicate between adjacent items is false.
5 · Aggregation
| Operator |
Signature |
Conc |
Ord |
Buf |
Name |
Err |
Sup |
TO |
Cache |
OvF |
Clock |
DS |
BT |
FP |
Scan |
Scan[T,S](p, seed, fn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Reduce |
Reduce[T,S](p, seed, fn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Distinct |
Distinct[T comparable](p, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
DistinctBy |
DistinctBy[T,K comparable](p, keyFn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Dedupe |
Dedupe[T comparable](p, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
✓ |
– |
– |
DedupeBy |
DedupeBy[T,K comparable](p, keyFn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
✓ |
– |
– |
GroupBy |
GroupBy[T,K](p, keyFn, opts...) → *Pipeline[map[K][]T] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
TakeRandom |
TakeRandom[T](p, n, opts...) → *Pipeline[[]T] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
ReduceWhile |
ReduceWhile[T,S](p, seed, fn, opts...) → *Pipeline[S] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
ToMap |
ToMap[T,K,V](p, keyFn, valFn, opts...) → *Pipeline[map[K]V] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Frequencies |
Frequencies[T comparable](p, opts...) → *Pipeline[map[T]int] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
FrequenciesBy |
FrequenciesBy[T,K comparable](p, keyFn, opts...) → *Pipeline[map[K]int] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
RunningCountBy |
RunningCountBy[T,K comparable](p, keyFn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
RunningSumBy |
RunningSumBy[T,K comparable,V Numeric](p, keyFn, valFn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
RunningFrequencies |
RunningFrequencies[T comparable](p, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
RunningFrequenciesBy |
RunningFrequenciesBy[T,K comparable](p, keyFn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
RandomSample |
RandomSample[T](p, rate float64, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Notes
- GroupBy, TakeRandom, ReduceWhile, ToMap, Frequencies, FrequenciesBy are all buffering pipeline operators: they buffer the full source and emit exactly one result when the source closes. Combine with Single to extract the value.
- Dedupe is identity-based (T comparable). DedupeBy is key-based. Default behaviour is global (all-seen) deduplication. Use DedupeWindow(1) for consecutive-only deduplication.
- RandomSample passes each item downstream with independent probability rate; it is stateless and streaming (no buffering).
- RunningFrequencies / RunningFrequenciesBy emit a fresh count-map snapshot after each item.
- RunningCountBy / RunningSumBy emit a fresh full snapshot after each item; the snapshot map is a copy so it is safe to retain.
6 · Fan-Out / Fan-In
| Operator |
Signature |
Conc |
Ord |
Buf |
Name |
Err |
Sup |
TO |
Cache |
OvF |
Clock |
DS |
BT |
FP |
Broadcast |
Broadcast[T](p, n, opts...) → []*Pipeline[T] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Balance |
Balance[T](p, n, opts...) → []*Pipeline[T] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
KeyedBalance |
KeyedBalance[T](p, n, keyFn, opts...) → []*Pipeline[T] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Share |
Share[T](p, opts...) → func(opts...) *Pipeline[T] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Partition |
Partition[T](p, pred, opts...) → (*Pipeline[T], *Pipeline[T]) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Merge |
Merge[T](pipelines...) |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Zip |
Zip[A,B](a, b) → *Pipeline[Pair[A,B]] |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
ZipWith |
ZipWith[A,B,O](a, b, fn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Unzip |
Unzip[A,B](p, opts...) → (*Pipeline[A], *Pipeline[B]) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
CombineLatest |
CombineLatest[A,B](a, b) → *Pipeline[Pair[A,B]] |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
CombineLatestWith |
CombineLatestWith[A,B,O](a, b, fn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
LatestFrom |
LatestFrom[T,U](p, other) → *Pipeline[Pair[T,U]] |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
LatestFromWith |
LatestFromWith[T,U,O](p, other, fn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Enrich |
Enrich[T,K,V,O](p, keyFn, fetch, join, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
✓ |
– |
LookupBy |
LookupBy[T,K,V](p, keyFn, fetch, opts...) → *Pipeline[Enriched[T,V]] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
✓ |
– |
Notes
- Merge, Zip, CombineLatest, LatestFrom create no buffered output channel of their own, so Buffer does not apply.
- The *With variants run a user fn and do produce a buffered output channel; Buffer and Name apply.
- Broadcast requires n ≥ 2.
- Share returns a factory; call the factory once per desired branch before building the runner. At least one subscribe call is required. Buffer and WithName can be set on each individual subscribe call (per-subscribe opts override factory opts). Calling the factory after Run() has started panics. Unlike Broadcast, Share allows a single subscriber.
- LookupBy and Enrich also accept BatchTimeout as a first-class field on LookupConfig / EnrichConfig; the field is equivalent to passing the BatchTimeout(d) StageOption and is forwarded to the internal Batch stage.
7 · Sources
Source operators produce items from external input. They accept no StageOption.
| Operator |
Signature |
From |
From[T](src <-chan T) |
FromSlice |
FromSlice[T](items []T) |
FromIter |
FromIter[T](iter func(yield func(T)bool)) |
Generate |
Generate[T](fn func(ctx)(T,error)) |
Unfold |
Unfold[S,T](seed S, fn func(ctx,S)(T,S,bool,error)) |
Iterate |
Iterate[T](seed T, fn func(T)T) |
Repeatedly |
Repeatedly[T](fn func()T) |
Cycle |
Cycle[T](items []T) |
Empty |
Empty[T]() |
Never |
Never[T]() |
Concat |
Concat[T](factories ...func()*Pipeline[T]) |
Amb |
Amb[T](factories ...func()*Pipeline[T]) |
Using |
Using[T,R](acquire func(ctx)(R,error), build func(R)*Pipeline[T], release func(R)) |
NewChannel |
NewChannel[T]() → *Channel[T] (with Send, TrySend, Close) |
Time-based sources: accept Buffer, Name, and WithClock as StageOption:
| Operator |
Signature |
Ticker |
Ticker(d, opts...) → *Pipeline[time.Time] |
Timer |
Timer(delay, fn, opts...) → *Pipeline[T] |
8 · Terminal Functions
Terminal functions run the pipeline and return a materialised result. They accept ...RunOption (not StageOption).
| Operator |
Signature |
Collect |
Collect[T](ctx, p, opts...) → ([]T, error); also (p).Collect |
Single |
Single[T](ctx, p, opts...) → (T, error): expect exactly one item; use OrDefault(v) or OrZero[T]() for empty-stream fallback |
First |
First[T](ctx, p, opts...) → (T, bool, error); also (p).First |
Last |
Last[T](ctx, p, opts...) → (T, bool, error); also (p).Last |
Count |
Count[T](ctx, p, opts...) → (int, error); also (p).Count |
Any |
Any[T](ctx, p, fn, opts...) → (bool, error); also (p).Any |
All |
All[T](ctx, p, fn, opts...) → (bool, error); also (p).All |
Find |
Find[T](ctx, p, pred, opts...) → (T, bool, error); also (p).Find |
Iter |
Iter[T](ctx, p, opts...) → (iter.Seq[T], func()error); also (p).Iter |
Sum |
Sum[T](ctx, p, opts...) → (T, error) |
Min / Max |
Min[T](ctx, p, opts...) → (T, bool, error) |
MinMax |
MinMax[T](ctx, p, opts...) → (MinMaxResult[T], bool, error) |
MinBy / MaxBy |
MinBy[T,K](ctx, p, keyFn, less, opts...) → (T, bool, error) |
Contains |
Contains[T comparable](ctx, p, val, opts...) → (bool, error) |
SequenceEqual |
SequenceEqual[T comparable](ctx, a, b, opts...) → (bool, error) |
Notes
- Iter exposes a pipeline as iter.Seq[T] for range-over-func (Go 1.23+). Breaking out of the loop early cancels the pipeline.
- Single returns a plain error if the pipeline emits zero items (unless OrDefault(v) or OrZero[T]() is supplied) or more than one item.
9 · Timing & Observation
| Operator |
Signature |
Conc |
Ord |
Buf |
Name |
Err |
Sup |
TO |
Cache |
OvF |
Clock |
DS |
BT |
FP |
Throttle |
Throttle[T](p, window, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
✓ |
– |
– |
– |
Debounce |
Debounce[T](p, silence, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
✓ |
– |
– |
– |
Sample |
Sample[T](p, d, opts...): emit latest item per tick |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
✓ |
– |
– |
– |
Timestamp |
Timestamp[T](p, opts...) → *Pipeline[Timestamped[T]] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
✓ |
– |
– |
– |
TimeInterval |
TimeInterval[T](p, opts...) → *Pipeline[TimedInterval[T]] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
✓ |
– |
– |
– |
Notes
- Sample emits the most-recently-seen item on each tick and resets the latch; ticks with no new item produce no output. Unlike Debounce, it does not flush on source close.
10 · Sequence Operators
| Operator |
Signature |
Conc |
Ord |
Buf |
Name |
Err |
Sup |
TO |
Cache |
OvF |
Clock |
DS |
BT |
FP |
Take |
Take[T](p, n); also (p).Take |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Drop |
Drop[T](p, n); also (p).Skip, (p).Drop |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
TakeWhile |
TakeWhile[T](p, pred func(T)bool) |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
DropWhile |
DropWhile[T](p, pred func(T)bool) |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
DropLast |
DropLast[T](p, n): omit last n items |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
TakeEvery |
TakeEvery[T](p, n) |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
DropEvery |
DropEvery[T](p, n) |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
MapEvery |
MapEvery[I,O](p, n, fn, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
WithIndex |
WithIndex[T](p, opts...) → *Pipeline[Indexed[T]] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Intersperse |
Intersperse[T](p, sep, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Pairwise |
Pairwise[T](p, opts...) → *Pipeline[Consecutive[T]] |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
TakeUntil |
TakeUntil[T,U](p, boundary *Pipeline[U], opts...): pass items until boundary emits |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
DropUntil |
DropUntil[T,U](p, boundary *Pipeline[U], opts...): drop items until boundary emits |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
StartWith |
StartWith[T](p, items...) |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
EndWith |
EndWith[T](p, items...): append fixed items after source closes |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
– |
DefaultIfEmpty |
DefaultIfEmpty[T](p, val, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
✓ |
– |
– |
– |
– |
Sort |
Sort[T](p, less, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
SortBy |
SortBy[T,K](p, keyFn, less, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
– |
– |
– |
– |
Notes
- Take, Drop, TakeWhile, DropWhile, TakeEvery, DropEvery, DropLast use hardcoded buffer sizes and accept no options.
- TakeUntil / DropUntil accept any *Pipeline[U] as boundary; only its first emission matters.
- StartWith / EndWith accept no options; they delegate to Concat + FromSlice.
- Indexed[T] is struct{ Index int; Value T }.
- (p).Skip is an alias for Drop.
11 · Middleware
| Operator |
Signature |
Conc |
Ord |
Buf |
Name |
Err |
Sup |
TO |
Cache |
OvF |
Clock |
DS |
BT |
FP |
RateLimit |
RateLimit[T](p, rps float64, rlOpts []RateLimitOpt, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
✓ |
– |
– |
– |
CircuitBreaker |
CircuitBreaker[T](p, fn, cbOpts []CircuitBreakerOpt, opts...) |
– |
– |
✓ |
✓ |
– |
– |
– |
– |
– |
✓ |
– |
– |
– |
RateLimitOpt (not StageOption):
| Option |
Effect |
Burst(n int) |
Token bucket burst size |
RateMode(m) |
RateLimitWait (backpressure) or RateLimitDrop (skip excess) |
CircuitBreakerOpt (not StageOption):
| Option |
Effect |
FailureThreshold(n int) |
Failures before opening |
CooldownDuration(d) |
Time in open state before half-open |
HalfOpenProbes(n int) |
Successes required to close from half-open |
HalfOpenTimeout(d) |
If probes not received within d, reopen |
Notes
- WithName, Buffer, and WithClock are the only StageOptions that apply to both operators.
- CircuitBreaker emits ErrCircuitOpen when the circuit is open.
12 · Stage Composition
| Symbol |
Signature |
Notes |
Stage[I,O] |
type Stage[I,O any] func(*Pipeline[I]) *Pipeline[O] |
Named function type; zero-cost transformer |
(s).Apply |
(s Stage[I,O]).Apply(p *Pipeline[I]) *Pipeline[O] |
Apply a stage to a pipeline |
Then |
Then[I,M,O](s1 Stage[I,M], s2 Stage[M,O]) Stage[I,O] |
Compose two stages |
(p).Through |
(p *Pipeline[T]).Through(s Stage[T,T]) *Pipeline[T] |
Method form of Apply |
Or |
Or[I,O](primary, fallback func) Stage[I,O] |
Try primary; fall back to fallback on no output |
(s).Or |
(s Stage[I,O]).Or(fallback Stage[I,O]) Stage[I,O] (compat) |
Method form taking a full Stage |
13 · Error Routing
| Operator |
Signature |
Notes |
Catch |
Catch[T](p, fn func(error)*Pipeline[T]) |
On pipeline error, switch to fallback pipeline returned by fn |
MapResult |
MapResult[I,O](p, fn, opts...) → (*Pipeline[O], *Pipeline[ErrItem[I]]) |
Routes errors to a dead-letter branch |
MapRecover |
MapRecover[I,O](p, fn, recover, opts...) |
Inline recovery fn produces a fallback value |
14 · Helper / Lift Functions
| Function |
Signature |
Notes |
LiftFallible |
LiftFallible[I,O](fn func(I)(O,error)) |
Adapt error-returning fn to func(ctx,I)(O,error) |
LiftPure |
LiftPure[I,O](fn func(I)O) |
Wrap infallible fn |
FilterFunc |
FilterFunc[T](fn func(T)bool) |
Lift plain pred for use with free-fn Filter |
RejectFunc |
RejectFunc[T](fn func(T)bool) |
Lift plain pred for use with free-fn Reject |
TapFunc |
TapFunc[T](fn func(T)) |
Lift void fn for use with free-fn Tap |
TapErrorFunc |
TapErrorFunc(fn func(error)) |
Lift void error observer for use with free-fn TapError |
FinallyFunc |
FinallyFunc(fn func(error)) |
Lift void cleanup function for use with free-fn Finally |
ExpandMapFunc |
ExpandMapFunc[T](fn func(T)*Pipeline[T]) |
Lift context-free child factory for use with free-fn ExpandMap |
15 · Pool
| Symbol |
Signature |
Notes |
NewPool[T] |
NewPool[T](newFn func() T) *Pool[T] |
Construct a typed pool |
(p).Warmup |
(p *Pool[T]).Warmup(n int) |
Pre-populate pool; reduces first-request latency |
(p).Put |
(p *Pool[T]).Put(item *Pooled[T]) |
Return item without calling Release |
MapPooled |
MapPooled[I,O](p, pool, fn func(ctx,I,*Pooled[O])error, opts...) |
Acquire slot → call fn → emit *Pooled[O] |
ReleaseAll |
ReleaseAll[T](items []*Pooled[T]) |
Bulk release |
(item).Release |
(item *Pooled[T]).Release() |
Return single item to pool |
16 · Run Options
RunOption values are passed to Runner.Run(ctx, opts...), Runner.RunAsync(ctx, opts...), or any terminal function.
| Option |
Signature |
Description |
WithErrorStrategy |
WithErrorStrategy(h ErrorHandler) |
Default error handler for all stages that do not set their own OnError. Priority: stage OnError > WithErrorStrategy > Halt. Does not apply to MapResult. |
WithStore |
WithStore(s Store) |
State backend for MapWith, FlatMapWith, MapWithKey, FlatMapWithKey. |
WithHook |
WithHook(h Hook) |
Observability hook for the run. |
WithDrain |
WithDrain(timeout time.Duration) |
Graceful shutdown: let in-flight items drain before stopping. |
WithCache |
WithCache(cache Cache, ttl time.Duration) |
Default cache backend and TTL for Map stages using CacheBy. |
WithSampleRate |
WithSampleRate(n int) |
SampleHook.OnItemSample frequency (default 10). Negative disables. |
WithCodec |
WithCodec(c Codec) |
Serialisation codec for store-backed state and cache. Default: JSON. |
WithPauseGate |
WithPauseGate(g *Gate) |
Attach an external gate for pause/resume control. |
WithDefaultBuffer |
WithDefaultBuffer(n int) |
Default channel buffer size for all stages that do not set their own Buffer. Default: 16. Per-stage Buffer(n) takes precedence. |
WithDefaultKeyTTL |
WithDefaultKeyTTL(d time.Duration) |
Default inactivity TTL for all MapWithKey and FlatMapWithKey stages that do not set their own WithKeyTTL. Default: 0 (disabled). Per-stage WithKeyTTL takes precedence; WithKeyTTL(0) explicitly disables eviction for a stage. |
17 · Observability
All hooks are wired into every stage runner automatically when provided via WithHook.
| Interface |
Package |
Notes |
Hook |
internal (re-exported) |
Base: OnStageStart, OnItem, OnStageDone |
OverflowHook |
internal |
OnOverflow fired on DropNewest / DropOldest |
SupervisionHook |
internal |
OnRestart fired on supervision restart |
SampleHook |
internal |
OnItemSample fired every N items (configurable via WithSampleRate) |
GraphHook |
internal |
OnGraph fired at run-time with full []GraphNode |
BufferHook |
internal |
OnBufferChange fired on channel depth changes |
LogHook |
root |
Structured log output for every hook event |
MultiHook |
root |
Fan-out to multiple hook implementations |
MetricsHook |
root |
Lock-free atomic counters per stage; Snapshot() / Reset() / Snapshot().JSON() |
JSONCodec |
root |
Default codec for cache and store-backed Ref serialization |
GraphNode exposes: Kind, Name, Concurrency, Buffer, Overflow, BatchSize, Timeout, HasRetry, HasSupervision.
Pipeline[T].Describe() []GraphNode: returns the same []GraphNode snapshot synchronously, without executing the pipeline. Callable on any *Pipeline[T], including intermediate (non-terminal) stages. Useful for static validation and unit-testing graph structure without a full Run.
ContextCarrier: interface implemented by item types that carry a context.Context with an attached trace span. When an item implements ContextCarrier, the engine merges its context into the stage function call: cancellation still comes from the pipeline stage context, but context values (e.g. the active trace span) come from the item. Stage functions can call tracer.Start(ctx, "my-work") to create per-item child spans with no signature changes. Zero cost for items that don't implement the interface. See tails/kotel for OTel integration.
18 · DedupSet Backends
WithDedupSet(s) accepts any value implementing DedupSet:
| Backend |
Constructor |
Bounded Memory |
False Positives |
Expiry |
MemoryDedupSet |
MemoryDedupSet() |
No |
No |
No |
BloomDedupSet |
BloomDedupSet(expectedItems, fp) |
Yes |
Yes (configurable rate) |
No |
TTLDedupSet |
TTLDedupSet(ttl) |
Yes (bounded by active window) |
No |
Yes (lazy, per-key TTL) |
Notes
- MemoryDedupSet is the default for Dedupe, DedupeBy, and ExpandMap.
- BloomDedupSet panics if expectedItems <= 0 or falsePositiveRate is not in (0, 1).
- TTLDedupSet panics if ttl <= 0. Re-adding an existing key refreshes its expiry. Eviction is lazy (on next Contains or Add); no background goroutine is started.
- External backends (Redis, etc.) are available in tails/kredis via kredis.NewDedupSet.
19 · Testing Infrastructure
| Component |
Notes |
testkit.MustCollect |
Run a pipeline and collect output; t.Fatal on error |
testkit.CollectAndExpect |
Collect and assert exact ordered equality |
testkit.CollectAndExpectUnordered |
Collect and assert same multiset, any order |
testkit.MustRun / MustRunWithHook |
Run a sink runner; optionally capture a RecordingHook |
testkit.RecordingHook |
Captures Items, Errors, Drops, Restarts, Graph, Dones events |
testkit.NewTestClock |
Virtual clock for deterministic tests of time-sensitive operators |
testkit.FailAt / FailEvery |
Inject errors at specific item positions or intervals |
testkit.SlowMap / SlowSink |
Simulate slow I/O in map/sink functions |
Property tests (properties_test.go) |
Algebra invariants verified with pgregory.net/rapid (build tag: property); run via task test:property. Covers: Merge multiset/commutativity, Sort ordering/idempotence, Take∘Sort prefix semantics, Broadcast completeness, Balance item-count and round-robin fairness. |
20 · Tails (External Adapters)
Tails are separate Go modules under tails/ that adapt external systems to kitsune pipelines. Each follows the "user owns the client" principle: the caller creates, configures, and closes connections; kitsune never opens or closes them. See doc/tails.md for detailed usage examples.
| Module |
Package |
Source |
Sink |
Notes |
| Apache Kafka |
tails/kkafka |
Consume |
Produce |
segmentio/kafka-go; at-least-once: last message before a pipeline boundary redelivers on reconnect |
| NATS / JetStream |
tails/knats |
Subscribe, Consume |
Publish, JetStreamPublish |
nats.go; see also tails/kjetstream for pull, ordered, KV, and async publish |
| NATS JetStream (advanced) |
tails/kjetstream |
Fetch, FetchBytes, OrderedConsume, WatchKV |
PublishAsync (returns sink + flush), PutKV |
nats.go/jetstream; pull-batch, ordered consumers, KV watch, async batched publish |
| RabbitMQ / AMQP 0-9-1 |
tails/kamqp |
Consume |
Publish |
rabbitmq/amqp091-go; manual ack by default, configurable via WithAutoAck, WithRequeueOnNack |
| MQTT |
tails/kmqtt |
Subscribe |
Publish |
paho.mqtt.golang |
| Azure Service Bus |
tails/kazsb |
Receive |
Send |
azservicebus |
| Azure Event Hubs |
tails/kazeh |
Receive |
ProduceBatch |
azeventhubs |
| AWS SQS |
tails/ksqs |
Receive |
Send |
aws-sdk-go-v2 |
| AWS Kinesis |
tails/kkinesis |
Consume |
Put |
aws-sdk-go-v2 |
| Google Cloud Pub/Sub |
tails/kpubsub |
Receive |
Publish |
cloud.google.com/go/pubsub |
| Google Cloud Storage |
tails/kgcs |
ListObjects |
Upload |
cloud.google.com/go/storage |
| Apache Pulsar |
tails/kpulsar |
Consume |
Send |
apache/pulsar-client-go |
| Elasticsearch / OpenSearch |
tails/kes |
Scroll |
BulkIndex |
elastic/go-elasticsearch |
| MongoDB |
tails/kmongo |
Watch, Find |
Insert |
mongodb/mongo-go-driver |
| PostgreSQL |
tails/kpostgres |
Listen, Query |
Insert |
jackc/pgx |
| Redis |
tails/kredis |
Subscribe |
Publish |
redis/go-redis; also Store and Cache backends |
| ClickHouse |
tails/kclickhouse |
Query |
Insert |
ClickHouse/clickhouse-go |
| SQLite |
tails/ksqlite |
Query |
Insert |
mattn/go-sqlite3 |
| AWS S3 |
tails/ks3 |
ListObjects |
Upload |
aws-sdk-go-v2 |
| gRPC |
tails/kgrpc |
ServerStream |
ClientStream |
google.golang.org/grpc |
| HTTP |
tails/khttp |
Poll |
Post |
net/http |
| WebSocket |
tails/kwebsocket |
Receive |
Send |
nhooyr.io/websocket |
| File |
tails/kfile |
Lines, Watch |
Write |
os/bufio |
| OpenTelemetry |
tails/kotel |
– |
– |
Hook only; traces + metrics |
| Prometheus |
tails/kprometheus |
– |
– |
Hook only; Prometheus metrics |
| Datadog |
tails/kdatadog |
– |
– |
Hook only; DogStatsD metrics |