Operator Reference
This document covers every operator in go-kitsune. Each entry shows the exact Go signature, what the operator does, when to use or avoid it, which StageOption values apply, and a minimal working example.
Free functions (Map, FlatMap, Batch, …) can change the element type as items flow through. Methods (.Filter, .Take, .Skip, …) preserve the type. This split is a Go generic constraint: methods cannot introduce their own type parameters, so anything that changes Pipeline[A] to Pipeline[B] must be a free function.
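The method versus free-function split can be reproduced in a few lines of plain Go. The sketch below is a standalone illustration, not kitsune code: the slice-backed `Pipeline` type and both operators are hypothetical stand-ins that exist only to show where Go's rule about type parameters bites.

```go
package main

import "fmt"

// Pipeline is a hypothetical stand-in for a generic stream type;
// here it simply wraps a slice.
type Pipeline[A any] struct{ items []A }

// Filter can be a method: it only uses the receiver's type parameter A.
func (p Pipeline[A]) Filter(keep func(A) bool) Pipeline[A] {
	var out []A
	for _, v := range p.items {
		if keep(v) {
			out = append(out, v)
		}
	}
	return Pipeline[A]{items: out}
}

// Map must be a free function: it introduces a second type parameter B,
// which Go does not allow on methods.
func Map[A, B any](p Pipeline[A], fn func(A) B) Pipeline[B] {
	out := make([]B, 0, len(p.items))
	for _, v := range p.items {
		out = append(out, fn(v))
	}
	return Pipeline[B]{items: out}
}

func main() {
	nums := Pipeline[int]{items: []int{1, 2, 3, 4}}
	evens := nums.Filter(func(n int) bool { return n%2 == 0 })
	labels := Map(evens, func(n int) string { return fmt.Sprintf("#%d", n) })
	fmt.Println(labels.items) // [#2 #4]
}
```

Declaring `func (p Pipeline[A]) Map[B any](...)` instead would be rejected by the compiler, which is why type-changing operators take the pipeline as their first argument.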
Quick reference
Jump directly to any operator. See Contents for a grouped view.
| Operator | Category | Purpose |
|---|---|---|
| FromSlice | Source | Emit items from a Go slice |
| From | Source | Wrap an existing channel |
| Generate | Source | Push-based custom generator |
| FromIter | Source | Wrap iter.Seq[T] |
| NewChannel | Source | Send-from-anywhere push source |
| Ticker | Source | Periodic tick emission |
| Timer | Source | One-shot delay source |
| Unfold | Source | Seed-based generator |
| Iterate | Source | x, f(x), f(f(x)), … |
| Repeatedly | Source | Repeat a function call indefinitely |
| Cycle | Source | Loop over a fixed slice |
| Empty | Source | Complete immediately with no items |
| Never | Source | Block forever (until context cancellation) |
| Concat | Source | Strictly ordered concat of pipelines |
| Amb | Source | Race factories; keep the winner |
| Catch | Source | Fallback pipeline on error |
| Retry | Source | Re-run a pipeline on failure |
| Using | Source | Resource-scoped pipeline |
| Map | Transform | 1:1 element transform |
| MapRecover | Transform | Map with panic recovery |
| MapResult | Transform | Map with error-branch output |
| Timestamp | Transform | Tag each item with wall-clock time |
| TimeInterval | Transform | Tag each item with elapsed duration |
| StartWith | Transform | Prepend fixed items |
| DefaultIfEmpty | Transform | Emit a fallback if stream is empty |
| Intersperse | Transform | Insert separator between items |
| LiftPure | Transform | Adapt plain functions to stage signatures |
| Materialize | Transform | Wrap items and terminal error as Notification values |
| Dematerialize | Transform | Unwrap Notification stream back to T |
| FlatMap | Expansion | 1:N expansion |
| ConcatMap | Expansion | Sequential inner pipelines |
| SwitchMap | Expansion | Cancel previous inner on new item |
| ExhaustMap | Expansion | Ignore new items while inner is active |
| ExpandMap | Expansion | Recursive BFS expansion |
| Pairwise | Expansion | Emit consecutive pairs |
| Unbatch | Expansion | Flatten slices to individual items |
| Filter | Filter | Keep matching items |
| IgnoreElements | Filter | Drain for side effects, emit nothing |
| Reject | Filter | Drop matching items |
| Take | Filter | First N items |
| Drop | Filter | Skip first N items |
| TakeWhile | Filter | While predicate holds |
| DropWhile | Filter | Until predicate holds |
| TakeEvery | Filter | Keep every Nth item |
| DropEvery | Filter | Drop every Nth item |
| Distinct | Filter | Global dedup by value |
| DistinctBy | Filter | Global dedup by key function |
| Dedupe | Filter | Consecutive or set-backed dedup |
| Key | Stateful | Typed state reference |
| MapWith | Stateful | Map with run-scoped state |
| FlatMapWith | Stateful | FlatMap with run-scoped state |
| MapWithKey | Stateful | Sharded state by key |
| FlatMapWithKey | Stateful | FlatMap with sharded state |
| Batch | Batch | Collect items by count, measure, or timeout |
| Within | Batch | Apply a pipeline stage to slice contents |
| MapBatch | Batch | Batch → fn → flatten |
| Window | Batch | Count-based tumbling window |
| SlidingWindow | Batch | Overlapping sliding windows |
| SessionWindow | Batch | Gap-based session window |
| ChunkBy | Batch | Consecutive same-key grouping |
| ChunkWhile | Batch | Consecutive predicate grouping |
| Merge | Fan-in | N → 1 same-type streams |
| Partition | Fan-out | 1 → 2 by predicate |
| Broadcast | Fan-out | Copy to N branches |
| Share | Fan-out | Hot multicast factory |
| Balance | Fan-out | Round-robin N branches |
| KeyedBalance | Fan-out | Hash-key consistent fan-out |
| Zip | Fan-in | Pairwise combine two streams |
| Unzip | Fan-out | Split pairs into two pipelines |
| LatestFrom | Fan-in | Primary + latest secondary value |
| CombineLatest | Fan-in | Symmetric latest-pair emission |
| SampleWith | Fan-in | Emit latest on external pipeline signal |
| LookupBy | Enrichment | Batched key lookup |
| Enrich | Enrichment | LookupBy + join function |
| Scan | Aggregate | Running fold, emits each step |
| RunningFrequencies | Aggregate | Running frequency map, emits each step |
| RunningFrequenciesBy | Aggregate | Running frequency map by key function |
| GroupBy | Aggregate | Group items into a single map |
| Reduce | Aggregate | Fold to single buffered result |
| ReduceWhile | Aggregate | Fold until predicate fails |
| TakeRandom | Aggregate | Buffering random reservoir sample |
| ToMap | Aggregate | Collect to map |
| Frequencies | Aggregate | Count occurrences |
| FrequenciesBy | Aggregate | Count occurrences by key function |
| Sort | Aggregate | Sort and re-emit in order |
| SortBy | Aggregate | Sort by key function |
| Collect | Scalar terminal | Return []T |
| First | Scalar terminal | First item or zero |
| Last | Scalar terminal | Last item or zero |
| Count | Scalar terminal | Item count |
| Any | Scalar terminal | Predicate: any match |
| All | Scalar terminal | Predicate: all match |
| Find | Scalar terminal | First matching item |
| Contains | Scalar terminal | Membership test |
| Sum | Scalar terminal | Numeric sum |
| Min | Scalar terminal | Numeric minimum |
| Max | Scalar terminal | Numeric maximum |
| MinMax | Scalar terminal | Numeric minimum and maximum |
| MinBy | Scalar terminal | Min by key function |
| MaxBy | Scalar terminal | Max by key function |
| SequenceEqual | Scalar terminal | Element-wise equality check |
| Iter | Scalar terminal | iter.Seq[T] bridge |
| Single | Terminal | Expect exactly one item |
| ForEach | Terminal | Run side-effect sink |
| Drain | Terminal | Consume and discard |
| Runner | Terminal | Explicit run handle |
| MergeRunners | Terminal | Combine multiple runners |
| Throttle | Time | Rate-limit (leading edge) |
| Debounce | Time | Emit after quiet period |
| Sample | Time | Periodic latest-value emission |
| RateLimit | Resilience | Token-bucket limiter |
| CircuitBreaker | Resilience | Open/half-open/closed protection |
| MapPooled | Resilience | Object pool acquisition |
| WithIndex | Utility | Tag items with position |
| Tap | Utility | Side-effect on each item |
| TapError | Utility | Side-effect on each error |
| Finally | Utility | Side-effect on completion |
| Stage[I,O] | Composition | Composable typed pipeline fragment |
| Halt | Error option | Stop pipeline on first error |
| Skip | Error option | Drop errored items, continue |
| Return | Error option | Emit error as a value |
| Retry | Error option | Retry with backoff |
Contents
- Sources
- 1:1 Transforms
- 1:N Expansion
- Filtering & Selection
- Stateful Transforms
- Batching & Windowing
- Fan-Out & Fan-In
- Enrichment
- Aggregation & Collection
- Time-Based Operators
- Resilience
- Utility & Metadata
- Terminal Operators
- Stage Composition
- Error Handling Options
- Stage Options Reference
Sources
Sources are the entry point of every pipeline. They have no input pipeline; they produce items from an external data source, a collection, or a generator function.
FromSlice
Creates a pipeline that emits each element of items in order, then closes. The slice is captured by reference at construction time; do not modify it after calling FromSlice.
When to use: Tests, small fixed datasets, or any time you already have all the data in memory.
When not to use: Large datasets where you want to stream lazily; use Generate or FromIter instead.
Options: none (sources take no StageOption).
nums := kitsune.FromSlice([]int{1, 2, 3, 4, 5})
result, _ := nums.Collect(ctx)
// result == [1, 2, 3, 4, 5]
From
Wraps an existing channel as a pipeline source. The pipeline completes when src is closed.
When to use: Bridging existing channel-based code (Kafka consumer channels, os/signal channels, etc.) into a kitsune pipeline.
When not to use: When you control the producer; NewChannel or Generate give you better lifecycle management.
Options: none.
ch := make(chan Event, 64)
go kafkaConsumer(ch) // fills ch and closes it when done
p := kitsune.From(ch)
Generate
Creates a push-based source from a generator function. Call yield(item) for each item to emit; yield returns false when the pipeline is shutting down. Return nil on clean completion or an error to propagate it.
Generate is the lowest-level source primitive; all other sources are implemented with it. The context passed to fn is cancelled when the pipeline shuts down, so any blocking I/O inside fn (long-poll RPCs, database cursors) is interrupted cleanly.
When to use: Paginated APIs, database cursors, WebSocket feeds, or any external source you can drive with a loop.
Options: none.
pages := kitsune.Generate(func(ctx context.Context, yield func(Page) bool) error {
cursor := ""
for {
page, next, err := api.Fetch(ctx, cursor)
if err != nil {
return err
}
if !yield(page) {
return nil
}
if next == "" {
return nil
}
cursor = next
}
})
FromIter
Wraps a Go 1.23 iter.Seq[T] iterator as a pipeline source. Cancellation is checked between items.
When to use: When you have an iter.Seq[T] from slices.Values, maps.Keys, or a custom iterator and want to plug it into a pipeline.
Options: none.
NewChannel / Channel[T]
func NewChannel[T any](buffer int) *Channel[T]
func (c *Channel[T]) Source() *Pipeline[T]
func (c *Channel[T]) Send(ctx context.Context, item T) error
func (c *Channel[T]) TrySend(item T) bool
func (c *Channel[T]) Close()
A thread-safe, push-based source for external producers. Create with NewChannel, obtain the pipeline with Source(), push items with Send or TrySend, and call Close when no more items will be sent. Close is idempotent. Send blocks if the buffer is full; TrySend returns false immediately instead of blocking.
When to use: HTTP handlers, gRPC streams, or any external goroutine that needs to feed items into a running pipeline without knowing about kitsune internals.
Options: none.
ch := kitsune.NewChannel[Order](64)
p := ch.Source()
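// Register an HTTP handler; net/http runs each request on its own goroutine.
http.HandleFunc("/order", func(w http.ResponseWriter, r *http.Request) {
var o Order
if err := json.NewDecoder(r.Body).Decode(&o); err != nil {
http.Error(w, "invalid order payload", http.StatusBadRequest)
return
}
// Send blocks while the buffer is full; treat a returned error as "not accepted".
if err := ch.Send(r.Context(), o); err != nil {
http.Error(w, "order not accepted", http.StatusServiceUnavailable)
}
})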
// When the server shuts down:
ch.Close()
Generate vs Channel[T]
Both Generate and Channel[T] bridge external code into a pipeline, but they suit different producer shapes.
| Aspect | Generate | Channel[T] |
|---|---|---|
| Producer shape | A loop you write inside fn | External goroutines that already exist |
| Control flow | The pipeline drives the loop; fn runs on the pipeline goroutine | External code drives Send; the pipeline only consumes |
| Backpressure | yield blocks when downstream is full | Send blocks when the buffer is full; TrySend returns false |
| Cancellation | ctx passed to fn is cancelled when the pipeline shuts down, so blocking I/O is interrupted automatically | External producers must observe their own context; Close() signals the pipeline to drain |
| Lifecycle | Returning from fn closes the source | Close() (idempotent) closes the source |
| Concurrency | Single goroutine (the pipeline runs fn once) | Safe for concurrent Send/TrySend from many goroutines |
Choose Generate when the producer is a loop you can express inline: paginated REST APIs, database cursors, polling a queue, walking a filesystem. The pipeline owns the loop and shuts it down cleanly via context cancellation.
Choose Channel[T] when items arrive from goroutines the pipeline does not own: HTTP handlers, gRPC stream handlers, library callbacks, fan-in from multiple producer goroutines. Producers stay decoupled from kitsune internals and only need a Send call.
If you find yourself starting a goroutine inside Generate just to call yield, use Channel[T] instead. If you find yourself wrapping Channel[T] in a single-goroutine loop, use Generate instead.
Ticker
Emits the current time.Time at regular intervals. The first tick fires after d. The pipeline runs indefinitely until the context is cancelled or a downstream operator (like Take) stops it.
When to use: Periodic polling, heartbeats, scheduled work triggered by time.
Options: WithClock, WithName.
Timer
Emits exactly one value after delay by calling fn, then closes. If the context is cancelled before delay elapses, no value is emitted.
When to use: Deferred notifications, one-shot scheduled events, timeouts that produce a sentinel value.
Options: WithClock, WithName.
// Emit a "ping" string after 5 seconds.
ping := kitsune.Timer(5*time.Second, func() string { return "ping" })
Unfold
Generates a stream by repeatedly applying fn to an accumulator. fn receives the current state and returns (value, nextState, stop). When stop is true, the stream ends without emitting the value.
When to use: Sequences derived from recurrences (Fibonacci, geometric series, tree traversals) where each step depends on the last.
Options: none.
// Fibonacci sequence
fib := kitsune.Unfold([2]int{0, 1}, func(s [2]int) (int, [2]int, bool) {
return s[0], [2]int{s[1], s[0] + s[1]}, false
}).Take(8)
// emits: 0, 1, 1, 2, 3, 5, 8, 13
Iterate
Creates an infinite stream starting with seed where each subsequent value is produced by applying fn to the previous one. Use Take or TakeWhile to bound it.
Options: none.
Repeatedly
Creates an infinite stream by calling fn on each iteration. Use Take or TakeWhile to bound it.
When to use: Sampling a random number generator, reading from a sensor, generating UUIDs.
Options: none.
Cycle
Creates an infinite stream that repeatedly loops over items. Panics if items is empty.
Options: none.
Empty
Returns a Pipeline that completes immediately with no items. Useful as an identity element in pipeline composition and as a base case in tests.
Merge(Empty[T](), p) behaves identically to p for any pipeline p. Amb(Empty[T](), p) is not the same: Amb forwards whichever emits first, and Empty completes before emitting, so the winner is p.
Options: none.
// Use as a no-op source in conditional pipelines:
var src *kitsune.Pipeline[Event]
if cond {
src = realSource()
} else {
src = kitsune.Empty[Event]()
}
See also: Never (blocks indefinitely), FromSlice with a nil slice.
Never
Returns a Pipeline that never emits any items and never completes until the context is cancelled. It acts as the identity element for Amb: Amb(Never[T](), p) always forwards from p.
Useful as a placeholder in tests that assert on other branches, or to keep a merged pipeline alive while other branches are active.
Options: none.
// In a test: ensure the other branch wins the race.
winner := kitsune.Amb(
func() *kitsune.Pipeline[int] { return kitsune.Never[int]() },
func() *kitsune.Pipeline[int] { return kitsune.FromSlice([]int{1, 2, 3}) },
)
See also: Empty (completes immediately), Amb.
Concat
Runs each pipeline factory in order: all items from factory[0] are emitted before factory[1] starts. Factories are called lazily; each one is invoked only after the previous pipeline has fully completed. Accepts factories (not *Pipeline values directly) so each run creates a fresh pipeline graph.
When to use: Sequential sources that must not overlap, such as first emitting a header, then body rows, then a footer.
Options: none.
kitsune.Concat(
func() *kitsune.Pipeline[int] { return kitsune.FromSlice([]int{1, 2}) },
func() *kitsune.Pipeline[int] { return kitsune.FromSlice([]int{3, 4}) },
)
// emits: 1, 2, 3, 4
Amb
Subscribes to all factories concurrently and forwards items exclusively from whichever factory emits first, immediately cancelling all others. If no factory emits before the context is cancelled, the pipeline produces no items.
When to use: Hedged requests: try a primary and a replica simultaneously, use whichever responds first.
Options: none.
result := kitsune.Amb(
func() *kitsune.Pipeline[Result] { return fetchFromPrimary(ctx) },
func() *kitsune.Pipeline[Result] { return fetchFromReplica(ctx) },
)
Catch
Runs p normally. If p returns a non-nil, non-context error, fn is called with the error and the returned fallback pipeline is subscribed; items already emitted by p are kept and the fallback's items follow. If p completes without error, the fallback is never started.
When to use: Streaming fallback to a secondary source when the primary fails.
Options: none.
kitsune.Catch(primaryFeed, func(err error) *kitsune.Pipeline[Event] {
log.Printf("primary failed (%v); switching to backup", err)
return backupFeed()
})
Retry
Re-runs the entire pipeline p from scratch whenever it errors, according to pol. This is the right primitive for sources that must reconnect on failure: websocket tails, change-data-capture streams, long-poll HTTP. The correct response to a disconnect is to re-establish the connection and resume.
Items produced during any attempt (including partial output from a failed attempt) are forwarded downstream immediately; Retry does not buffer or replay. Downstream observes the concatenation of each attempt's output.
When to use: Persistent reconnect-on-drop semantics for a source pipeline. For per-item retries of a transformation function, use OnError(RetryMax(...)) instead.
Options: none. Configure behaviour via the RetryPolicy argument.
RetryPolicy fields:
| Field | Type | Description |
|---|---|---|
| MaxAttempts | int | Total runs allowed including the first. ≤ 0 means unlimited. |
| Backoff | Backoff | Wait duration before the Nth retry (0-indexed). nil = no delay. |
| Retryable | func(error) bool | Which errors are eligible for retry. nil = all non-context errors. |
| OnRetry | func(attempt int, err error, wait time.Duration) | Called before each sleep; useful for logging. |
Convenience constructors:
kitsune.RetryUpTo(n, backoff) // at most n total attempts
kitsune.RetryForever(backoff) // retry until context cancellation
// Reconnecting websocket tail: retry forever with exponential backoff.
kitsune.Retry(
kitsune.Generate(websocketTail),
kitsune.RetryForever(kitsune.ExponentialBackoff(100*time.Millisecond, 30*time.Second)),
)
// Retry up to 5 times, only on transient network errors.
kitsune.Retry(
primaryFeed,
kitsune.RetryUpTo(5, kitsune.FixedBackoff(time.Second)).
WithRetryable(func(err error) bool { return errors.Is(err, io.ErrUnexpectedEOF) }).
WithOnRetry(func(attempt int, err error, _ time.Duration) {
log.Printf("retry %d after: %v", attempt+1, err)
}),
)
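The "no buffering, no replay" behaviour is easiest to see in a synchronous model. The sketch below is a hypothetical illustration, not kitsune's Retry: `policy` and `runWithRetry` are invented names, the policy fields only mirror the table above, and context handling is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// policy mirrors three of the documented RetryPolicy fields (hypothetical type).
type policy struct {
	MaxAttempts int                           // total runs including the first; <= 0 means unlimited
	Backoff     func(retry int) time.Duration // nil means no delay
	Retryable   func(error) bool              // nil means every error is retryable
}

// runWithRetry re-runs attempt until it succeeds, the error is not retryable,
// or MaxAttempts runs have been used. Items go straight to emit, so partial
// output from failed attempts is kept.
func runWithRetry[T any](attempt func(run int, emit func(T)) error, pol policy, emit func(T)) error {
	for run := 0; ; run++ {
		err := attempt(run, emit)
		if err == nil {
			return nil
		}
		if pol.Retryable != nil && !pol.Retryable(err) {
			return err
		}
		if pol.MaxAttempts > 0 && run+1 >= pol.MaxAttempts {
			return err
		}
		if pol.Backoff != nil {
			time.Sleep(pol.Backoff(run))
		}
	}
}

var errDropped = errors.New("connection dropped")

func main() {
	var got []string
	err := runWithRetry(func(run int, emit func(string)) error {
		emit(fmt.Sprintf("run%d-a", run))
		if run < 2 {
			return errDropped // the first two runs fail after one item
		}
		emit(fmt.Sprintf("run%d-b", run))
		return nil
	}, policy{MaxAttempts: 5}, func(s string) { got = append(got, s) })
	fmt.Println(got, err) // [run0-a run1-a run2-a run2-b] <nil>
}
```

The output contains one item from each failed run followed by the full output of the successful run, so a downstream stage that must not see duplicates needs its own deduplication.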
Using
func Using[T, R any](
acquire func(context.Context) (R, error),
build func(R) *Pipeline[T],
release func(R),
) *Pipeline[T]
Acquires a resource, builds a pipeline from it, and guarantees release is called exactly once when the pipeline exits, regardless of success, error, or cancellation. If acquire returns an error, no items are emitted and release is not called.
When to use: Database connections, file handles, or any resource that must be explicitly released when the pipeline finishes.
Options: none.
p := kitsune.Using(
func(ctx context.Context) (*sql.Rows, error) {
return db.QueryContext(ctx, "SELECT id, name FROM users")
},
func(rows *sql.Rows) *kitsune.Pipeline[User] {
return kitsune.Generate(func(ctx context.Context, yield func(User) bool) error {
for rows.Next() {
var u User
if err := rows.Scan(&u.ID, &u.Name); err != nil {
return err
}
if !yield(u) {
return nil
}
}
return rows.Err()
})
},
func(rows *sql.Rows) { rows.Close() },
)
1:1 Transforms
Each item in produces exactly one item out, potentially of a different type.
Map
func Map[I, O any](p *Pipeline[I], fn func(context.Context, I) (O, error), opts ...StageOption) *Pipeline[O]
Applies fn to each item, potentially changing the element type from I to O. The context passed to fn is cancelled if the pipeline shuts down; always use it for any I/O calls inside fn.
With Concurrency(n) > 1, fn is called from n goroutines in parallel. Results arrive in completion order unless you add Ordered(). The engine has a serial fast path that activates when concurrency is 1, there is no error handler override, and no hook is set; this path keeps per-item overhead to a minimum.
When to use: Any 1:1 transformation: JSON parsing, struct conversion, external API calls.
When not to use: When you need to emit zero or multiple outputs per item; use Filter or FlatMap.
Options: Concurrency, Ordered, OnError, Buffer, Overflow, WithName, Timeout, Supervise, CacheBy.
// Parse log lines concurrently, preserving order.
parsed := kitsune.Map(lines, func(ctx context.Context, line string) (LogEntry, error) {
return parseLine(line)
}, kitsune.Concurrency(8), kitsune.Ordered())
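One way to picture what Ordered() has to guarantee is a worker pool that writes each result into the slot of its input index. This standalone sketch is an assumption about the general technique, not a description of kitsune's engine; `orderedMap` is a hypothetical helper working on slices.

```go
package main

import (
	"fmt"
	"sync"
)

// orderedMap applies fn to every input using up to workers goroutines and
// returns the results in input order, regardless of completion order.
func orderedMap[I, O any](in []I, workers int, fn func(I) O) []O {
	if workers < 1 {
		workers = 1
	}
	out := make([]O, len(in))
	jobs := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				out[i] = fn(in[i]) // each index is written by exactly one worker
			}
		}()
	}
	for i := range in {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
	return out
}

func main() {
	lengths := orderedMap([]string{"a", "bbb", "cc"}, 8, func(s string) int { return len(s) })
	fmt.Println(lengths) // [1 3 2]
}
```

Without the index-addressed output slice, results would be appended in whatever order the workers finish, which corresponds to the default unordered behaviour.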
MapRecover
func MapRecover[I, O any](
p *Pipeline[I],
fn func(context.Context, I) (O, error),
recover func(context.Context, I, error) O,
opts ...StageOption,
) *Pipeline[O]
Applies fn to each item. If fn returns an error or panics, recover is called with the original item and the error to produce a fallback output value. The output pipeline always emits exactly one item per input; no items are dropped and the pipeline never halts on a per-item failure.
When to use: When you want a guaranteed-1:1 transform that substitutes a sentinel or default on failure, rather than propagating the error or dropping the item.
Options: Buffer, Timeout, WithName.
enriched := kitsune.MapRecover(events,
func(ctx context.Context, e Event) (EnrichedEvent, error) {
return enrich(ctx, e)
},
func(ctx context.Context, e Event, err error) EnrichedEvent {
log.Printf("enrich failed for %v: %v", e.ID, err)
return EnrichedEvent{Event: e, Enriched: false}
},
)
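Treating a panic the same way as a returned error usually means converting the panic into an error with a deferred recover. The sketch below shows that pattern on plain slices; `safeCall` and `mapRecover` are hypothetical helpers, and this is not claimed to be how kitsune implements MapRecover.

```go
package main

import (
	"errors"
	"fmt"
)

// safeCall runs fn and converts a panic into an error so both failure
// modes reach the same fallback.
func safeCall[I, O any](fn func(I) (O, error), in I) (out O, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
	}()
	return fn(in)
}

// mapRecover emits exactly one output per input.
func mapRecover[I, O any](in []I, fn func(I) (O, error), fallback func(I, error) O) []O {
	out := make([]O, 0, len(in))
	for _, v := range in {
		res, err := safeCall(fn, v)
		if err != nil {
			res = fallback(v, err)
		}
		out = append(out, res)
	}
	return out
}

func main() {
	got := mapRecover([]int{4, 0, -1},
		func(n int) (int, error) {
			if n < 0 {
				return 0, errors.New("negative input")
			}
			return 100 / n, nil // panics when n == 0
		},
		func(int, error) int { return -1 },
	)
	fmt.Println(got) // [25 -1 -1]
}
```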
MapResult
func MapResult[I, O any](
p *Pipeline[I],
fn func(context.Context, I) (O, error),
opts ...StageOption,
) (*Pipeline[O], *Pipeline[ErrItem[I]])
Applies fn to each item and routes by outcome: successful outputs go to the first (ok) pipeline; failures go to the second (failed) pipeline as ErrItem[I] values containing the original input and the error. The pipeline never halts. Both output pipelines must be consumed.
ErrItem[I] pairs the original input (Item) with the error that fn returned (Err).
Unlike OnError(Skip()), failed items are not silently discarded; they are available for inspection, logging, or dead-letter storage. MapResult does not retry; wrap fn with your own retry loop if you need transient-failure handling before routing to the dead-letter branch.
When to use: When you want to process failures separately without halting the pipeline: audit trails, error reporting pipelines, reprocessing queues.
Options: Buffer, Timeout, WithName.
ok, failed := kitsune.MapResult(records, func(ctx context.Context, r Record) (Stored, error) {
return db.Store(ctx, r)
})
runner, _ := kitsune.MergeRunners(
ok.ForEach(func(_ context.Context, s Stored) error {
metrics.Inc("stored")
return nil
}),
failed.ForEach(func(_ context.Context, e kitsune.ErrItem[Record]) error {
log.Printf("failed to store %v: %v", e.Item.ID, e.Err)
return nil
}),
)
runner.Run(ctx)
Timestamp
Tags each item with the wall-clock time it was observed at this stage. Emits Timestamped[T]{Value T; Time time.Time}.
Options: WithClock, WithName, Buffer.
TimeInterval
Tags each item with the elapsed duration since the previous item. The first item always has Elapsed == 0. Emits TimedInterval[T]{Value T; Elapsed time.Duration}. Always runs at Concurrency(1).
Options: WithClock, WithName, Buffer.
intervals := kitsune.TimeInterval(events)
// use intervals to detect stalls: if Elapsed > threshold, alert
StartWith
Prepends one or more items before the first item from p. The prefix is always emitted in full before p begins.
Options: none.
DefaultIfEmpty
Forwards all items from p unchanged. If p completes without emitting any items, emits defaultVal once.
Options: Buffer, WithName.
Intersperse
Inserts sep between consecutive items. The separator is never emitted at the start or end of the stream.
Options: Buffer, WithName.
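The "never at the start or end" rule amounts to emitting the separator before every item except the first. A slice-based sketch of that rule, with `intersperse` as a hypothetical helper rather than the kitsune operator:

```go
package main

import "fmt"

// intersperse places sep between consecutive items only.
func intersperse[T any](in []T, sep T) []T {
	var out []T
	for i, v := range in {
		if i > 0 {
			out = append(out, sep) // separator precedes every item but the first
		}
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(intersperse([]string{"a", "b", "c"}, ",")) // [a , b , c]
}
```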
LiftPure / LiftFallible
func LiftPure[I, O any](fn func(I) O) func(context.Context, I) (O, error)
func LiftFallible[I, O any](fn func(I) (O, error)) func(context.Context, I) (O, error)
Adapter helpers that wrap a context-free function, either infallible (LiftPure) or fallible (LiftFallible), into the signature expected by Map, Filter, etc.
Materialize / Dematerialize
func Materialize[T any](p *Pipeline[T]) *Pipeline[Notification[T]]
func Dematerialize[T any](p *Pipeline[Notification[T]], opts ...StageOption) *Pipeline[T]
Materialize converts each item and the terminal outcome of p into a Notification[T] value emitted on a single output pipeline. Dematerialize is the inverse: it unwraps a Notification[T] stream back into a plain T stream.
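Notification[T] is a sum type with three cases: a value (carried in Value), a terminal error (carried in Err), or normal completion.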
Helper constructors and predicates are provided:
| Constructor | IsValue() | IsError() | IsComplete() |
|---|---|---|---|
| NextNotification(v) | ✓ | | |
| ErrorNotification(err) | | ✓ | |
| CompleteNotification[T]() | | | ✓ |
Materialize emission rules:
| Upstream event | Emitted notification | Run result |
|---|---|---|
| Item v | NextNotification(v) | n/a |
| Normal completion | CompleteNotification[T]() | nil |
| Pipeline error err | ErrorNotification[T](err) | nil |
| Context cancellation | (none) | ctx.Err() |
The key property: Materialize never propagates pipeline errors. They are encoded as the final notification instead, so downstream operators continue running. Context cancellation is not materialized; it exits the run immediately.
Dematerialize processing rules:
| Notification | Action |
|---|---|
| IsValue() | Emit n.Value downstream |
| IsComplete() | Complete normally |
| IsError() | Re-inject n.Err as a pipeline error |
| Upstream closed without terminal | Complete normally (defensive) |
When to use: When you need to pass error events through operators that only handle T; for example, routing, logging, or filtering errors without halting the pipeline. The standard pattern:
// Classify notifications by outcome without halting on errors.
classified := kitsune.Map(
kitsune.Materialize(src),
func(_ context.Context, n kitsune.Notification[Event]) (TaggedEvent, error) {
if n.IsError() {
return TaggedEvent{Err: n.Err, Source: "pipeline"}, nil
}
if n.IsComplete() {
return TaggedEvent{Done: true}, nil
}
return TaggedEvent{Event: n.Value}, nil
},
)
Options (Dematerialize only): Buffer, Overflow, WithName.
Options (Materialize): none; it wraps the upstream run internally and cannot accept per-stage options.
See also: MapResult (routes errors to a separate pipeline without materialization), TapError (side-effect on terminal error), Catch (fallback pipeline on error).
1:N Expansion
These operators allow each input item to produce zero or more output items.
FlatMap
func FlatMap[I, O any](
p *Pipeline[I],
fn func(context.Context, I, func(O) error) error,
opts ...StageOption,
) *Pipeline[O]
Transforms each input into zero or more outputs. fn calls yield(item) for each output it wants to emit; yield returns an error if the pipeline is shutting down. With Concurrency(n) > 1, multiple input items are processed in parallel. With Ordered(), all outputs from item i are emitted before any outputs from item i+1.
When to use: Expanding records into sub-records, fetching related items per input, or flattening nested structures.
Options: Concurrency, Ordered, OnError, Buffer, Overflow, WithName, Timeout, Supervise.
// Expand each user into their orders.
orders := kitsune.FlatMap(users,
func(ctx context.Context, u User, yield func(Order) error) error {
orders, err := db.OrdersForUser(ctx, u.ID)
if err != nil {
return err
}
for _, o := range orders {
if err := yield(o); err != nil {
return err
}
}
return nil
}, kitsune.Concurrency(4))
ConcatMap
func ConcatMap[I, O any](
p *Pipeline[I],
fn func(context.Context, I, func(O) error) error,
opts ...StageOption,
) *Pipeline[O]
Like FlatMap but always processes items sequentially; the next item starts only after the current item's inner pipeline has fully emitted. Output order is fully preserved. This is equivalent to FlatMap with Concurrency(1), but the intent is made explicit.
When to use: When you need strictly ordered output or when sub-streams have side effects that must not overlap.
Options: OnError, Buffer, Overflow, WithName, Timeout, Supervise.
ConcatMap is always serial. Passing Concurrency(n) with n > 1 panics at pipeline construction time; use FlatMap with Concurrency(n) if you want parallel fan-out.
// Each file is processed completely before the next starts.
lines := kitsune.ConcatMap(filePaths,
func(ctx context.Context, path string, yield func(string) error) error {
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
if err := yield(scanner.Text()); err != nil {
return err
}
}
return scanner.Err()
})
SwitchMap
func SwitchMap[I, O any](
p *Pipeline[I],
fn func(context.Context, I, func(O) error) error,
opts ...StageOption,
) *Pipeline[O]
Transforms each input into a sub-stream. When a new input arrives, the currently running sub-stream is cancelled immediately and the new one begins. Only the most-recently-started sub-stream's outputs reach downstream; older sub-streams are abandoned even if they have not finished emitting.
When to use: Typeahead search (cancel the previous request when the user types again) and other "latest wins" streams where only the most recent input matters.
When not to use: When you need all outputs from all inputs, or when sub-streams have committed side effects that cannot be safely cancelled mid-flight.
Options: OnError, Buffer, Overflow, WithName, Timeout, Supervise.
// For each search query, fetch results; cancel old search when new query arrives.
results := kitsune.SwitchMap(queries,
func(ctx context.Context, q string, yield func(Result) error) error {
hits, err := search.Query(ctx, q)
if err != nil {
return err
}
for _, h := range hits {
if err := yield(h); err != nil {
return err
}
}
return nil
})
ExhaustMap
func ExhaustMap[I, O any](
p *Pipeline[I],
fn func(context.Context, I, func(O) error) error,
opts ...StageOption,
) *Pipeline[O]
Transforms each input into a sub-stream, but while a sub-stream is in progress, new input items are silently dropped. Only when the current sub-stream finishes is the next item accepted. The opposite of SwitchMap, using "first wins" rather than "latest wins".
When to use: Rate-limited refresh operations where you want to ignore duplicate triggers while one is already running, e.g., a cache refresh that should not run concurrently with itself.
Options: OnError, Buffer, Overflow, WithName, Timeout, Supervise.
// Refresh cache on signal; ignore duplicate signals while refresh is in progress.
refreshed := kitsune.ExhaustMap(signals,
func(ctx context.Context, _ Signal, yield func(CacheSnapshot) error) error {
snap, err := rebuildCache(ctx)
if err != nil {
return err
}
return yield(snap)
})
ExpandMap
func ExpandMap[T any](
p *Pipeline[T],
fn func(context.Context, T) *Pipeline[T],
opts ...StageOption,
) *Pipeline[T]
BFS graph expansion. For each item, fn returns a child pipeline (or nil for leaf items). Items are emitted in BFS order. Use VisitedBy to detect and skip cycles.
When to use: Tree or DAG traversal where each node can produce more nodes of the same type: directory trees, dependency graphs, org charts.
Options: Buffer, WithName, MaxDepth, MaxItems, VisitedBy (for cycle detection), WithDedupSet.
Warning: unbounded by default. Without MaxDepth, MaxItems, or a downstream Take(n), ExpandMap will traverse the entire reachable graph. A graph with branching factor fan and depth d produces up to fan^d items, which can exhaust memory silently as the BFS queue grows. Always bound expansion on untrusted or potentially deep inputs.
// Crawl a directory tree.
files := kitsune.ExpandMap(
kitsune.FromSlice([]string{"/root"}),
func(ctx context.Context, path string) *kitsune.Pipeline[string] {
entries, err := os.ReadDir(path)
if err != nil {
return nil
}
var children []string
for _, e := range entries {
if e.IsDir() {
children = append(children, filepath.Join(path, e.Name()))
}
}
return kitsune.FromSlice(children)
},
)
Bounded expansion, capping both depth and total entries:
// Walk at most 4 levels deep and at most 10 000 entries total.
files := kitsune.ExpandMap(
kitsune.FromSlice([]string{"/root"}),
func(ctx context.Context, path string) *kitsune.Pipeline[string] {
entries, err := os.ReadDir(path)
if err != nil {
return nil
}
var children []string
for _, e := range entries {
if e.IsDir() {
children = append(children, filepath.Join(path, e.Name()))
}
}
return kitsune.FromSlice(children)
},
kitsune.MaxDepth(4),
kitsune.MaxItems(10_000),
)
When either bound is reached the stage stops enqueueing children and closes its output channel normally; no error is returned, matching the semantics of Take(n). If both options are set, whichever limit fires first wins.
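The bounding behaviour described above can be sketched in plain Go, without kitsune. This is an illustration of bounded BFS only, not the library's implementation: `expandBFS` and its parameters are hypothetical names, and the sketch assumes that roots sit at depth 0 and that an in-memory visited set plays the role of VisitedBy.

```go
package main

import "fmt"

// expandBFS walks a graph breadth-first from roots, emitting each node once.
// children returns the neighbours of a node (nil for leaves).
// Assumptions of this sketch: roots are at depth 0, nodes at depth maxDepth
// are emitted but not expanded, and maxItems caps the total emitted nodes.
func expandBFS(roots []string, children func(string) []string, maxDepth, maxItems int) []string {
	type entry struct {
		node  string
		depth int
	}
	visited := make(map[string]bool)
	var queue []entry
	for _, r := range roots {
		if !visited[r] {
			visited[r] = true
			queue = append(queue, entry{r, 0})
		}
	}
	var out []string
	for len(queue) > 0 && len(out) < maxItems {
		cur := queue[0]
		queue = queue[1:]
		out = append(out, cur.node)
		if cur.depth >= maxDepth {
			continue // depth bound reached: do not enqueue children
		}
		for _, c := range children(cur.node) {
			if !visited[c] { // skip nodes already seen, which breaks cycles
				visited[c] = true
				queue = append(queue, entry{c, cur.depth + 1})
			}
		}
	}
	return out // stopping at a bound is a normal completion, not an error
}

func main() {
	graph := map[string][]string{
		"a": {"b", "c"},
		"b": {"d"},
		"c": {"d", "a"}, // back-edge to "a" forms a cycle
		"d": {"e"},
	}
	children := func(n string) []string { return graph[n] }
	fmt.Println(expandBFS([]string{"a"}, children, 2, 100)) // [a b c d]
	fmt.Println(expandBFS([]string{"a"}, children, 10, 3))  // [a b c]
}
```

In the first call the depth bound stops expansion before "e"; in the second the item cap fires first. Either way the walk ends without an error, which mirrors the Take(n)-like semantics described above.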
Pairwise
Emits overlapping consecutive pairs: {item[0], item[1]}, {item[1], item[2]}, {item[2], item[3]}, …. The first item is held internally; no pair is emitted until the second item arrives. A stream of n items produces n-1 pairs.
When to use: Computing deltas between consecutive values, detecting direction changes, change detection.
Options: Buffer, WithName.
deltas := kitsune.Map(
kitsune.Pairwise(prices),
kitsune.LiftPure(func(c kitsune.Consecutive[float64]) float64 {
return c.Curr - c.Prev
}),
)
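To make the n-1 pairing rule concrete, here is a small plain-Go sketch over a slice. It does not use kitsune; `pair` and `pairwise` are hypothetical names that mirror the Prev/Curr shape used in the example above.

```go
package main

import "fmt"

// pair mirrors the Prev/Curr fields used by the Pairwise example.
type pair[T any] struct{ Prev, Curr T }

// pairwise returns overlapping consecutive pairs. The first item is only
// ever a Prev, so n items yield n-1 pairs and a single item yields none.
func pairwise[T any](items []T) []pair[T] {
	var out []pair[T]
	for i := 1; i < len(items); i++ {
		out = append(out, pair[T]{Prev: items[i-1], Curr: items[i]})
	}
	return out
}

func main() {
	for _, p := range pairwise([]float64{10, 12, 11.5}) {
		fmt.Println(p.Curr - p.Prev) // prints 2, then -0.5
	}
}
```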
Unbatch
Flattens a pipeline of slices into a pipeline of individual items. This is the inverse of Batch.
When to use: When you receive data in batches (bulk API response, database rows) and want to process items individually downstream.
Options: Buffer, WithName.
// API returns batches; process each item individually.
items := kitsune.Unbatch(kitsune.Map(pages, fetchPage))
Filtering & Selection
Filter
func Filter[T any](p *Pipeline[T], pred func(context.Context, T) (bool, error), opts ...StageOption) *Pipeline[T]
Emits only items for which pred returns true. Items for which pred returns false are silently dropped. Items for which pred returns an error halt the pipeline (unless OnError is set).
The method form on *Pipeline[T] accepts a simpler func(T) bool.
Options (free function): Buffer, Overflow, WithName. OnError applies if the predicate can return errors.
// Free function with context-aware predicate:
active := kitsune.Filter(users, func(ctx context.Context, u User) (bool, error) {
return subscriptionDB.IsActive(ctx, u.ID)
})
// Method form with simple predicate:
adults := users.Filter(func(u User) bool { return u.Age >= 18 })
IgnoreElements
Drains p for its side effects and emits nothing downstream. The returned pipeline completes (or errors) when p completes (or errors). Any Tap, Map, or other side-effecting operators in p still run.
Also available as a method: p.IgnoreElements().
When to use: You want a pipeline to run for its side effects (writes, metrics, logging) without forwarding any items to downstream consumers.
Options: none.
// Run a Tap-instrumented pipeline for its side effects only:
kitsune.IgnoreElements(
kitsune.Tap(events, func(_ context.Context, e Event) error {
metrics.Record(e)
return nil
}),
).Run(ctx)
// Method form:
events.Tap(metrics.Record).IgnoreElements().Run(ctx)
See also: ForEach (terminal; use when you own the run), Filter (keeps matching items).
Reject
func Reject[T any](p *Pipeline[T], pred func(context.Context, T) (bool, error), opts ...StageOption) *Pipeline[T]
The inverse of Filter. Emits only items for which pred returns false (discards items where pred is true). Also available as a method with func(T) bool.
Take
Emits the first n items and then stops the pipeline, signalling upstream sources to stop producing. Infinite sources like Ticker and Repeatedly stop cleanly when Take closes.
Also available as p.Take(n).
Options: none.
Drop
Discards the first n items, then forwards all subsequent items unchanged.
Also available as p.Drop(n) and p.Skip(n) (alias).
Options: none.
TakeWhile
Emits items as long as pred returns true. As soon as pred returns false, the pipeline stops; the item that failed the predicate is not emitted. Unlike Filter, which drops individual items, TakeWhile terminates the pipeline.
Options: none.
// Stop reading when we reach a sentinel record.
data := kitsune.TakeWhile(records, func(r Record) bool { return r.Type != "EOF" })
DropWhile
Discards items as long as pred returns true. The first item for which pred returns false (and all subsequent items) are forwarded.
Options: none.
// Skip header lines starting with '#'.
lines := kitsune.DropWhile(rawLines, func(s string) bool { return strings.HasPrefix(s, "#") })
TakeEvery
Emits every nth item starting with the first (index 0). Items at indices 1, 2, …, n-1 are dropped; the item at index n is emitted; and so on. Panics if n <= 0.
Options: none.
DropEvery
Drops every nth item (indices 0, n, 2n, …), forwarding all others. Panics if n <= 0.
Options: none.
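The index rules for TakeEvery and DropEvery are complementary, which a plain-Go sketch over a slice makes easy to see. The helper names `takeEvery` and `dropEvery` are hypothetical and do not call kitsune.

```go
package main

import "fmt"

// takeEvery keeps the items at indices 0, n, 2n, ...
func takeEvery[T any](items []T, n int) []T {
	if n <= 0 {
		panic("n must be > 0")
	}
	var out []T
	for i, v := range items {
		if i%n == 0 {
			out = append(out, v)
		}
	}
	return out
}

// dropEvery removes the items at indices 0, n, 2n, ... and keeps the rest.
func dropEvery[T any](items []T, n int) []T {
	if n <= 0 {
		panic("n must be > 0")
	}
	var out []T
	for i, v := range items {
		if i%n != 0 {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	items := []int{0, 1, 2, 3, 4, 5, 6}
	fmt.Println(takeEvery(items, 3)) // [0 3 6]
	fmt.Println(dropEvery(items, 3)) // [1 2 4 5]
}
```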
Distinct
Emits only items that have not been seen before in the entire stream, using == equality. Keeps an in-memory set of all seen values; memory usage grows with the number of unique items.
Options: Buffer, WithName.
DistinctBy
func DistinctBy[T any, K comparable](p *Pipeline[T], keyFn func(T) K, opts ...StageOption) *Pipeline[T]
Like Distinct but uses keyFn to derive the comparison key, allowing deduplication of complex types by a single field.
Options: Buffer, WithName.
// Deduplicate events by their ID field.
unique := kitsune.DistinctBy(events, func(e Event) string { return e.ID })
Dedupe / DedupeBy
func Dedupe[T comparable](p *Pipeline[T], opts ...StageOption) *Pipeline[T]
func DedupeBy[T any, K comparable](p *Pipeline[T], keyFn func(T) K, opts ...StageOption) *Pipeline[T]
Suppresses duplicate items. By default (without DedupeWindow), deduplication is global: any item whose key was already seen anywhere in the stream is dropped. Dedupe uses == equality; DedupeBy uses a key function.
Use DedupeWindow(n) to change the deduplication scope:
- DedupeWindow(0): global deduplication (default). All previously-seen keys are remembered.
- DedupeWindow(1): consecutive deduplication only. An item is dropped only if it is identical to the immediately preceding item. This is the classic "remove adjacent duplicates" behaviour.
- DedupeWindow(n) where n > 1: sliding-window deduplication. An item is dropped if it appeared in the last n items.
Use WithDedupSet to provide a custom backend (Redis, Bloom filter) for global deduplication instead of the default in-process MemoryDedupSet.
Options: DedupeWindow, WithDedupSet, Buffer, WithName.
// Global dedup (default): drop any item seen previously in the stream.
unique := kitsune.DedupeBy(events, func(e Event) string { return e.ID })
// Consecutive-only: suppress adjacent duplicates.
changes := kitsune.DedupeBy(statusUpdates, func(s Status) string { return s.State },
kitsune.DedupeWindow(1),
)
// Sliding window: drop any item seen in the last 50 items.
recent := kitsune.Dedupe(values, kitsune.DedupeWindow(50))
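The three window scopes can be compared side by side with a plain-Go sketch. `dedupeWindow` is a hypothetical helper, not the library's code, and it makes one assumption the text does not settle: for n >= 1 the window holds the last n input items, whether or not they were emitted.

```go
package main

import "fmt"

// dedupeWindow drops duplicates according to the window scope:
// window == 0 remembers every value; window >= 1 compares against the
// last `window` input items (an assumption of this sketch).
func dedupeWindow[T comparable](items []T, window int) []T {
	var out []T
	if window == 0 {
		seen := make(map[T]struct{})
		for _, v := range items {
			if _, dup := seen[v]; dup {
				continue
			}
			seen[v] = struct{}{}
			out = append(out, v)
		}
		return out
	}
	recent := make([]T, 0, window)
	for _, v := range items {
		dup := false
		for _, r := range recent {
			if r == v {
				dup = true
				break
			}
		}
		if !dup {
			out = append(out, v)
		}
		recent = append(recent, v)
		if len(recent) > window {
			recent = recent[1:] // forget the oldest item
		}
	}
	return out
}

func main() {
	in := []string{"a", "a", "b", "a", "c", "b"}
	fmt.Println(dedupeWindow(in, 0)) // [a b c]
	fmt.Println(dedupeWindow(in, 1)) // [a b a c b]
	fmt.Println(dedupeWindow(in, 2)) // [a b c b]
}
```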
DedupSet backends
Dedupe, DedupeBy, and ExpandMap all accept a WithDedupSet(backend) option to override their default in-process deduplication store. Three built-in backends are provided:
MemoryDedupSet
Unbounded in-memory set. Never evicts. Suitable for finite streams or when the key space is bounded. The default for all dedup operators.
BloomDedupSet
Bounded probabilistic set backed by a Bloom filter. Memory usage is fixed regardless of key-space size; items are never missed (zero false-negative rate), but a configured false-positive rate allows a small fraction of unseen keys to appear seen. Panics if expectedItems <= 0 or falsePositiveRate is not in (0, 1).
When to use: when the key space is unbounded but bounded memory is required and occasional false positives are acceptable (e.g. spam suppression, cache-miss avoidance).
TTLDedupSet
In-process deduplication set that forgets keys ttl after they were last added. Memory is bounded by the set of currently non-expired keys. Eviction is lazy: expired entries are purged on the next Contains or Add call; there is no background goroutine. Re-adding an existing key refreshes its expiry (touch semantics). Panics if ttl <= 0.
When to use: deduplicating a high-volume event stream over a sliding time window (e.g. suppress duplicate webhooks received within the last 5 minutes) without unbounded memory growth. Prefer over MemoryDedupSet when keys must be forgotten; prefer over BloomDedupSet when zero false positives are required.
// Suppress duplicate webhook deliveries within a 5-minute window.
set := kitsune.TTLDedupSet(5 * time.Minute)
unique := kitsune.DedupeBy(events, func(e Event) string { return e.ID },
kitsune.WithDedupSet(set),
)
RandomSample
Passes each item downstream with probability rate (in the range [0.0, 1.0]). Each item is evaluated independently; decisions are stateless. A rate of 1.0 passes all items; 0.0 drops all items.
When to use: Streaming telemetry sampling, load-shedding under high throughput, approximate analysis where a random subset is sufficient.
Comparison with TakeRandom: RandomSample is a streaming operator; it passes through items as they arrive and requires no buffering. TakeRandom buffers the entire stream and returns a fixed-size slice using reservoir sampling; use it when you need a precise count from a finite stream.
Options: Buffer, WithName.
// Forward roughly 5% of events to a debug trace topic.
sample := kitsune.RandomSample(events, 0.05)
Stateful Transforms
These operators inject a *Ref[S], a concurrent-safe state handle, into the stage function, enabling accumulators, counters, and running state across items.
Key / NewKey / Ref
Declares a typed, run-scoped state key. Declare keys as package-level variables. The initial value is used at the start of each runner.Run. Use StateTTL(d) to expire state after a period of inactivity.
A Ref[T] injected by MapWith provides:
- Get(ctx): read current value
- Set(ctx, value): overwrite
- Update(ctx, fn): atomic read-modify-write
- UpdateAndGet(ctx, fn): atomic read-modify-write, returns new value
- GetOrSet(ctx, fn): return existing or initialise
MapWith
func MapWith[I, O, S any](
p *Pipeline[I],
key Key[S],
fn func(context.Context, *Ref[S], I) (O, error),
opts ...StageOption,
) *Pipeline[O]
Like Map but injects a *Ref[S] carrying persistent state identified by key. At Concurrency(1) (the default), a single Ref is shared across all items in sequence, which suits running totals, accumulators, and event counters. With Concurrency(n) where n > 1, each worker goroutine gets its own independent Ref (worker-local state), so no single Ref sees every item.
State survives across items within a single runner.Run. Use WithStore at run time to persist state to Redis, DynamoDB, etc.
When to use: Running counters, sequence numbering, de-duplication with memory, rate tracking per pipeline run.
Options: Concurrency, OnError, Buffer, Overflow, WithName, Timeout, Supervise.
var seqKey = kitsune.NewKey[int]("seq", 0)
numbered := kitsune.MapWith(events, seqKey,
func(ctx context.Context, ref *kitsune.Ref[int], e Event) (NumberedEvent, error) {
n, err := ref.UpdateAndGet(ctx, func(n int) (int, error) { return n + 1, nil })
if err != nil {
return NumberedEvent{}, err
}
return NumberedEvent{Seq: n, Event: e}, nil
},
)
FlatMapWith
func FlatMapWith[I, O, S any](
p *Pipeline[I],
key Key[S],
fn func(context.Context, *Ref[S], I, func(O) error) error,
opts ...StageOption,
) *Pipeline[O]
Like FlatMap but with a *Ref[S] for persistent state. Each input can produce zero or more outputs while reading and writing state.
Options: Concurrency, OnError, Buffer, Overflow, WithName, Timeout, Supervise.
var windowKey = kitsune.NewKey[[]Event]("window", nil)
// Emit a window every 5 events.
windows := kitsune.FlatMapWith(events, windowKey,
func(ctx context.Context, ref *kitsune.Ref[[]Event], e Event, yield func([]Event) error) error {
buf, err := ref.Get(ctx)
if err != nil {
return err
}
buf = append(buf, e)
if len(buf) >= 5 {
if err := yield(append([]Event(nil), buf...)); err != nil {
return err
}
buf = buf[:0]
}
return ref.Set(ctx, buf)
},
)
MapWithKey
func MapWithKey[I, O, S any](
p *Pipeline[I],
keyFn func(I) string,
key Key[S],
fn func(context.Context, *Ref[S], I) (O, error),
opts ...StageOption,
) *Pipeline[O]
Like MapWith but maintains one independent Ref[S] per entity key (derived from each item via keyFn). Items with the same key share state; items with different keys are isolated. This enables per-user, per-session, or per-device aggregation in a single stage.
When to use: Per-entity state in a multiplexed stream: event counts per user, session tracking, per-device rate limiting.
Options: Concurrency, OnError, Buffer, Overflow, WithName, Timeout, Supervise, WithKeyTTL.
var countKey = kitsune.NewKey[int]("event_count", 0)
counted := kitsune.MapWithKey(events,
func(e Event) string { return e.UserID },
countKey,
func(ctx context.Context, ref *kitsune.Ref[int], e Event) (Result, error) {
n, err := ref.UpdateAndGet(ctx, func(n int) (int, error) { return n + 1, nil })
if err != nil {
return Result{}, err
}
return Result{UserID: e.UserID, Count: n}, nil
},
)
High-cardinality eviction: On long-running pipelines with unbounded key spaces (user IDs, session tokens), use WithKeyTTL(d) to evict entries that have been inactive for longer than d. The next item for an evicted key starts from the initial value. Eviction is lazy (checked on the next access; no background goroutine). WithKeyTTL is independent of StateTTL: StateTTL expires the value held by a Ref; WithKeyTTL expires the map entry that holds the Ref.
// Evict per-user state after 15 minutes of inactivity.
kitsune.MapWithKey(events,
func(e Event) string { return e.UserID },
sessionKey,
handler,
kitsune.WithKeyTTL(15*time.Minute),
)
The run-level WithDefaultKeyTTL(d) sets the default TTL for all MapWithKey and FlatMapWithKey stages that do not specify their own WithKeyTTL. Per-stage WithKeyTTL(0) explicitly disables eviction even when a run-level default is set.
Supervise and state lifetime: When combined with Supervise, per-key Ref state IS preserved across supervised restarts within a single Run call: the keyed map is allocated once per run and captured by the stage's restarted loop, so a panic or error that triggers a restart does not zero the accumulated state. State is NOT preserved across separate Run calls with the default in-process store; callers that need cross-run durability must configure an external Store via WithStore.
FlatMapWithKey
func FlatMapWithKey[I, O, S any](
p *Pipeline[I],
keyFn func(I) string,
key Key[S],
fn func(context.Context, *Ref[S], I, func(O) error) error,
opts ...StageOption,
) *Pipeline[O]
Like MapWithKey but allows emitting zero or more outputs per item while maintaining per-key state.
Options: Concurrency, OnError, Buffer, Overflow, WithName, Timeout, Supervise, WithKeyTTL.
Batching & Windowing
BufferWith
func BufferWith[T, S any](p *Pipeline[T], closingSelector *Pipeline[S], opts ...StageOption) *Pipeline[[]T]
Collects items from p into a slice, emitting the accumulated buffer each time closingSelector fires. An empty buffer is never emitted. When the source closes, any remaining buffered items are flushed before the output closes. When closingSelector closes, any remaining buffered items are flushed and the output closes.
BufferWith generalizes Batch (fixed-size boundary) and BatchTimeout (periodic boundary) to arbitrary external boundary signals. Use it when the flush trigger is externally driven: heartbeats, quiescence signals, control channels, or upstream events.
When to use:
- Flushing accumulated events on an external heartbeat or tick pipeline.
- Coalescing bursts until a quiescence signal arrives on a separate channel.
- Building custom batching policies that go beyond size or time.
Semantics:
- Items are emitted in input order; the flattened output preserves source ordering.
- If closingSelector fires while the buffer is empty, no batch is emitted.
- If closingSelector closes before the source, any remaining source items are not read.
- Context cancellation returns ctx.Err() without flushing.
- Panics if closingSelector is nil.
Options: Buffer, WithName.
// Flush buffered events on every heartbeat tick.
heartbeat := kitsune.Ticker(5 * time.Second)
batches := kitsune.BufferWith(events, heartbeat)
Batch
Collects items into []T slices and flushes them according to one or more flush triggers. At least one of BatchCount, BatchMeasure, or BatchTimeout must be provided; the stage panics at construction time if none are set. Multiple triggers can be combined: the batch flushes as soon as any trigger fires.
When the source closes, any remaining items are flushed as a partial batch. An empty batch is never emitted. With DropPartial(), the final partial batch is discarded when the source closes.
Flush triggers:
- BatchCount(n): flush when n items have accumulated.
- BatchMeasure(fn, n): flush when the cumulative measure (computed by fn for each item) reaches n.
- BatchTimeout(d): flush after d even if the count or measure threshold has not been reached.
When to use: Bulk database inserts, batched API calls, reducing per-item overhead for expensive operations.
When not to use: When you need overlapping windows; use SlidingWindow or SessionWindow.
Options: BatchCount, BatchMeasure, BatchTimeout, DropPartial, WithClock, Buffer, WithName.
// Flush up to 100 items at a time, or after 500ms.
batches := kitsune.Batch(events,
kitsune.BatchCount(100),
kitsune.BatchTimeout(500*time.Millisecond),
)
// Flush when cumulative byte size reaches 64 KiB.
chunks := kitsune.Batch(messages,
kitsune.BatchMeasure(func(m Message) int { return len(m.Payload) }, 64*1024),
)
// Emit only full batches of 10; drop any trailing items.
fullBatches := kitsune.Batch(items, kitsune.BatchCount(10), kitsune.DropPartial())
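How the count and measure triggers combine, and what DropPartial changes at end of stream, can be modelled on a slice. The sketch below is plain Go with hypothetical names (`triggers`, `batch`); it leaves out the timeout trigger because that depends on wall-clock time.

```go
package main

import "fmt"

// triggers models the flush conditions. A zero Count or nil Measure
// disables that trigger.
type triggers[T any] struct {
	Count       int         // flush when this many items have accumulated
	Measure     func(T) int // per-item measure, e.g. payload size
	Limit       int         // flush when the cumulative measure reaches Limit
	DropPartial bool        // discard the trailing partial batch
}

// batch flushes as soon as any enabled trigger fires.
func batch[T any](items []T, t triggers[T]) [][]T {
	var out [][]T
	var cur []T
	sum := 0
	for _, v := range items {
		cur = append(cur, v)
		if t.Measure != nil {
			sum += t.Measure(v)
		}
		full := t.Count > 0 && len(cur) >= t.Count
		heavy := t.Measure != nil && sum >= t.Limit
		if full || heavy {
			out = append(out, cur)
			cur, sum = nil, 0
		}
	}
	// Source closed: flush what is left unless DropPartial is set.
	// An empty batch is never emitted.
	if len(cur) > 0 && !t.DropPartial {
		out = append(out, cur)
	}
	return out
}

func main() {
	nums := []int{1, 2, 3, 4, 5, 6, 7}
	fmt.Println(batch(nums, triggers[int]{Count: 3}))                    // [[1 2 3] [4 5 6] [7]]
	fmt.Println(batch(nums, triggers[int]{Count: 3, DropPartial: true})) // [[1 2 3] [4 5 6]]

	words := []string{"aa", "bbb", "c", "dddd"}
	bySize := triggers[string]{Measure: func(s string) int { return len(s) }, Limit: 4}
	fmt.Println(batch(words, bySize)) // [[aa bbb] [c dddd]]
}
```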
MapBatch
func MapBatch[I, O any](
p *Pipeline[I],
size int,
fn func(context.Context, []I) ([]O, error),
opts ...StageOption,
) *Pipeline[O]
Collects items into batches of up to size, passes each batch to fn, and flattens the results back to individual items. This is syntactic sugar for Batch + FlatMap and is the right primitive for bulk external calls where output count equals input count (e.g., bulk database lookups).
The fn must return the same number of results as items in the batch.
Options: BatchTimeout, Concurrency, OnError, Buffer, WithName.
enriched := kitsune.MapBatch(userIDs, 200,
func(ctx context.Context, ids []int) ([]User, error) {
return db.BulkFetchUsers(ctx, ids)
},
)
Within
func Within[T any, O any](p *Pipeline[[]T], stage func(*Pipeline[T]) *Pipeline[O], opts ...StageOption) *Pipeline[[]O]
Applies a pipeline stage to the contents of each incoming slice, collecting the results back into a slice. Each []T item is unwound into a sub-pipeline, the stage function is applied, and the output items are gathered into a []O before being emitted downstream.
When to use: Transforming or filtering the elements inside batches produced by Batch, SlidingWindow, ChunkBy, or similar slice-emitting operators, without unwinding to individual items and re-batching.
Options: Buffer, WithName.
// Sort each batch before forwarding.
sorted := kitsune.Within(batches, func(p *kitsune.Pipeline[Event]) *kitsune.Pipeline[Event] {
return kitsune.Sort(p, func(a, b Event) bool { return a.Time.Before(b.Time) })
})
// Filter items within each chunk and keep non-empty chunks.
filtered := kitsune.Within(chunks, func(p *kitsune.Pipeline[Record]) *kitsune.Pipeline[Record] {
return p.Filter(func(r Record) bool { return r.Valid })
})
SlidingWindow
Emits overlapping slices of exactly size items, advancing by step items each time. When step == size, this is a non-overlapping tumbling window. When step < size, windows overlap. Partial windows at the end of the stream are dropped. Panics if step <= 0 or step > size.
When to use: Rolling averages, sliding statistics, n-gram generation.
Options: Buffer, WithName.
// Moving average over 5-item windows, advancing 1 at a time.
windows := kitsune.SlidingWindow(prices, 5, 1)
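The size/step relationship is easiest to check on a small slice. The following plain-Go sketch (hypothetical `slidingWindow` helper, not the library's code) reproduces the rules above: full windows only, step items of advance, and a panic on an invalid step.

```go
package main

import "fmt"

// slidingWindow returns every full window of `size` items, advancing the
// start index by `step`. Trailing partial windows are dropped.
func slidingWindow[T any](items []T, size, step int) [][]T {
	if step <= 0 || step > size {
		panic("step must be in [1, size]")
	}
	var out [][]T
	for start := 0; start+size <= len(items); start += step {
		w := make([]T, size)
		copy(w, items[start:start+size]) // copy so windows do not alias
		out = append(out, w)
	}
	return out
}

func main() {
	prices := []float64{1, 2, 3, 4, 5, 6}

	// Overlapping windows feed a moving average.
	for _, w := range slidingWindow(prices, 3, 1) {
		sum := 0.0
		for _, p := range w {
			sum += p
		}
		fmt.Println(sum / float64(len(w))) // prints 2, 3, 4, 5
	}

	// step == size gives tumbling (non-overlapping) windows.
	fmt.Println(slidingWindow(prices, 3, 3)) // [[1 2 3] [4 5 6]]
}
```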
SessionWindow
Groups items into sessions separated by periods of inactivity. A new session starts whenever no item arrives within gap. The accumulated session buffer is emitted when the gap timer fires. An empty session is never emitted. The final partial session is emitted when the source closes.
When to use: User session detection, grouping related events that occur close together in time.
Options: WithClock, Buffer, WithName.
// Group clicks into sessions with a 30-second inactivity timeout.
sessions := kitsune.SessionWindow(clickEvents, 30*time.Second)
ChunkBy
func ChunkBy[T any, K comparable](p *Pipeline[T], keyFn func(T) K, opts ...StageOption) *Pipeline[[]T]
Groups consecutive items that share the same key (returned by keyFn) into slices. A new chunk begins whenever the key changes. The last chunk is emitted when the source completes.
When to use: Run-length grouping: group consecutive log lines by severity, transactions by account.
Options: Buffer, WithName.
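A short plain-Go sketch shows the run-length behaviour, in particular that a key which reappears later starts a new chunk rather than joining the earlier one. `chunkBy` and `logLine` are hypothetical names used only for this illustration.

```go
package main

import "fmt"

// logLine is a hypothetical record type.
type logLine struct {
	Severity string
	Msg      string
}

// chunkBy groups consecutive items that share a key. A key that shows up
// again after a different key opens a fresh chunk.
func chunkBy[T any, K comparable](items []T, keyFn func(T) K) [][]T {
	var out [][]T
	var cur []T
	var curKey K
	for _, v := range items {
		k := keyFn(v)
		if len(cur) > 0 && k != curKey {
			out = append(out, cur)
			cur = nil
		}
		cur = append(cur, v)
		curKey = k
	}
	if len(cur) > 0 {
		out = append(out, cur) // last chunk is emitted at end of input
	}
	return out
}

func main() {
	lines := []logLine{
		{"INFO", "start"},
		{"INFO", "ready"},
		{"ERROR", "disk full"},
		{"INFO", "retry"},
	}
	for _, c := range chunkBy(lines, func(l logLine) string { return l.Severity }) {
		fmt.Println(c[0].Severity, len(c)) // INFO 2, ERROR 1, INFO 1
	}
}
```

GroupBy, by contrast, would merge both INFO runs into one slice because it buffers the whole stream.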
ChunkWhile
func ChunkWhile[T any](p *Pipeline[T], pred func(prev, curr T) bool, opts ...StageOption) *Pipeline[[]T]
Groups consecutive items into chunks while pred(prev, current) returns true. A new chunk begins when pred returns false.
Options: Buffer, WithName.
// Group ascending runs: [1,2,3,1,2] → [[1,2,3],[1,2]]
kitsune.ChunkWhile(nums, func(prev, curr int) bool { return curr > prev })
Fan-Out & Fan-In
Merge
Combines multiple pipelines of the same type into one. Items are emitted as they arrive from any source (race order). The merged pipeline completes when all inputs have completed.
When to use: Combining results from multiple concurrent sources: multiple Kafka partitions, multiple API endpoints running in parallel.
Options: none (buffer is fixed internally).
Partition
func Partition[T any](p *Pipeline[T], pred func(T) bool, opts ...StageOption) (*Pipeline[T], *Pipeline[T])
Splits a pipeline into two: items for which pred returns true go to the first pipeline; items for which pred returns false go to the second. Both pipelines must be consumed; use MergeRunners to run them together.
Options: Buffer, WithName.
valid, invalid := kitsune.Partition(records, func(r Record) bool { return r.Valid })
runner, _ := kitsune.MergeRunners(
valid.ForEach(store),
invalid.ForEach(logInvalid),
)
runner.Run(ctx)
Broadcast
Fans out each item to n identical output pipelines. Every item is delivered to every branch (synchronised fan-out). A slow branch backpressures upstream and all other branches. All n pipelines must be consumed. Panics if n < 2.
When to use: When you know the exact number of consumers at construction time and need each one to see every item: metrics + storage + audit.
Options: Buffer, WithName.
branches := kitsune.Broadcast(events, 3)
runner, _ := kitsune.MergeRunners(
branches[0].ForEach(storeEvent),
branches[1].ForEach(updateMetrics),
branches[2].ForEach(auditLog),
)
runner.Run(ctx)
Share
Returns a subscription factory for dynamic multicast. Call the returned function once per desired branch; each branch receives every item from p. Unlike Broadcast, the number of consumers does not need to be known upfront; branches are registered dynamically before Run is called.
Options passed to Share are defaults for all branches; options passed to individual subscribe calls override them per-branch.
When to use: When consumers are built in a loop, from config, or from a plugin registry, when Broadcast's fixed n is inconvenient.
Options on the factory: Buffer, WithName (defaults for all branches). Per-subscribe calls can also pass Buffer, WithName.
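subscribe := kitsune.Share(events)
audit := subscribe(kitsune.WithName("audit"), kitsune.Buffer(1000))
metrics := subscribe(kitsune.WithName("metrics"), kitsune.Buffer(16))
runners := []kitsune.Runnable{
audit.ForEach(writeAudit),
metrics.ForEach(updateMetrics),
}
if cfg.FraudEnabled {
// Register the optional branch before Run; checkFraud is a placeholder handler.
fraud := subscribe(kitsune.WithName("fraud"))
runners = append(runners, fraud.ForEach(checkFraud))
}
runner, _ := kitsune.MergeRunners(runners...)
runner.Run(ctx)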
Balance
Distributes items across n output pipelines in round-robin order. Each item goes to exactly one output. All n pipelines must be consumed.
When to use: Spreading load across n identical workers, each maintaining its own connection pool or state.
Options: Buffer, WithName.
branches := kitsune.Balance(jobs, 4)
runners := make([]kitsune.Runnable, 4)
for i, b := range branches {
runners[i] = b.ForEach(worker)
}
runner, _ := kitsune.MergeRunners(runners...)
runner.Run(ctx)
KeyedBalance
func KeyedBalance[T any](p *Pipeline[T], n int, keyFn func(T) string, opts ...StageOption) []*Pipeline[T]
Distributes items across n output pipelines by consistent hash of keyFn(item). All items with the same key always go to the same branch, enabling per-entity parallelism without cross-branch coordination. Pairs well with MapWithKey for sharded stateful workloads.
Options: Buffer, WithName.
// All events for the same userID go to the same branch.
branches := kitsune.KeyedBalance(events, 8, func(e Event) string { return e.UserID })
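The routing property (same key, same branch) can be illustrated with a hash-mod-n sketch. The hash function kitsune uses is not stated here; FNV-1a from the standard library is an assumption chosen for the example, and `branchFor` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// branchFor maps a key to one of n branches. Equal keys always hash to
// the same value, so they always land on the same branch.
// FNV-1a is an assumption of this sketch, not necessarily kitsune's hash.
func branchFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

func main() {
	for _, user := range []string{"alice", "bob", "alice", "carol", "bob"} {
		fmt.Printf("%s -> branch %d\n", user, branchFor(user, 8))
	}
}
```

Because each key is pinned to one branch, per-key state kept inside a branch never needs cross-branch coordination. Note that with plain modulo hashing, changing n remaps most keys.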
Zip / ZipWith
func Zip[A, B any](a *Pipeline[A], b *Pipeline[B]) *Pipeline[Pair[A, B]]
func ZipWith[A, B, O any](
a *Pipeline[A],
b *Pipeline[B],
fn func(context.Context, A, B) (O, error),
opts ...StageOption,
) *Pipeline[O]
Zip pairs items from a and b positionally into Pair[A, B] values. ZipWith pairs them and transforms the pair using fn. The pipeline completes when either input completes; the other's remaining items are discarded.
When to use: Correlating two aligned streams positionally: test inputs with expected outputs, requests with responses.
Options (ZipWith): Buffer, WithName.
Unzip
Splits a pipeline of Pair[A, B] into two separate pipelines. Both output pipelines must be consumed.
Options: Buffer, WithName.
LatestFrom / LatestFromWith
func LatestFrom[A, B any](main *Pipeline[A], other *Pipeline[B]) *Pipeline[Pair[A, B]]
func LatestFromWith[A, B, O any](
main *Pipeline[A],
other *Pipeline[B],
fn func(context.Context, A, B) (O, error),
opts ...StageOption,
) *Pipeline[O]
Combines each item from main with the most-recently-seen item from other. Items from main are only emitted after other has emitted at least one item. Items from other that arrive between main items update the "latest" snapshot but are not independently emitted.
This models a "sample the latest state of other on each main event" pattern: main drives the output rate; other provides current state.
When to use: Combining a high-frequency event stream with a low-frequency configuration or rate stream, e.g., apply the latest exchange rate to each transaction.
Options (LatestFromWith): Buffer, WithName.
// Apply the latest config to each incoming request.
processed := kitsune.LatestFrom(requests, configUpdates)
// Each Pair has: First=request, Second=most-recent config
CombineLatest / CombineLatestWith
func CombineLatest[A, B any](a *Pipeline[A], b *Pipeline[B]) *Pipeline[Pair[A, B]]
func CombineLatestWith[A, B, O any](
a *Pipeline[A],
b *Pipeline[B],
fn func(context.Context, A, B) (O, error),
opts ...StageOption,
) *Pipeline[O]
Emits a new value whenever either a or b emits, combining the latest values from each. Emitting begins only after both pipelines have emitted at least one item. Unlike LatestFrom, both pipelines drive the output.
When to use: UI state combinations, sensor fusion where you want a new output whenever either reading changes, e.g., combine temperature and humidity sensors into a comfort index.
Options (CombineLatestWith): Buffer, WithName.
// Recompute risk score whenever either signal updates.
risk := kitsune.CombineLatestWith(creditScore, marketIndex,
func(ctx context.Context, cs CreditScore, mi MarketIndex) (RiskScore, error) {
return computeRisk(cs, mi), nil
},
)
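The difference between LatestFrom and CombineLatest shows up clearly when both are replayed over the same interleaved arrival log. The sketch below is plain Go with hypothetical names; it assumes that LatestFrom drops main items that arrive before the other side has emitted, rather than holding them.

```go
package main

import "fmt"

// event records which input produced a value: "a" (main) or "b" (other).
type event struct {
	src string
	val string
}

// latestFrom emits only when "a" fires, pairing it with the latest "b".
// "a" items seen before any "b" are dropped (an assumption of this sketch).
func latestFrom(log []event) []string {
	var out []string
	latestB, haveB := "", false
	for _, e := range log {
		switch e.src {
		case "b":
			latestB, haveB = e.val, true
		case "a":
			if haveB {
				out = append(out, e.val+"+"+latestB)
			}
		}
	}
	return out
}

// combineLatest emits whenever either side fires, once both have a value.
func combineLatest(log []event) []string {
	var out []string
	var latestA, latestB string
	haveA, haveB := false, false
	for _, e := range log {
		if e.src == "a" {
			latestA, haveA = e.val, true
		} else {
			latestB, haveB = e.val, true
		}
		if haveA && haveB {
			out = append(out, latestA+"+"+latestB)
		}
	}
	return out
}

func main() {
	log := []event{{"a", "a1"}, {"b", "b1"}, {"a", "a2"}, {"b", "b2"}, {"a", "a3"}}
	fmt.Println(latestFrom(log))    // [a2+b1 a3+b2]
	fmt.Println(combineLatest(log)) // [a1+b1 a2+b1 a2+b2 a3+b2]
}
```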
Enrichment
Enrichment operators bulk-fetch external data for a batch of items and attach it to each item. Keys are deduplicated before each fetch call; if multiple items share a key, only one lookup is made.
LookupBy
func LookupBy[T any, K comparable, V any](
p *Pipeline[T],
cfg LookupConfig[T, K, V],
opts ...StageOption,
) *Pipeline[Enriched[T, V]]
Enriches each item with a value fetched in bulk, emitting Enriched[T, V] (fields Item and Value). Items whose key is absent from the fetch result carry the zero value for V. LookupConfig carries:
- Key func(T) K: extracts the lookup key from each item
- Fetch func(context.Context, []K) (map[K]V, error): bulk fetcher
- BatchSize int: how many items to collect before calling Fetch (default: 100)
- BatchTimeout time.Duration: when non-zero, flushes a partial batch after the duration elapses with no new item. Without this, items sit in the internal buffer until BatchSize is reached or the source closes, which can introduce unbounded latency under low throughput.
Options: Buffer, WithName, BatchTimeout.
cfg := kitsune.NewLookupConfig(
func(e Event) string { return e.UserID },
func(ctx context.Context, ids []string) (map[string]User, error) {
return userDB.BulkFetch(ctx, ids)
},
)
withUsers := kitsune.LookupBy(events, cfg)
// each item: Enriched[Event, User]{Item: event, Value: user}
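The per-batch key deduplication and the zero-value rule for missing keys can be sketched for a single batch as follows. This is plain Go with hypothetical names (`enriched`, `lookupBatch`); context handling, errors, and batching across the stream are left out.

```go
package main

import "fmt"

// enriched mirrors the Item/Value shape described above.
type enriched[T, V any] struct {
	Item  T
	Value V
}

// lookupBatch fetches each distinct key once, then attaches the result to
// every item. Keys missing from the fetch result yield the zero value of V.
func lookupBatch[T any, K comparable, V any](
	batch []T,
	key func(T) K,
	fetch func([]K) map[K]V,
) []enriched[T, V] {
	seen := make(map[K]struct{})
	var keys []K
	for _, it := range batch {
		k := key(it)
		if _, ok := seen[k]; !ok {
			seen[k] = struct{}{}
			keys = append(keys, k)
		}
	}
	vals := fetch(keys) // one bulk call with deduplicated keys
	out := make([]enriched[T, V], len(batch))
	for i, it := range batch {
		out[i] = enriched[T, V]{Item: it, Value: vals[key(it)]}
	}
	return out
}

func main() {
	userIDs := []string{"u1", "u2", "u1", "u3"}
	names := map[string]string{"u1": "Ada", "u2": "Grace"} // "u3" is unknown
	res := lookupBatch(userIDs,
		func(id string) string { return id },
		func(ids []string) map[string]string {
			fmt.Println("fetching", len(ids), "keys") // fetching 3 keys
			found := make(map[string]string)
			for _, id := range ids {
				if n, ok := names[id]; ok {
					found[id] = n
				}
			}
			return found
		},
	)
	for _, r := range res {
		fmt.Printf("%s=%q\n", r.Item, r.Value) // u3 prints an empty string
	}
}
```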
Enrich
func Enrich[T any, K comparable, V, O any](
p *Pipeline[T],
cfg EnrichConfig[T, K, V, O],
opts ...StageOption,
) *Pipeline[O]
Like LookupBy but calls a Join function to combine the item and its fetched value into the output type directly, without an intermediate Enriched wrapper. EnrichConfig carries:
- Key func(T) K
- Fetch func(context.Context, []K) (map[K]V, error)
- Join func(T, V) O
- BatchSize int: default 100
- BatchTimeout time.Duration: when non-zero, flushes a partial batch after the duration elapses with no new item.
Options: Buffer, WithName, BatchTimeout.
cfg := kitsune.NewEnrichConfig(
func(e Event) string { return e.UserID },
func(ctx context.Context, ids []string) (map[string]User, error) {
return userDB.BulkFetch(ctx, ids)
},
func(e Event, u User) EnrichedEvent {
return EnrichedEvent{Event: e, UserName: u.Name}
},
)
enriched := kitsune.Enrich(events, cfg)
Aggregation & Collection
Running aggregates
Running aggregates emit a new value after each input item without waiting for the source to close.
Scan
Accumulates running state across items using fn, emitting the running state after each item. The first emission is fn(initial, firstItem). Unlike Reduce, Scan emits intermediate states as items arrive rather than waiting for the source to complete.
Options: Buffer, WithName.
// Running total of prices.
runningTotal := kitsune.Scan(prices, 0.0, func(acc float64, p float64) float64 {
return acc + p
})
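To contrast Scan with Reduce on the same input, the following plain-Go sketch runs both folds over a slice. The helpers `scan` and `reduce` are hypothetical and only model the emission pattern, not the library's streaming behaviour.

```go
package main

import "fmt"

// scan returns the running state after each item; the first element is
// fn(initial, items[0]).
func scan[T, S any](items []T, initial S, fn func(S, T) S) []S {
	out := make([]S, 0, len(items))
	acc := initial
	for _, v := range items {
		acc = fn(acc, v)
		out = append(out, acc)
	}
	return out
}

// reduce returns only the final state, or initial for empty input.
func reduce[T, S any](items []T, initial S, fn func(S, T) S) S {
	acc := initial
	for _, v := range items {
		acc = fn(acc, v)
	}
	return acc
}

func main() {
	prices := []float64{1.5, 2.5, 4}
	add := func(acc, p float64) float64 { return acc + p }
	fmt.Println(scan(prices, 0.0, add))   // [1.5 4 8]
	fmt.Println(reduce(prices, 0.0, add)) // 8
}
```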
RunningFrequencies / RunningFrequenciesBy
func RunningFrequencies[T comparable](p *Pipeline[T], opts ...StageOption) *Pipeline[map[T]int64]
func RunningFrequenciesBy[T any, K comparable](p *Pipeline[T], keyFn func(T) K, opts ...StageOption) *Pipeline[map[K]int64]
Like Frequencies/FrequenciesBy but emits a fresh count snapshot after each input item, as a pipeline. Each emitted map is a copy: safe to retain across iterations. Use this when downstream stages need to react to evolving counts; for a single buffered map use Frequencies with Single.
Options: Buffer, WithName.
Buffering aggregates
Buffering aggregates consume the entire stream before emitting their result. The source pipeline must be finite; use Single to extract the single emitted value.
GroupBy
func GroupBy[T any, K comparable](p *Pipeline[T], keyFn func(T) K, opts ...StageOption) *Pipeline[map[K][]T]
Buffers all input and emits a single map[K][]T when the source closes. Items are grouped by the key returned by keyFn; items with the same key appear in arrival order within their slice.
This is a buffering pipeline operator, not a terminal. The entire stream must fit in memory. Combine with Single to collect the result, or use downstream operators to process the map.
When to use: Partitioning a finite stream into named groups for further processing, batch reports, or building lookup tables.
Options: Buffer, WithName.
// Group events by type, then collect with Single.
grouped, err := kitsune.Single(ctx, kitsune.GroupBy(events,
func(e Event) string { return e.Type },
))
// grouped is map[string][]Event
Reduce
Folds all items into a single value using fn. The result is emitted exactly once when the source completes. If the source emits no items, initial is emitted. Unlike Scan, no intermediate values are emitted.
Options: Buffer, WithName.
total := kitsune.Reduce(prices, 0.0, func(acc, p float64) float64 { return acc + p })
// emits one value: the sum of all prices
ReduceWhile
func ReduceWhile[T, S any](p *Pipeline[T], initial S, fn func(S, T) (S, bool), opts ...StageOption) *Pipeline[S]
Folds items until fn signals stop by returning (state, false). The current state is emitted exactly once and no further items are consumed. If the source closes without fn returning false, the accumulated state is emitted on close. Empty input emits initial once.
This is a buffering pipeline operator. Use Single to extract the result.
// Sum until we exceed 1000.
partial, _ := kitsune.Single(ctx, kitsune.ReduceWhile(prices, 0.0,
	func(acc float64, p float64) (float64, bool) {
		next := acc + p
		return next, next <= 1000.0
	},
))
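To see which value comes out and how many items are consumed, here is a slice-based sketch of the same fold. The helper reduceWhile is hypothetical, and it assumes the state returned alongside false is the one emitted; the text above does not spell this out, so treat it as an illustration rather than the library's behaviour.

```go
package main

import "fmt"

// reduceWhile is a hypothetical slice-based stand-in for the operator.
// It folds items until fn returns false, then returns the state fn produced
// on that call (an assumption) together with the number of items consumed.
func reduceWhile[T, S any](items []T, initial S, fn func(S, T) (S, bool)) (S, int) {
	state := initial
	for i, item := range items {
		next, keepGoing := fn(state, item)
		state = next
		if !keepGoing {
			return state, i + 1
		}
	}
	// Source exhausted without a stop signal: return the accumulated state.
	return state, len(items)
}

func main() {
	prices := []float64{400, 500, 300, 250}
	sum, consumed := reduceWhile(prices, 0.0, func(acc, p float64) (float64, bool) {
		next := acc + p
		return next, next <= 1000.0
	})
	fmt.Println(sum, consumed) // 1200 3
}
```

The fourth price is never read, which matches the "no further items are consumed" wording above.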
TakeRandom
Buffers all input and emits a single []T containing a random sample of up to n items using reservoir sampling (Algorithm R). Each item has an equal probability of being selected. The emitted slice has min(n, pipelineSize) items. Order within the slice is not guaranteed.
This is a pipeline (buffering) operator, not a terminal. Combine with Single to extract the result.
Options: Buffer, WithName.
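For readers unfamiliar with Algorithm R, the following is a minimal sketch over a slice. The helper reservoirSample and its intn parameter are hypothetical; the random source is injected so the behaviour can be reproduced, and nothing here is taken from the library's implementation.

```go
package main

import (
	"fmt"
	"math/rand"
)

// reservoirSample is a hypothetical helper implementing Algorithm R.
// It keeps the first n items, then for the item at index i picks j uniformly
// from [0, i] and overwrites slot j only if j < n.
// intn must return a uniform value in [0, k) for argument k.
func reservoirSample[T any](items []T, n int, intn func(int) int) []T {
	if n <= 0 {
		return nil
	}
	reservoir := make([]T, 0, n)
	for i, item := range items {
		if i < n {
			reservoir = append(reservoir, item)
			continue
		}
		if j := intn(i + 1); j < n {
			reservoir[j] = item
		}
	}
	return reservoir
}

func main() {
	sample := reservoirSample([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 3, rand.Intn)
	fmt.Println(sample) // three items; which ones varies per run
}
```

The reservoir never holds more than n items, so memory is bounded by n even though the whole input must be seen before the result is final.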
ToMap / Frequencies / FrequenciesBy
func ToMap[T any, K comparable, V any](p *Pipeline[T], keyFn func(T) K, valueFn func(T) V, opts ...StageOption) *Pipeline[map[K]V]
func Frequencies[T comparable](p *Pipeline[T], opts ...StageOption) *Pipeline[map[T]int]
func FrequenciesBy[T any, K comparable](p *Pipeline[T], keyFn func(T) K, opts ...StageOption) *Pipeline[map[K]int]
Buffering pipeline operators that emit a single map on close. ToMap uses last-writer-wins for duplicate keys. Frequencies and FrequenciesBy count occurrences. Empty input emits one empty map.
These are pipeline operators: use Single to extract the result, or pipe the map into further stages.
For grouping into map[K][]T, use GroupBy.
Options: Buffer, WithName.
byID, err := kitsune.Single(ctx, kitsune.ToMap(users, func(u User) int { return u.ID }, func(u User) User { return u }))
counts, err := kitsune.Single(ctx, kitsune.Frequencies(kitsune.Map(events, extractType)))
Sort / SortBy
func Sort[T any](p *Pipeline[T], less func(a, b T) bool, opts ...StageOption) *Pipeline[T]
func SortBy[T any, K any](p *Pipeline[T], keyFn func(T) K, less func(a, b K) bool, opts ...StageOption) *Pipeline[T]
Collects all items, sorts them, then emits in sorted order. The source pipeline must be finite. This is a blocking, memory-intensive operation; the entire stream is buffered before any output is emitted.
Options: Buffer, WithName.
Scalar terminals
Scalar terminals run the pipeline and return a plain Go value directly; they are not Pipeline[T] values and cannot be composed further.
Collect / First / Last / Count / Any / All / Find / Contains
func Collect[T any](ctx context.Context, p *Pipeline[T], opts ...RunOption) ([]T, error)
func First[T any](ctx context.Context, p *Pipeline[T], opts ...RunOption) (T, bool, error)
func Last[T any](ctx context.Context, p *Pipeline[T], opts ...RunOption) (T, bool, error)
func Count[T any](ctx context.Context, p *Pipeline[T], opts ...RunOption) (int64, error)
func Any[T any](ctx context.Context, p *Pipeline[T], pred func(T) bool, opts ...RunOption) (bool, error)
func All[T any](ctx context.Context, p *Pipeline[T], pred func(T) bool, opts ...RunOption) (bool, error)
func Find[T any](ctx context.Context, p *Pipeline[T], pred func(T) bool, opts ...RunOption) (T, bool, error)
func Contains[T comparable](ctx context.Context, p *Pipeline[T], value T, opts ...RunOption) (bool, error)
Terminal collectors. All run the pipeline and block until completion. First, Any, All, Find, and Contains short-circuit; they stop the pipeline as soon as the answer is known. First and Last return (zero, false, nil) if the pipeline is empty.
All are also available as methods on *Pipeline[T] (except Find and Contains, which require type parameters).
items, err := kitsune.Collect(ctx, p)
first, ok, err := kitsune.First(ctx, p)
n, err := kitsune.Count(ctx, p)
found, err := kitsune.Any(ctx, p, func(v int) bool { return v > 0 })
Sum / Min / Max / MinMax
func Sum[T Numeric](ctx context.Context, p *Pipeline[T], opts ...RunOption) (T, error)
func Min[T any](ctx context.Context, p *Pipeline[T], less func(a, b T) bool, opts ...RunOption) (T, bool, error)
func Max[T any](ctx context.Context, p *Pipeline[T], less func(a, b T) bool, opts ...RunOption) (T, bool, error)
func MinMax[T any](ctx context.Context, p *Pipeline[T], less func(a, b T) bool, opts ...RunOption) (MinMaxResult[T], bool, error)
Terminal aggregators. Sum works on any Numeric type. Min and Max take a less comparator and return (zero, false, nil) if the pipeline is empty. MinMax computes both in a single pass and returns a MinMaxResult[T] with fields Min and Max.
total, err := kitsune.Sum(ctx, prices)
lowest, ok, err := kitsune.Min(ctx, prices, func(a, b float64) bool { return a < b })
MinBy / MaxBy
func MinBy[T any, K any](ctx context.Context, p *Pipeline[T], keyFn func(T) K, less func(a, b K) bool, opts ...RunOption) (T, bool, error)
func MaxBy[T any, K any](ctx context.Context, p *Pipeline[T], keyFn func(T) K, less func(a, b K) bool, opts ...RunOption) (T, bool, error)
Like Min/Max but compares items by a key derived from keyFn.
cheapest, ok, err := kitsune.MinBy(ctx, products,
	func(p Product) float64 { return p.Price },
	func(a, b float64) bool { return a < b },
)
SequenceEqual
func SequenceEqual[T comparable](ctx context.Context, a, b *Pipeline[T], opts ...RunOption) (bool, error)
Returns true if a and b emit the same items in the same order and have the same length.
Iter
func Iter[T any](ctx context.Context, p *Pipeline[T], opts ...RunOption) (iter.Seq[T], func() error)
Returns a Go 1.23 iter.Seq[T] iterator and an error function. Use the iterator with range. Call the error function after the loop (or after breaking out) to retrieve any pipeline error. Breaking out of the loop cancels the pipeline; the error function returns nil in that case.
Also available as p.Iter(ctx).
seq, errFn := kitsune.Iter(ctx, p)
for item := range seq {
	process(item)
}
if err := errFn(); err != nil {
	log.Fatal(err)
}
Time-Based Operators
Throttle
Emits at most one item per window duration. The first item in each window is emitted; subsequent items within the same window are silently dropped. This is "throttle-leading" or rate-limiting on item arrival.
When to use: Limiting how often a downstream stage is called, suppressing rapid event bursts while keeping the first event in each burst.
Options: WithClock, Buffer, WithName.
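A deterministic sketch of the throttle-leading rule, using explicit arrival offsets instead of a real clock. The event type and the helpers ms and throttleLeading are hypothetical, and the sketch assumes a window opens at the moment an item is emitted; the text above does not say whether windows are anchored this way or on fixed boundaries.

```go
package main

import (
	"fmt"
	"time"
)

// event pairs a value with its arrival offset from the start of the stream.
type event struct {
	At    time.Duration
	Value string
}

// ms is a small helper for readable offsets.
func ms(n int) time.Duration { return time.Duration(n) * time.Millisecond }

// throttleLeading keeps an event only if at least window has elapsed since
// the previously kept event (assumption: a window opens when an item is kept).
func throttleLeading(events []event, window time.Duration) []string {
	var out []string
	var lastKept time.Duration
	haveKept := false
	for _, e := range events {
		if !haveKept || e.At-lastKept >= window {
			out = append(out, e.Value)
			lastKept = e.At
			haveKept = true
		}
	}
	return out
}

func main() {
	events := []event{
		{ms(0), "a"}, {ms(30), "b"}, {ms(120), "c"}, {ms(150), "d"}, {ms(260), "e"},
	}
	fmt.Println(throttleLeading(events, ms(100))) // [a c e]
}
```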
Debounce
Suppresses rapid bursts: an item is only emitted after no new items have arrived for silence. If items arrive faster than silence, only the last item in each burst is forwarded. The last pending item is flushed when the source closes.
When to use: Typeahead debouncing, saving documents after a user stops typing, coalescing rapid config changes.
Options: WithClock, Buffer, WithName.
// Fire a search only after 300ms of silence.
queries := kitsune.Debounce(keystrokes, 300*time.Millisecond)
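The same behaviour can be traced without timers by working from arrival offsets. In this sketch the event type and the helpers ms and debounce are hypothetical: an item is forwarded when the gap before the next item is at least silence, and the final item is flushed as it would be when the source closes.

```go
package main

import (
	"fmt"
	"time"
)

// event pairs a value with its arrival offset from the start of the stream.
type event struct {
	At    time.Duration
	Value string
}

// ms is a small helper for readable offsets.
func ms(n int) time.Duration { return time.Duration(n) * time.Millisecond }

// debounce forwards an event only if no newer event arrives within silence.
// The last event is always forwarded, mirroring the flush on source close.
func debounce(events []event, silence time.Duration) []string {
	var out []string
	for i, e := range events {
		isLast := i == len(events)-1
		if isLast || events[i+1].At-e.At >= silence {
			out = append(out, e.Value)
		}
	}
	return out
}

func main() {
	keystrokes := []event{
		{ms(0), "k"}, {ms(100), "ki"}, {ms(200), "kit"}, {ms(900), "kits"}, {ms(1000), "kitsu"},
	}
	fmt.Println(debounce(keystrokes, ms(300))) // [kit kitsu]
}
```

Only "kit" survives the first burst because it is followed by 700ms of quiet; "kitsu" is emitted by the closing flush.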
Sample
Emits the most-recently-seen item from p at each tick of a d-duration interval. If no item has arrived since the last tick, that tick is skipped. Unlike Throttle (which fires on item arrival) and Debounce (which waits for quiet), Sample fires at a fixed wall-clock rate.
When to use: Live dashboards, periodic snapshots of high-frequency streams.
Options: WithClock, Buffer, WithName.
// Emit the latest quote at most once every 100ms.
sampled := kitsune.Sample(liveQuotes, 100*time.Millisecond)
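To contrast Sample with Throttle and Debounce, this sketch replays timestamped items against fixed ticks. The event type and the helpers ms and sample are hypothetical, as is the explicit end parameter that bounds the simulation; the real operator runs against a clock rather than a recorded list.

```go
package main

import (
	"fmt"
	"time"
)

// event pairs a value with its arrival offset from the start of the stream.
type event struct {
	At    time.Duration
	Value string
}

// ms is a small helper for readable offsets.
func ms(n int) time.Duration { return time.Duration(n) * time.Millisecond }

// sample walks ticks at d, 2d, ... up to end. At each tick it emits the most
// recent event that arrived since the previous tick, or nothing if none did.
// events must be sorted by At.
func sample(events []event, d, end time.Duration) []string {
	if d <= 0 {
		return nil
	}
	var out []string
	next := 0
	for tick := d; tick <= end; tick += d {
		var latest string
		fresh := false
		for next < len(events) && events[next].At <= tick {
			latest, fresh = events[next].Value, true
			next++
		}
		if fresh {
			out = append(out, latest)
		}
	}
	return out
}

func main() {
	quotes := []event{{ms(10), "q1"}, {ms(40), "q2"}, {ms(90), "q3"}, {ms(250), "q4"}}
	fmt.Println(sample(quotes, ms(100), ms(300))) // [q3 q4]
}
```

The tick at 200ms emits nothing because no quote arrived between 100ms and 200ms.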
SampleWith
Emits the most recent item from p whenever the sampler pipeline fires. If no item has arrived since the last sampler signal, that signal is skipped silently. The latest item is consumed on emit: if the sampler fires twice without a new source item arriving in between, only the first fire emits.
Unlike Sample (driven by a fixed wall-clock interval) and Throttle (rate-limits on item arrival), SampleWith is driven by an arbitrary pipeline. The sampler's item values are discarded; only the occurrence of each item matters.
The pipeline completes when the sampler closes. If the source closes and its last item has already been emitted, the pipeline also completes early.
When to use: Polling a high-frequency stream at an externally defined rate; for example, sampling sensor readings on each heartbeat, or snapshotting the latest price whenever a timer ticks.
Options: Buffer, WithName.
// Emit the latest quote once per second, driven by a Ticker.
clock := kitsune.Ticker(1 * time.Second)
polled := kitsune.SampleWith(liveQuotes, clock)
// Snapshot the latest sensor value on each heartbeat signal.
snapped := kitsune.SampleWith(sensorStream, heartbeatPipeline)
RateLimit
func RateLimit[T any](
	p *Pipeline[T],
	ratePerSec float64,
	rlOpts []RateLimitOpt,
	stageOpts ...StageOption,
) *Pipeline[T]
Limits throughput to ratePerSec items per second using a token bucket. In RateLimitWait mode (default), the pipeline blocks when the bucket is empty (backpressure). In RateLimitDrop mode, excess items are silently discarded.
Rate-limit options (rlOpts):
- Burst(n): allow short bursts of up to n tokens above the steady rate
- RateMode(RateLimitDrop): drop items instead of blocking
Options (stageOpts): Buffer, WithName.
// Allow up to 100 events/sec with bursts of up to 10.
limited := kitsune.RateLimit(events, 100, []kitsune.RateLimitOpt{kitsune.Burst(10)})
// Drop items that exceed 50/sec.
lossy := kitsune.RateLimit(events, 50, []kitsune.RateLimitOpt{kitsune.RateMode(kitsune.RateLimitDrop)})
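For intuition about the token bucket, here is a minimal single-goroutine sketch with time passed in as seconds so the result is deterministic. The tokenBucket type and its capacity parameter are hypothetical; how Burst(n) maps onto bucket capacity inside the library is not stated above, so the sketch simply takes a capacity directly.

```go
package main

import "fmt"

// tokenBucket is a hypothetical, non-concurrent token bucket.
type tokenBucket struct {
	rate     float64 // tokens added per second
	capacity float64 // maximum tokens the bucket can hold
	tokens   float64 // tokens currently available
	last     float64 // time of the last refill, in seconds
}

// newTokenBucket starts with a full bucket at time zero.
func newTokenBucket(ratePerSec float64, capacity int) *tokenBucket {
	c := float64(capacity)
	return &tokenBucket{rate: ratePerSec, capacity: c, tokens: c}
}

// allow refills for the time elapsed since the last call, then takes one
// token if available. A false result corresponds to dropping the item in
// drop mode, or to waiting in blocking mode.
func (b *tokenBucket) allow(nowSec float64) bool {
	if elapsed := nowSec - b.last; elapsed > 0 {
		b.tokens += elapsed * b.rate
		if b.tokens > b.capacity {
			b.tokens = b.capacity
		}
		b.last = nowSec
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	b := newTokenBucket(2, 2) // 2 tokens/sec, bucket holds 2
	for _, t := range []float64{0, 0, 0, 0.5, 0.5} {
		fmt.Println(t, b.allow(t))
	}
}
```

The first two calls drain the full bucket, the third is refused, and half a second later exactly one token has been refilled.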
Resilience
CircuitBreaker
func CircuitBreaker[I, O any](
	p *Pipeline[I],
	fn func(context.Context, I) (O, error),
	cbOpts []CircuitBreakerOpt,
	stageOpts ...StageOption,
) *Pipeline[O]
Wraps fn in a three-state circuit breaker:
- Closed (normal): fn is called for every item. Consecutive failures increment a counter.
- Open (tripped): after FailureThreshold consecutive failures, the circuit opens. All items immediately receive ErrCircuitOpen without calling fn. The circuit stays open for CooldownDuration.
- Half-open (probing): after the cooldown, up to HalfOpenProbes items are tested. If all succeed, the circuit closes. If any fail, the circuit opens again.
The circuit breaker is built on top of Map, so all StageOption values apply. Use OnError(Skip()) to silently drop items while the circuit is open, or OnError(Return(zero)) to substitute a default.
Circuit-breaker options (cbOpts):
- FailureThreshold(n): consecutive failures to open (default: 5)
- CooldownDuration(d): open duration before probing (default: 10s)
- HalfOpenProbes(n): successes required to close from half-open (default: 1)
- HalfOpenTimeout(d): deadline on the half-open state
Options (stageOpts): Concurrency, OnError, Buffer, Overflow, WithName, Timeout.
results := kitsune.CircuitBreaker(requests, callExternalAPI,
	[]kitsune.CircuitBreakerOpt{
		kitsune.FailureThreshold(3),
		kitsune.CooldownDuration(30 * time.Second),
		kitsune.HalfOpenProbes(2),
	},
	kitsune.OnError(kitsune.Skip()), // drop items while open
	kitsune.Concurrency(4),
)
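The three states can be traced with a small standalone state machine. Everything in this sketch is hypothetical (the breaker type, its field names, errCircuitOpen as a stand-in for ErrCircuitOpen, and the sec helper); it is single-goroutine, takes the current time as an argument, and is meant to illustrate the transitions described above rather than reproduce the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errCircuitOpen = errors.New("circuit open") // stand-in for ErrCircuitOpen
	errBoom        = errors.New("boom")
)

type state int

const (
	closed state = iota
	open
	halfOpen
)

// breaker is a hypothetical, non-concurrent circuit breaker.
type breaker struct {
	failureThreshold int
	cooldown         time.Duration
	halfOpenProbes   int

	st        state
	failures  int           // consecutive failures while closed
	successes int           // successful probes while half-open
	openedAt  time.Duration // offset at which the breaker last tripped
}

func sec(n int) time.Duration { return time.Duration(n) * time.Second }

func failing() error    { return errBoom }
func succeeding() error { return nil }

func (b *breaker) trip(now time.Duration) {
	b.st, b.openedAt, b.failures = open, now, 0
}

// call runs fn unless the breaker is open and still cooling down.
func (b *breaker) call(now time.Duration, fn func() error) error {
	if b.st == open {
		if now-b.openedAt < b.cooldown {
			return errCircuitOpen // fn is not called while open
		}
		b.st, b.successes = halfOpen, 0
	}
	err := fn()
	switch {
	case err != nil && b.st == halfOpen:
		b.trip(now) // any failed probe reopens the circuit
	case err != nil:
		b.failures++
		if b.failures >= b.failureThreshold {
			b.trip(now)
		}
	case b.st == halfOpen:
		b.successes++
		if b.successes >= b.halfOpenProbes {
			b.st, b.failures = closed, 0
		}
	default:
		b.failures = 0 // a success resets the consecutive-failure count
	}
	return err
}

func main() {
	b := &breaker{failureThreshold: 2, cooldown: sec(10), halfOpenProbes: 1}
	fmt.Println(b.call(sec(0), failing))     // boom
	fmt.Println(b.call(sec(1), failing))     // boom; second failure trips the breaker
	fmt.Println(b.call(sec(2), succeeding))  // circuit open; fn is not called
	fmt.Println(b.call(sec(12), succeeding)) // <nil>; the probe succeeds and the breaker closes
}
```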
MapPooled
func MapPooled[I, O any](
	p *Pipeline[I],
	pool *Pool[O],
	fn func(context.Context, I, *Pooled[O]) error,
	opts ...StageOption,
) *Pipeline[*Pooled[O]]
Transforms each item using fn, acquiring a pre-allocated *Pooled[O] from pool before each call. The result is the filled *Pooled[O] wrapper. Downstream code must call Release() on each received item (or ReleaseAll for batches); failing to release leaks pool objects.
If fn returns an error, the slot is automatically released back to the pool.
Use-after-release protection: Release() panics on double-call, so misuse is caught early rather than causing silent data corruption. Use buf.MustValue() instead of buf.Value when you want a panic on access after release (zero-overhead Value remains available for hot paths where you control the lifecycle).
When to use: High-throughput transforms where allocating a new output buffer per item is expensive: JSON encoding, protobuf marshalling, audio/video frame processing.
Options: Concurrency, OnError, Buffer, Overflow, WithName, Timeout.
pool := kitsune.NewPool(func() []byte { return make([]byte, 0, 4096) })
encoded := kitsune.MapPooled(events, pool,
	func(ctx context.Context, e Event, out *kitsune.Pooled[[]byte]) error {
		var err error
		out.Value, err = json.Marshal(e)
		return err
	})
// Downstream: release each buffer after use.
encoded.ForEach(func(_ context.Context, buf *kitsune.Pooled[[]byte]) error {
	defer buf.Release()
	// conn is assumed to be an io.Writer; Write returns (n, err), so keep only the error.
	_, err := conn.Write(buf.Value)
	return err
}).Run(ctx)
Utility & Metadata
WithIndex
Tags each item with its 0-based stream position. Emits Indexed[T]{Index int; Value T}.
Options: Buffer, WithName.
Tap / TapError / Finally
func Tap[T any](p *Pipeline[T], fn func(context.Context, T) error, opts ...StageOption) *Pipeline[T]
func TapError[T any](p *Pipeline[T], fn func(context.Context, error)) *Pipeline[T]
func Finally[T any](p *Pipeline[T], fn func(context.Context, error)) *Pipeline[T]
Side-effect operators that forward all items unchanged.
- Tap calls fn for each item as a side effect. If fn returns an error, the pipeline halts.
- TapError calls fn when the pipeline terminates with an error, then re-propagates the error.
- Finally calls fn when the pipeline exits for any reason (success, error, or cancellation), then re-propagates the outcome.
The method forms accept simpler signatures: p.Tap(func(T)), p.TapError(func(error)), p.Finally(func(error)).
Options (Tap): Buffer, WithName.
p.Tap(func(e Event) { metrics.Inc("events_processed") }).
	TapError(func(err error) { log.Printf("pipeline error: %v", err) }).
	Finally(func(err error) { conn.Close() }).
	ForEach(store).Run(ctx)
Scan (see Aggregation section)
Terminal Operators
ForEach
func (p *Pipeline[T]) ForEach(fn func(context.Context, T) error, opts ...StageOption) *ForEachRunner[T]
Returns a ForEachRunner that calls fn for every item. No processing occurs until Run or RunAsync is called.
ForEachRunner has:
- Run(ctx, opts...): blocks until complete
- RunAsync(ctx, opts...): runs in background, returns a RunHandle
- Build(): returns a Runner (retained for backwards compatibility; not required for MergeRunners)
With Concurrency(n) where n > 1, fn is called from n goroutines. Add Ordered() to call fn in input order even with concurrency.
Options: Concurrency, Ordered, OnError, Buffer, WithName, Timeout, Supervise.
err := events.ForEach(func(ctx context.Context, e Event) error {
	return db.Insert(ctx, e)
}, kitsune.Concurrency(8), kitsune.OnError(kitsune.RetryMax(3, kitsune.FixedBackoff(100*time.Millisecond)))).
	Run(ctx)
Single
Runs the pipeline and expects it to emit exactly one item. Returns that item on success. Returns an error if:
- The pipeline emits zero items (plain error, unless OrDefault or OrZero is supplied).
- The pipeline emits more than one item (plain error, always).
- The pipeline itself returns an error.
Use OrDefault(v) to return a default value instead of an error on empty input, and OrZero[T]() to return the zero value of T instead.
When to use: Collecting the single output of a buffering aggregator such as GroupBy, TakeRandom, or Reduce, where you expect exactly one output item.
// Collect a grouped map.
grouped, err := kitsune.Single(ctx, kitsune.GroupBy(events, keyFn))
// With a default if the pipeline is empty.
val, err := kitsune.Single(ctx, maybeEmpty, kitsune.OrDefault(0))
// With the zero value instead of an error on empty input.
zeroVal, err := kitsune.Single(ctx, maybeEmpty, kitsune.OrZero[int]())
Drain
Discards all items. Useful for running a pipeline for its side effects (e.g., when all work is done by Tap stages) without collecting any output.
Runner / RunAsync
func (r *Runner) Run(ctx context.Context, opts ...RunOption) error
func (r *Runner) RunAsync(ctx context.Context, opts ...RunOption) *RunHandle
Run executes the pipeline, blocking until completion. RunAsync starts the pipeline in a background goroutine and returns a RunHandle.
RunHandle provides:
- Wait() error: block until done
- Done() <-chan struct{}: closed when done
- Err() <-chan error: receives exactly one value
- Pause() / Resume() / Paused(): pause/resume source stages
Run options:
- WithStore(s Store): state backend for MapWith, FlatMapWith
- WithHook(h Hook): observability hook
- WithDrain(timeout): graceful drain on context cancellation
- WithCache(cache, ttl): default cache backend for CacheBy stages
- WithErrorStrategy(h): pipeline-wide default error handler
- WithPauseGate(gate): attach an external gate
- WithCodec(c): serialisation codec for state and cache
- WithDefaultBuffer(n): default channel buffer size for all stages (default 16); per-stage Buffer(n) takes precedence
- WithDefaultKeyTTL(d): default inactivity TTL for all MapWithKey and FlatMapWithKey stages; per-stage WithKeyTTL takes precedence
handle := runner.RunAsync(ctx)
// ... later ...
if err := handle.Wait(); err != nil {
	log.Fatal(err)
}
MergeRunners
Combines multiple terminal stages into one runner. Use this when a pipeline forks (via Partition, Broadcast, Share) into multiple terminal branches that must run together on a shared graph.
Both *Runner and *ForEachRunner[T] satisfy Runnable, so terminal stages can be passed directly without calling Build().
valid, invalid := kitsune.Partition(records, isValid)
runner, err := kitsune.MergeRunners(
	valid.ForEach(store),
	invalid.ForEach(logInvalid),
)
if err != nil {
	log.Fatal(err)
}
if err := runner.Run(ctx); err != nil {
	log.Fatal(err)
}
Stage Composition
Stage[I, O] / Then / Through / Or
type Stage[I, O any] func(*Pipeline[I]) *Pipeline[O]
func Then[I, M, O any](s Stage[I, M], next Stage[M, O]) Stage[I, O]
func (s Stage[I, O]) Apply(p *Pipeline[I]) *Pipeline[O]
func (p *Pipeline[T]) Through(s Stage[T, T]) *Pipeline[T]
func Or[I, O any](primary, fallback func(context.Context, I) (O, error), opts ...StageOption) Stage[I, O]
Stage[I, O] is a composable pipeline transformer: a function from *Pipeline[I] to *Pipeline[O]. It lets you name and reuse multi-step pipeline fragments.
Then chains two stages: the output of s becomes the input of next.
Apply is syntactic sugar for calling the stage as a function.
Through applies a same-type stage to a pipeline inline; useful for chaining stages that preserve the element type.
Or creates a Stage that tries primary and falls back to fallback if primary returns an error. Both functions are called with the same item.
If both primary and fallback return errors, the returned error wraps both via errors.Join so neither is silently discarded. Both causes are inspectable with errors.Is / errors.As.
// Define reusable pipeline stages.
var ParseStage kitsune.Stage[string, Event] = func(lines *kitsune.Pipeline[string]) *kitsune.Pipeline[Event] {
	return kitsune.Map(lines, func(ctx context.Context, line string) (Event, error) {
		return parseJSON(line)
	})
}
var EnrichStage kitsune.Stage[Event, EnrichedEvent] = func(events *kitsune.Pipeline[Event]) *kitsune.Pipeline[EnrichedEvent] {
	return kitsune.Map(events, enrich, kitsune.Concurrency(4))
}
// Chain them.
pipeline := kitsune.Then(ParseStage, EnrichStage)
result := pipeline(kitsune.FromSlice(rawLines))
// Or with Through for same-type stages:
normalised := kitsune.FromSlice(rawLines).
	Through(normalize).
	Through(deduplicate)
// Or for fallback:
fetch := kitsune.Or(fetchFromCache, fetchFromDB, kitsune.WithName("fetch"))
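The error-joining behaviour described for Or can be reproduced with the standard library alone. In this sketch, orElse is a hypothetical stand-in for the function Or wraps, and the sentinel errors and lookup functions are invented for illustration; only errors.Join and errors.Is are real APIs (Go 1.20+).

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var (
	errCacheMiss = errors.New("cache miss")
	errDBDown    = errors.New("database unavailable")
)

// orElse is a hypothetical helper: try primary, then fallback with the same
// input, and join both errors if both fail.
func orElse[I, O any](primary, fallback func(context.Context, I) (O, error)) func(context.Context, I) (O, error) {
	return func(ctx context.Context, in I) (O, error) {
		out, err := primary(ctx, in)
		if err == nil {
			return out, nil
		}
		out, fbErr := fallback(ctx, in)
		if fbErr == nil {
			return out, nil
		}
		var zero O
		return zero, errors.Join(err, fbErr)
	}
}

// Both lookups fail in this demo so the joined error can be inspected.
func fromCache(_ context.Context, id int) (string, error) { return "", errCacheMiss }
func fromDB(_ context.Context, id int) (string, error)    { return "", errDBDown }

// fetchErr runs both failing lookups and returns the combined error.
func fetchErr(id int) error {
	_, err := orElse(fromCache, fromDB)(context.Background(), id)
	return err
}

// causes reports which underlying errors errors.Is can find in err.
func causes(err error) (cacheMiss, dbDown bool) {
	return errors.Is(err, errCacheMiss), errors.Is(err, errDBDown)
}

func main() {
	fmt.Println(causes(fetchErr(7))) // true true
}
```

Because the joined error keeps both causes, a caller can branch on either sentinel without string matching.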
Error Handling Options
Error handling is configured per-stage with OnError(handler) or pipeline-wide with WithErrorStrategy(handler) in run options.
Halt
Stop the pipeline on the first error. This is the default.
Skip
Drop the failing item and continue processing subsequent items.
Return
Replace the failed item with val and continue. In FlatMap stages, behaves like Skip.
Type safety caveat: ErrorHandler is not parameterized on the stage's output type. The type parameter T on Return is inferred from val and is not checked against the stage's output type at compile time. If they do not match, the substitution silently fails at runtime: the original error is propagated as though Halt had been used. Use a typed variable or prefer TypedReturn (see below) for a compile-time guarantee.
Return can be composed as a fallback inside RetryThen and RetryIfThen. TypedReturn cannot.
TypedReturn
A type-safe alternative to OnError(Return(val)). The output type O is verified at the call site, so a mismatch between val and the stage's output type is a compile-time error rather than a silent runtime fallback to Halt.
TypedReturn returns a StageOption directly, so it cannot be composed inside RetryThen or RetryIfThen. For composed retry chains, use Return with a typed variable:
var fallback User
kitsune.OnError(kitsune.RetryThen(3, kitsune.FixedBackoff(time.Second), kitsune.Return(fallback)))
In FlatMap stages, TypedReturn behaves like Skip because FlatMap has no single replacement value to emit.
RetryMax / RetryThen
func RetryMax(n int, b Backoff) ErrorHandler
func RetryThen(n int, b Backoff, fallback ErrorHandler) ErrorHandler
Retry the current item up to n times with backoff b. RetryMax halts after exhausting retries; RetryThen delegates to fallback (e.g., Skip()).
These are error handlers for use with OnError; they retry the individual item's transformation function, not the pipeline as a whole. To re-subscribe to an entire upstream source on failure, use the Retry operator instead.
Backoff helpers
func FixedBackoff(d time.Duration) Backoff
func ExponentialBackoff(initial, max time.Duration) Backoff
kitsune.OnError(kitsune.RetryThen(3,
	kitsune.ExponentialBackoff(100*time.Millisecond, 5*time.Second),
	kitsune.Skip(),
))
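To give a feel for the delays such a configuration might produce, the sketch below uses one common formulation: the delay doubles on each attempt and is capped at the maximum. This is an assumption for illustration; the exact schedule ExponentialBackoff uses (including whether it adds jitter) is not stated here. The helpers exponentialDelay and ms are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// ms is a small helper for readable durations.
func ms(n int) time.Duration { return time.Duration(n) * time.Millisecond }

// exponentialDelay returns initial * 2^attempt, capped at maxDelay.
// attempt 0 is the first retry. The cap is checked before doubling so the
// value can never overflow.
func exponentialDelay(initial, maxDelay time.Duration, attempt int) time.Duration {
	d := initial
	for i := 0; i < attempt; i++ {
		if d > maxDelay/2 {
			return maxDelay
		}
		d *= 2
	}
	if d > maxDelay {
		return maxDelay
	}
	return d
}

func main() {
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Println(attempt, exponentialDelay(ms(100), ms(5000), attempt))
	}
}
```

With a 100ms initial delay and a 5s cap, the doubling sequence reaches the cap on the seventh retry (attempt 6) and stays there.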
Combining OnError and Supervise
OnError and Supervise operate at different levels and can be used together on the same stage. OnError is evaluated per item; Supervise is evaluated when the stage loop itself crashes. The evaluation order is: OnError runs first; only when its final decision is Halt (including after retry exhaustion) does Supervise see the error and decide whether to restart the stage.
Stateful stages under Supervise: For stateful stages (MapWith, MapWithKey, FlatMapWith, FlatMapWithKey), per-key Ref state is preserved across supervised restarts within a single Run call: the key map is allocated once per run and captured by the restarted loop. State is only lost when the surrounding process terminates and a new Run starts; for cross-run durability, configure an external Store via WithStore.
See the Error Handling guide for the full evaluation model, common combination patterns (retry-then-restart, skip-unless-fatal-then-restart), and observability.
Stage Options Reference
| Option | Type | Applies to | Description |
|---|---|---|---|
| Concurrency(n) | StageOption | Map, FlatMap, MapWith, FlatMapWith, MapWithKey, FlatMapWithKey, ForEach | Run n goroutines in parallel. Default: 1. |
| Ordered() | StageOption | Map, FlatMap | Emit results in input order when Concurrency > 1. |
| OnError(h) | StageOption | Map, FlatMap, MapWith, MapWithKey, ForEach, CircuitBreaker | Per-stage error handler. Default: Halt(). |
| Buffer(n) | StageOption | All operators | Channel buffer size between this stage and the next. Default: 16. |
| Overflow(s) | StageOption | Map, FlatMap, Filter, and most transforms | What to do when the output buffer is full: Block (default), DropNewest, DropOldest. |
| WithName(s) | StageOption | All operators | Label the stage for metrics, traces, and Pipeline.Describe(). |
| Timeout(d) | StageOption | Map, FlatMap, MapWith, FlatMapWith | Per-item deadline. Cancels the item's context after d. |
| Supervise(policy) | StageOption | Map, FlatMap, MapWith, ForEach | Restart the stage on error or panic. See RestartOnError, RestartOnPanic, RestartAlways. |
| BatchCount(n) | StageOption | Batch | Flush when n items have accumulated. Required if neither BatchMeasure nor BatchTimeout is set. |
| BatchMeasure(fn, n) | StageOption | Batch | Flush when the cumulative measure across buffered items reaches n. |
| BatchTimeout(d) | StageOption | Batch, MapBatch | Flush a partial batch after d even if it is not full. |
| DedupeWindow(n) | StageOption | Dedupe, DedupeBy | Deduplication scope: 0 = global (default), 1 = consecutive only, n > 1 = sliding window of last n items. |
| WithClock(c) | StageOption | Ticker, Timer, Batch, Throttle, Debounce, Sample, SessionWindow, Timestamp, TimeInterval | Substitute a deterministic clock for testing. |
| CacheBy(keyFn) | StageOption | Map only | Enable TTL-based result caching. On a hit, fn is skipped. Requires WithCache at run time or CacheBackend. |
| WithDedupSet(s) | StageOption | Dedupe, DedupeBy, ExpandMap | External deduplication backend (Redis, Bloom filter). |
| VisitedBy(keyFn) | StageOption | ExpandMap | Enable cycle detection by key during graph walks. |
| MaxDepth(n int) | StageOption | ExpandMap | Cap BFS depth to n levels below roots. 0 = roots only; default unlimited. |
| MaxItems(n int) | StageOption | ExpandMap | Cap total items emitted to n. Stage closes normally when cap is hit. Default unlimited. |
| WithKeyTTL(d) | StageOption | MapWithKey, FlatMapWithKey | Evict per-key Ref entries after d of inactivity. 0 disables (default). Overrides WithDefaultKeyTTL. |