Kitsune Internals
This document explains how Kitsune works under the hood: the data structures, concurrency model, and runtime machinery that powers every pipeline. It is aimed at contributors and at users who want a mental model deeper than the public API.
The big picture
A Kitsune pipeline is a directed acyclic graph (DAG) of processing stages.
That graph is assembled lazily during pipeline construction: no goroutines
start and no channels are allocated until the whole graph is materialised at a
single point, when Run (or Collect) is called.
flowchart LR
subgraph construction ["Construction time"]
A[FromSlice] --> B[Map] --> C[Filter] --> D[ForEach]
end
subgraph runtime ["Run time"]
E[validate] --> F[wire channels] --> G[launch goroutines]
end
construction -- "Run(ctx)" --> runtime
The public kitsune package is the runtime. Each operator constructs a typed
*Pipeline[T] carrying a build closure; no channels are allocated and no
type information is lost until Run is called. Generics are preserved
end-to-end: there is no type erasure in the hot path.
Pipeline[T] and stageMeta
Pipeline[T] is the core type: a lazy, reusable blueprint for one stage of
computation. It holds:
- id int: a process-unique stage ID (assigned at construction time via an atomic counter, nextPipelineID).
- meta stageMeta: static description of the stage (name, kind, inputs, concurrency, buffer size, …).
- build func(*runCtx) chan T: the closure that materialises the stage when Run is called. It recursively calls build on upstream pipelines, allocates a fresh typed channel, registers the stage's stageFunc into the runCtx, and returns the channel so downstream stages can read from it. The closure is memoised via runCtx: if this stage was already built in the current Run, the existing channel is returned immediately without re-registering. This is what makes diamond (shared-upstream) graphs safe.
- fusionEntry func(*runCtx, func(ctx, T) error) stageFunc: non-nil for fast-path-eligible Map and Filter stages (see Stage fusion below).
- consumerCount atomic.Int32: incremented at construction time by every operator that consumes this pipeline. fusionEntry is safe to use only when consumerCount == 1.
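As a rough sketch, the fields above can be pictured like this (field names follow the descriptions in this section; the real definition in pipeline.go will differ in detail):

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// stageFunc is the body of one stage goroutine.
type stageFunc func(ctx context.Context) error

// runCtx is elided here; see the runCtx discussion below.
type runCtx struct{}

type stageMeta struct {
	id   int
	name string
	kind string
}

// Pipeline is a lazy, reusable blueprint for one stage of computation.
// Field names are reconstructed from the prose, not copied from source.
type Pipeline[T any] struct {
	id            int
	meta          stageMeta
	build         func(*runCtx) chan T
	fusionEntry   func(*runCtx, func(context.Context, T) error) stageFunc
	consumerCount atomic.Int32
}

var nextPipelineID atomic.Int64 // process-unique stage IDs

func main() {
	p := &Pipeline[int]{id: int(nextPipelineID.Add(1))}
	p.consumerCount.Add(1) // a single consumer keeps the stage fusion-eligible
	fmt.Println(p.id, p.consumerCount.Load())
}
```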
stageMeta holds introspection data for a single stage:
stageMeta {
id int
name string
kind string // "map", "filter", "source", "batch", …
inputs []int // upstream stage IDs
concurrency int
buffer int
overflow internal.Overflow
batchSize int
timeout time.Duration
hasRetry bool
hasSuperv bool
getChanLen func() int // set during build; queries live channel length
getChanCap func() int // set during build; queries channel capacity
}
runCtx is created fresh on every Runner.Run call. As build functions
are called recursively from the terminal back to sources, each stage registers
its stageFunc here and its output channel is memoised by stage ID:
runCtx {
stages []stageFunc // goroutine bodies, one per stage
metas []stageMeta // static descriptions, one per stage
chans map[int]any // stage ID → typed channel (type-erased for storage)
hook internal.Hook
refs *refRegistry // keyed state, populated during build phase
done chan struct{} // closed by early-exit stages (Take, TakeWhile)
// …cache, codec fields
}
inputs []int on stageMeta lists the upstream stage IDs this stage reads
from. It is almost always a single element; the exceptions are multi-input nodes
(Zip, LatestFrom, Merge). Multi-output nodes (Partition, MapResult)
are implemented as two independent Pipeline values that share the same build
closure and memoisation ID: the second call to build returns the already-
memoised channel immediately.
Stage kind reference
The kind string on stageMeta is used for graph visualisation and hooks. Each
kind corresponds to a stage runner function:
| Kind string | Stage runner | Channels out |
|---|---|---|
| "source" | inline in source build closure | 1 |
| "map" | runMapSingle / runMapConcurrent / runMapConcurrentOrdered | 1 |
| "flatmap" | same variants as map | 1 |
| "filter" | runFilter | 1 |
| "tap" | part of Map fast path | 1 |
| "take" | runTake | 1 |
| "takewhile" | runTakeWhile | 1 |
| "batch" | runBatch | 1 |
| "reduce" | runReduce | 1 (emits once on close) |
| "throttle" | runThrottle | 1 |
| "debounce" | runDebounce | 1 |
| "partition" | runPartition | 2 (match / rest) |
| "mapresult" | runMapResult | 2 (ok / error) |
| "broadcast" | runBroadcast | N |
| "merge" | runMerge | 1 |
| "zip" | runZip | 1 |
| "latest_from" | runLatestFrom | 1 |
| "sink" / "foreach" | runSink | 0 (terminal) |
Channel wiring
Channels are not allocated in a centralised pass. Instead, each operator's
build closure allocates its own typed output channel inline when called:
// Inside Map's build closure:
ch := make(chan O, cfg.buffer)
rc.setChan(id, ch) // memoised by stage ID for diamond-graph safety
The result is stored in rc.chans map[int]any keyed by stage ID. Downstream
stages retrieve their input channel by calling rc.getChan(upstreamID) and
type-asserting to the expected chan T.
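A minimal sketch of that memoisation, assuming the setChan/getChan names used in the text and the map[int]any storage described above:

```go
package main

import "fmt"

// runCtx fragment: type-erased channel storage keyed by stage ID.
type runCtx struct {
	chans map[int]any
}

func (rc *runCtx) setChan(id int, ch any) { rc.chans[id] = ch }

// getChan returns the memoised channel for a stage, or reports that
// the stage has not been built yet in this Run.
func (rc *runCtx) getChan(id int) (any, bool) {
	ch, ok := rc.chans[id]
	return ch, ok
}

func main() {
	rc := &runCtx{chans: map[int]any{}}

	// First build call for stage 7: allocate and memoise.
	ch := make(chan int, 4)
	rc.setChan(7, ch)

	// A second consumer of the same stage (diamond graph) gets the
	// memoised channel back, not a new one. The type assertion is
	// performed once per stage per Run, never per item.
	got, ok := rc.getChan(7)
	fmt.Println(ok, got.(chan int) == ch)
}
```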
flowchart LR
N0["stage 0\nSource"] -->|"chan T (id=0)"| N1["stage 1\nMap"]
N1 -->|"chan U (id=1)"| N2["stage 2\nFilter"]
N2 -->|"chan U (id=2)"| N3["stage 3\nSink"]
Multi-output stages (Partition, MapResult, Broadcast) are implemented as
two (or N) independent Pipeline values that share the same stage ID. The first
build call allocates the channel and registers the stage function; subsequent
calls hit the memoisation guard and return the existing channel:
flowchart LR
P["Partition\n(id=n)"] -->|"chan T true branch"| MA[match pipeline]
P -->|"chan T false branch"| MB[rest pipeline]
Each build closure wraps its raw channel in an Outbox (via
internal.NewOutbox or internal.NewBlockingOutbox) that enforces the overflow
strategy configured on that stage (see next section).
Outboxes and overflow
Every stage writes to its output through an Outbox, never to the raw channel
directly. The Outbox interface has two methods; this indirection is where the
three overflow strategies diverge:
- Block (default): a simple select that sends the item or returns ctx.Err() on cancellation. Zero overhead; no counters.
- DropNewest: a non-blocking try-send. If the buffer is full the incoming item is discarded immediately. An atomic counter records the drop and OverflowHook.OnDrop is called if the hook implements it. No locks.
- DropOldest: evicts the oldest buffered item to make room. The fast path (buffer has space) is identical to Block. The slow path (buffer full) takes a mutex, reads one item from the channel to free a slot, then writes the new item. The mutex is held only during that eviction, so normal sends remain contention-free.
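The DropNewest fast path can be sketched as a non-blocking try-send with an atomic drop counter (names are illustrative, and Send here omits the context that the real Outbox presumably threads through):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// dropNewestOutbox sketches the DropNewest strategy: a full buffer
// costs one atomic increment, never a block and never a lock.
type dropNewestOutbox[T any] struct {
	ch      chan T
	dropped atomic.Int64
}

// Send tries to enqueue; when the buffer is full the incoming item
// is discarded and the drop is counted (this is where
// OverflowHook.OnDrop would fire).
func (o *dropNewestOutbox[T]) Send(item T) bool {
	select {
	case o.ch <- item:
		return true
	default:
		o.dropped.Add(1)
		return false
	}
}

func main() {
	o := &dropNewestOutbox[int]{ch: make(chan int, 2)}
	for i := 0; i < 5; i++ {
		o.Send(i) // only the first two fit; 2, 3, 4 are dropped
	}
	fmt.Println(len(o.ch), o.dropped.Load()) // prints: 2 3
}
```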
flowchart TD
S[Send called] --> Q{buffer full?}
Q -->|no| W[write item to channel]
Q -->|yes, Block| BL[block until space\nor ctx cancelled]
Q -->|yes, DropNewest| DN[discard incoming item\nOnDrop hook]
Q -->|yes, DropOldest| DO[read oldest item\nwrite new item\nOnDrop hook for evicted]
How Run ties it all together
flowchart TD
A["Runner.Run"] --> B["newRunCtx()"]
B --> C["r.terminal(rc)\nrecursive build — allocates channels,\nregisters stageFuncs into rc"]
C --> D["rc.refs.init()\nmaterialise state keys"]
D --> E["GraphHook.OnGraph\nif implemented"]
E --> F["BufferHook.OnBuffers\nif implemented"]
F --> G{DrainTimeout > 0?}
G -->|yes| H["runWithDrain\n(kitsune.go)"]
G -->|no| I["internal.RunStages\nerrgroup per stage"]
H --> J["stageFunc × N goroutines"]
I --> J
r.terminal(rc) starts the recursive build: the terminal stage calls build
on its upstream pipeline, which calls build on its upstream, and so on back
to the source. Each build closure allocates a typed channel and appends a
stageFunc to rc.stages. When the recursion unwinds, every stage is
registered and every channel is wired.
Each stage then runs in its own goroutine inside an errgroup. When any
goroutine returns a non-nil error the group's shared context (egCtx) is
cancelled, causing every other goroutine to see ctx.Done() on its next check
and exit.
Each build closure registers a stage goroutine that does two things before
entering its processing loop:
- Defers closing its output channel(s) on exit: this is how downstream stages learn the stream is exhausted (range inCh terminates, or ok == false).
- Wraps the processing function in internal.Supervise (see internal/process.go) if supervision is configured.
// Conceptual shape of each build closure's registered stageFunc:
func(ctx context.Context) error {
defer close(outCh) // closes output channel(s) on exit
return internal.Supervise(ctx, policy, hook, name, inner)
}
The done channel: early exit without context cancellation
When Take or TakeWhile decides no more items are needed, it must signal
upstream sources to stop. Cancelling the shared context would prematurely abort
downstream stages that are still draining in-flight items.
Instead, a separate done chan struct{} (closed via a sync.Once) is used.
Sources check it on every yield:
select {
case <-done: return false // stop producing — clean exit
case <-ctx.Done(): return false
default:
}
runSource goes a step further: it derives a srcCtx from the parent context
that is also cancelled when done fires, so source functions that block on
<-ctx.Done() (e.g. infinite generators) wake up correctly:
flowchart LR
P[parentCtx] -->|derives| SC[srcCtx]
D["done channel\nclosed by Take"] -->|"cancels"| SC
SC -->|passed to| SF[source function]
SC -->|"ctx.Err() suppressed\n= clean exit"| RS[runSource]
This means sources never need to know about done directly: they only see a
context that happens to cancel when the pipeline no longer needs them.
Cooperative drain: unblocking producers on early exit
When a downstream stage exits early (for example, Take(1) after reading one
item), it stops reading from its input channel. The upstream stage may be
blocked on a full-buffer send with no one left to drain it. Without
intervention, that upstream stage goroutine would leak indefinitely.
The old approach: DrainChan goroutines
The original fix was a deferred goroutine in every stage:
DrainChan reads from inCh until it is closed, unblocking any producer that
is stuck on a send. This is correct, but on a 20-stage pipeline with Take(1),
teardown spawns 20 simultaneous goroutines. In pipelines that cycle frequently
(via Retry) this creates sustained goroutine pressure.
Why ctx.Done() does not help
The errgroup context is cancelled only when a stage returns a non-nil error.
Take(1) and TakeWhile return nil on success, so ctx is never cancelled
in the happy-path case. Upstream stages cannot exit via <-ctx.Done() and must
be unblocked some other way.
The cooperative drain protocol
Each stage owns a drain-notify entry: a ref-counted channel that closes when all downstream consumers of that stage have exited. Instead of spawning a goroutine, a consumer signals its producer directly, and the producer exits through its existing select loop. The cascade propagates upstream without any additional goroutines.
Data structures (in pipeline.go):
type drainEntry struct {
ch chan struct{}
refs atomic.Int32
}
// field on runCtx:
drainNotify map[int64]*drainEntry // producerID → entry
Three methods on runCtx:
// initDrainNotify registers a drain entry for producerID.
// consumerCount is clamped to at least 1; non-fusion stages always
// have consumerCount == 0 at build time.
func (rc *runCtx) initDrainNotify(producerID int64, consumerCount int32)
// signalDrain decrements the ref count. When it reaches zero the
// drainNotify channel is closed, unblocking the producer's select.
func (rc *runCtx) signalDrain(producerID int64)
// drainCh returns the closed-on-drain channel for the given stage ID.
func (rc *runCtx) drainCh(id int64) <-chan struct{}
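A sketch of how those three methods could fit together, using the drainEntry layout above (the clamp-to-1 and close-on-zero behaviour follow the doc comments; everything else is illustrative):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// drainEntry is the ref-counted notification described above: the
// last consumer to signal closes the channel, waking the producer's
// select loop.
type drainEntry struct {
	ch   chan struct{}
	refs atomic.Int32
}

type runCtx struct {
	drainNotify map[int64]*drainEntry
}

func (rc *runCtx) initDrainNotify(producerID int64, consumerCount int32) {
	if consumerCount < 1 {
		consumerCount = 1 // clamp: non-fusion stages report 0 at build time
	}
	e := &drainEntry{ch: make(chan struct{})}
	e.refs.Store(consumerCount)
	rc.drainNotify[producerID] = e
}

func (rc *runCtx) signalDrain(producerID int64) {
	if e := rc.drainNotify[producerID]; e != nil && e.refs.Add(-1) == 0 {
		close(e.ch)
	}
}

func (rc *runCtx) drainCh(id int64) <-chan struct{} {
	return rc.drainNotify[id].ch
}

func main() {
	rc := &runCtx{drainNotify: map[int64]*drainEntry{}}
	rc.initDrainNotify(1, 2) // stage 1 has two consumers

	rc.signalDrain(1) // first consumer exits: channel stays open
	select {
	case <-rc.drainCh(1):
		fmt.Println("closed too early")
	default:
		fmt.Println("still open after one signal")
	}

	rc.signalDrain(1) // second consumer exits: channel closes
	<-rc.drainCh(1)
	fmt.Println("closed after both consumers signalled")
}
```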
Pattern in every converted intermediate stage:
// build phase:
rc.initDrainNotify(id, out.consumerCount.Load())
drainFn := func() { rc.signalDrain(p.id) } // signal MY upstream
drainCh := rc.drainCh(id) // listen for MY drain signal
// stage goroutine:
defer close(outCh)
cooperativeDrain := false
defer func() {
if !cooperativeDrain {
go internal.DrainChan(inCh) // fallback: unblock unconverted upstreams
}
}()
defer drainFn() // signal upstream first (LIFO — fires before DrainChan check)
for {
select {
case item, ok := <-inCh:
// ... process ...
select {
case outCh <- result:
case <-drainCh: cooperativeDrain = true; return nil
}
case <-ctx.Done(): return ctx.Err()
case <-drainCh: cooperativeDrain = true; return nil
}
}
The cooperativeDrain flag suppresses the DrainChan goroutine when the stage
exits via drainCh (all downstream consumers have gone; the producer's exit was
already cooperative). When the stage exits via context cancellation or an error,
DrainChan still fires as a fallback to unblock any unconverted upstream.
Terminal stages (e.g. ForEach) always spawn DrainChan unconditionally as
a fallback: they are leaf nodes with no drainCh of their own. Because every
upstream stage is now cooperative, this fallback fires only when a stage exits
via context cancellation or error (not on the cooperative-drain path), so no
goroutine burst occurs on normal teardown.
Cascade pattern
Repeatedly → Map → Map → Take(1) → ForEach
Take exits (nil)
→ defer drainFn() signals Map2's drainNotify
→ Map2's drainCh fires → Map2 exits (cooperativeDrain=true)
→ defer drainFn() signals Map1's drainNotify
→ Map1's drainCh fires → Map1 exits (cooperativeDrain=true)
→ defer drainFn() signals Repeatedly's drainNotify
→ Repeatedly's drainCh fires → Repeatedly exits (cooperativeDrain=true)
Zero extra goroutines on the cooperative path.
Conversion status
The full operator rollout is complete. Every operator uses cooperative drain.
The DrainChan fallback (spawned via go internal.DrainChan when
cooperativeDrain is false) still exists in each stage but fires only on
context-cancelled or error exits, not on the cooperative-drain path.
| Operator group | Status |
|---|---|
| All sources (FromSlice, Repeatedly, Generate, Channel, etc.) | Cooperative drain |
| Map, Filter (all paths including concurrent/ordered/fused) | Cooperative drain |
| Take, TakeWhile, Drop, DropWhile, TakeUntil, SkipUntil | Cooperative drain |
| ForEach (serial and fast-path) | Dual-defer bridge (DrainChan fallback + drainFn) |
| Batch, BufferWith, ChunkBy, ChunkWhile, Window, SlidingWindow, SessionWindow | Cooperative drain |
| FlatMap, ConcatMap, SwitchMap, ExhaustMap (all variants) | Cooperative drain |
| Merge, Zip, CombineLatest, WithLatestFrom, SampleWith | Cooperative drain |
| Partition, Broadcast, Balance, KeyedBalance, Share | Cooperative drain |
| MapWith, MapWithKey, FlatMapWith, FlatMapWithKey (all variants) | Cooperative drain |
| Scan, Reduce, GroupByStream, FrequenciesByStream, all aggregates | Cooperative drain |
| Unzip, MapResult (fan-out, multi-output) | Cooperative drain |
| All remaining operators (RateLimit, Dematerialize, Pairwise, etc.) | Cooperative drain |
Concurrency patterns inside a stage
Single worker (default)
The inner loop is a straightforward for { select { case item := <-inCh: … } }.
No synchronisation beyond the channel itself.
Concurrent unordered: Concurrency(n)
runMapConcurrent spawns n worker goroutines that all read from the same
input channel. The channel is Go's natural work queue: no extra synchronisation
needed for item distribution.
flowchart LR
I[inCh] --> W0[worker 0]
I --> W1[worker 1]
I --> W2[worker 2]
W0 --> O[outbox\nthread-safe]
W1 --> O
W2 --> O
Error coordination uses an errOnce/firstErr/innerCancel triple: the first
worker to hit an error atomically records it, calls innerCancel() to stop
the others, and a sync.WaitGroup ensures the caller waits for all workers
before returning the error.
Concurrent ordered: Concurrency(n) + Ordered()
runMapConcurrentOrdered preserves input order using a slot pipeline:
flowchart LR
I[inCh] --> D[dispatcher\nallocates slots\nenqueues to pending]
D -->|jobs chan| W0[worker 0]
D -->|jobs chan| W1[worker 1]
D -->|pending chan\nslot pointers in order| C[collector\nblocks on slot.done\nemits in order]
W0 -->|"fills slot.result"| C
W1 -->|"fills slot.result"| C
C --> O[outbox]
A slot is {result any; err error; done chan struct{}}. The dispatcher
creates one slot per item, sends the slot pointer to both jobs (for a worker
to fill) and pending (to maintain order). Workers run concurrently; the
collector always drains pending in arrival order, blocking on <-slot.done
before forwarding each result.
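A condensed, self-contained sketch of the slot mechanism (the real slot carries result any and err error and the stages stream from channels; this version uses int and a slice to stay short):

```go
package main

import (
	"fmt"
	"sync"
)

// slot holds one item's result; done closes when a worker fills it.
type slot struct {
	result int
	done   chan struct{}
}

// orderedMap runs fn concurrently but emits results in input order:
// the collector drains the pending queue in arrival order, blocking
// on each slot's done channel before forwarding.
func orderedMap(in []int, workers int, fn func(int) int) []int {
	jobs := make(chan *slot, len(in))
	pending := make(chan *slot, len(in))
	inputs := map[*slot]int{} // read-only once workers start

	// Dispatcher: one slot per item, enqueued to both queues in order.
	for _, v := range in {
		s := &slot{done: make(chan struct{})}
		inputs[s] = v
		jobs <- s
		pending <- s
	}
	close(jobs)
	close(pending)

	// Workers fill slots concurrently, in whatever order they finish.
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range jobs {
				s.result = fn(inputs[s])
				close(s.done)
			}
		}()
	}

	// Collector: input order is preserved regardless of worker timing.
	var out []int
	for s := range pending {
		<-s.done
		out = append(out, s.result)
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(orderedMap([]int{1, 2, 3, 4}, 3, func(x int) int { return x * x }))
	// prints: [1 4 9 16]
}
```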
Fan-out: Partition, MapResult, and Broadcast
runPartition evaluates a boolean predicate and routes each item to one
of two outboxes. Every item goes to exactly one branch (port 0 = true, port
1 = false).
runMapResult applies a transformation function and routes based on whether
it succeeded. Successful outputs go to port 0; items where the function returns
an error go to port 1, wrapped in an ErrItem{Item, Err} by the
MapResultErrWrap function stored on the node. Unlike regular Map, it never
invokes the ErrorHandler: every error is always routed, never halted or
retried.
Both Partition and MapResult follow the same shared-ID pattern in the build
closures and are treated identically in the BufferHook buffer-query closure.
runBroadcast sends every item to all N outboxes sequentially. Because the
sends are sequential, a slow consumer on one branch will backpressure the entire
broadcast. Size buffers generously on broadcast branches when consumers run at
different speeds.
All fan-out nodes close all of their output channels on exit, cascading shutdown down every branch.
Fan-in: Merge and Zip
runMerge spawns one goroutine per input channel. All goroutines write to
the same shared outbox:
flowchart LR
I0[inCh 0] --> G0[goroutine 0]
I1[inCh 1] --> G1[goroutine 1]
Ik["inCh k"] --> Gk[goroutine k]
G0 --> O[shared outbox]
G1 --> O
Gk --> O
O --> Out[outCh]
The output channel is closed once all input goroutines have exited. Errors use
the same errOnce/innerCancel coordination as concurrent map.
runZip reads from two input channels sequentially, not concurrently:
for {
    a, ok := <-inCh1 // blocks until item or close
    if !ok { break }
    b, ok := <-inCh2 // then blocks for the partner
    if !ok { break }
    outbox.Send(ctx, convert(a, b))
}
The sequential read means: if inCh1 is producing faster than inCh2, items
accumulate in inCh1's buffer while runZip waits for inCh2. Buffer the
faster branch generously (Buffer option) when the two sources run at different
rates.
LatestFrom
runLatestFrom maintains a mutex-protected "latest secondary value" that is
updated by a background goroutine, while the main loop combines primary items
with that value:
flowchart LR
S[secondaryCh] --> BG["background goroutine\nmu.Lock()\nlatest = item\nhasValue = true\nmu.Unlock()"]
BG --> L[latest value\nprotected by mu]
P[primaryCh] --> ML["main loop\nmu.Lock()\nread hasValue, latest\nmu.Unlock()"]
L --> ML
ML -->|"convert(primary, latest)"| O[outbox]
ML -->|"!hasValue → drop"| X[dropped silently]
Primary items that arrive before the secondary has emitted a single value are
silently dropped: this matches RxJS semantics. The background goroutine exits
when secondaryCh is closed or ctx is cancelled.
Independent-graph support: LatestFrom (like Merge and Zip) works
with pipelines from separate graphs. When the two pipelines share a graph the
engine-native node is used; otherwise the secondary pipeline drains into a
mutex-protected latest value in a background goroutine while the primary is
forwarded through a channel, mirroring the engine implementation at the
Generate layer. The Partition pattern is still useful when config updates
and primary events are multiplexed into the same source channel:
cfgBranch, reqBranch := kitsune.Partition(src.Source(), func(e Event) bool {
return e.IsConfig
})
combined := kitsune.LatestFrom(reqBranch, cfgBranch)
Time-based stages: Throttle and Debounce
Both stages capture their duration directly in the build closure as a
time.Duration local variable.
runThrottle records the lastEmit timestamp. Each incoming item is
compared against the elapsed time: if now - lastEmit >= d, the item is emitted
and lastEmit is updated; otherwise the item is dropped. Items dropped this way
trigger OverflowHook.OnDrop.
stateDiagram-v2
[*] --> Idle
Idle --> Emit : item arrives\n(no prior emission)
Emit --> Cooldown : emit item,\nrecord lastEmit
Cooldown --> Emit : item arrives,\nnow - lastEmit ≥ d
Cooldown --> Cooldown : item arrives,\nnow - lastEmit < d\n[drop item]
runDebounce keeps a single pending slot and a resettable time.Timer.
Each incoming item replaces pending and resets the timer. When the timer fires
with no new arrivals, pending is emitted. On input close, any remaining
pending item is flushed immediately.
The timer management is careful to drain timer.C after a Stop() that may
have already fired: a standard Go timer-reset pattern to avoid receiving a
stale tick on the next select.
Reduce and Scan
runReduce accumulates into a single value using the seed value captured in
its build closure. It does not emit on every item; it only emits when the
input channel closes:
acc := seed // closed over from construction time
for item := range inCh { acc = fn(acc, item) }
outbox.Send(ctx, acc) // emits once
This means Reduce always emits exactly one value: even on an empty stream
(it emits the seed).
Scan is implemented at the kitsune layer as a Map with a closure that
closes over an accumulator variable. It emits after every item rather than
waiting for close, so it is purely an operator-layer concept with no special
engine support.
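The trick can be sketched outside the pipeline entirely (scanFn is illustrative, not the operator's real code):

```go
package main

import "fmt"

// scanFn returns a map function that closes over an accumulator,
// emitting the running value after every item — Scan expressed as
// a stateful Map closure.
func scanFn[T, A any](seed A, fn func(A, T) A) func(T) A {
	acc := seed
	return func(item T) A {
		acc = fn(acc, item)
		return acc
	}
}

func main() {
	runningSum := scanFn(0, func(acc, x int) int { return acc + x })
	for _, v := range []int{1, 2, 3, 4} {
		fmt.Print(runningSum(v), " ") // prints: 1 3 6 10
	}
	fmt.Println()
}
```

Because the accumulator lives in the closure, each Run needs a fresh closure; the build-time resolution described earlier (build closures called fresh on every Runner.Run) provides exactly that.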
Batching
runBatch accumulates items in a typed slice and flushes when either the
size limit is reached or a timeout fires.
stateDiagram-v2
[*] --> Empty
Empty --> Accumulating : first item arrives\n(start timer if timeout set)
Accumulating --> Accumulating : item arrives\n(append to batch)
Accumulating --> Empty : batch full\n(flush, stop timer)
Accumulating --> Empty : timer fires\n(flush partial batch)
Accumulating --> Empty : inCh closed\n(flush remaining items)
The timer is off when the batch is empty, started on the first item,
and reset after each flush. This ensures a partial batch always drains
within timeout of its first item regardless of upstream throughput.
The final flush on channel close is what makes graceful drain work for batch stages: once upstream sources stop and their channels close, any partial batch is emitted rather than silently discarded.
Cache integration
When a Map stage uses CacheBy, the construction-time code stores a
cacheWrapFn: a factory that produces a cache-wrapped replacement for the
stage function. The factory is not invoked at construction time; it is deferred
to build time so it can receive runner-level defaults (WithCache) from rc.
Inside the Map build closure, the cache wrapper is resolved before the
stageFunc is registered:
fn := userFn
if cfg.cacheWrapFn != nil {
fn = cfg.cacheWrapFn(rc.cache, rc.cacheTTL)
}
// fn is now closed over by the stageFunc registered into rc
Because Pipeline[T] build closures are called fresh on every Runner.Run, the
resolved function is scoped to a single run: there is no shared mutable state
that could be corrupted across repeated runs.
The Timeout StageOption wraps the user function at construction time (before
the cacheWrapFn factory is built), ensuring both the direct call path and any
cache-miss path get the per-item deadline:
construction time: userFn → timeout-wrapped fn → cacheWrapFn factory (closes over it)
build time (Run): factory invoked → cache-wrapped fn
cache miss path calls the timeout-wrapped inner fn
State management
runCtx carries a refRegistry for pipeline-level state (pipeline.go):
- inits map[string]func(Store, Codec) any: registered during the build phase by MapWith/FlatMapWith build closures. Associates a key name with a factory that creates a *Ref[T] given the store backend and codec.
- vals map[string]any: populated by rc.refs.init(store, codec) at run time. Each factory is called once, producing the concrete *Ref[T] that stages share.
The registry uses a sync.RWMutex. register (build phase) and init (once,
before any stage goroutine starts) take the write lock; get, called from stage
goroutines during execution, takes only the read lock. This reflects the
lifecycle invariant: vals is append-only during build and init, then strictly
read-only for the remainder of the run. The write-lock release in init
establishes the happens-before edge that lets all subsequent stage goroutines
safely observe the fully-populated map without additional synchronisation.
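A sketch of that lifecycle (types simplified: any stands in for the Store/Codec parameters and the *Ref[T] values):

```go
package main

import (
	"fmt"
	"sync"
)

// refRegistry: write-locked register/init during the build phase,
// read-locked get during execution. Releasing the write lock in
// init establishes the happens-before edge for stage goroutines.
type refRegistry struct {
	mu    sync.RWMutex
	inits map[string]func() any
	vals  map[string]any
}

// register runs during the build phase (write lock).
func (r *refRegistry) register(key string, factory func() any) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.inits[key] = factory
}

// init runs once, before any stage goroutine starts: each factory
// is called exactly once, materialising the shared refs.
func (r *refRegistry) init() {
	r.mu.Lock()
	defer r.mu.Unlock()
	for k, f := range r.inits {
		r.vals[k] = f()
	}
}

// get is called from stage goroutines; vals is read-only by now,
// so the read lock is the only synchronisation needed.
func (r *refRegistry) get(key string) any {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.vals[key]
}

func main() {
	reg := &refRegistry{inits: map[string]func() any{}, vals: map[string]any{}}
	reg.register("counter", func() any { return new(int) })
	reg.init()
	fmt.Println(reg.get("counter") != nil) // prints: true
}
```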
Stage functions that use MapWith/FlatMapWith close over rc.refs.get(name):
they receive the materialised ref from vals, not the factory. This means the
same pipeline definition can be run against different store backends simply by
passing a different WithStore(s) run option.
Supervision
internal.Supervise is a zero-cost abstraction when inactive
(MaxRestarts == 0 && OnPanic == PanicPropagate): it calls the stage function
directly and returns, with no overhead.
When active, it wraps each execution in runProtected (a defer/recover
guard) and loops up to MaxRestarts times:
flowchart TD
A[supervise] --> B{policy active?}
B -->|no| C[call inner directly]
B -->|yes| D[runProtected\ndefer recover]
D --> E{result?}
E -->|nil| F[return nil]
E -->|error| G{budget left?}
E -->|panic, PanicRestart| G
E -->|panic, PanicPropagate| H[re-panic]
E -->|panic, PanicSkip| I[discard item\ncontinue]
G -->|yes| J[SupervisionHook.OnStageRestart\nbackoff sleep\nretry]
J --> D
G -->|no| K[return errBudgetExhausted]
The Window field resets the restart counter after a quiet period, preventing
a stage with occasional hiccups from eventually exhausting its budget.
Observability hooks
The hook system uses optional interface extension: a single base Hook
with several opt-in extensions checked via type assertion at run time. This
means existing Hook implementations never need to be updated when new
extension points are added.
| Interface | When called | Use case |
|---|---|---|
| Hook (base) | Stage start/stop, per-item | Metrics, logging |
| OverflowHook | Item dropped by overflow strategy or Throttle | Drop counters |
| SupervisionHook | Stage restarted after error or panic | Alerting |
| SampleHook | ~every 10th successful item | Value-level tracing |
| GraphHook | Once before execution, with full DAG snapshot | Topology export |
| BufferHook | Once before execution, with a channel-fill query fn | Backpressure dashboards |
GraphHook.OnGraph receives a []GraphNode snapshot of the compiled graph,
including node IDs, kinds, inputs, concurrency, and buffer sizes. This fires before any stage
starts, making it useful for registering metric labels or rendering a static
topology view.
BufferHook.OnBuffers receives a func() []BufferStatus closure. The
closure captures the live channel map and returns {Stage, Length, Capacity}
for every non-sink node when called. The hook implementation calls this closure
periodically (e.g. every 250 ms) to track fill levels over time. The kotel
tail uses this to register an OTel Int64ObservableGauge with a
metric.WithInt64Callback so the OTel SDK pulls fresh buffer readings on each
collection interval.
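The query-closure idea in isolation (BufferStatus field names follow the text; the hook plumbing is omitted):

```go
package main

import "fmt"

// BufferStatus mirrors the shape described above. The query closure
// captures live channels and reads len/cap on demand, which is what
// makes pull-based collection (e.g. an OTel observable gauge) work.
type BufferStatus struct {
	Stage, Length, Capacity int
}

func main() {
	ch0 := make(chan int, 8)
	ch1 := make(chan string, 4)
	ch0 <- 1
	ch0 <- 2

	// The closure handed to BufferHook.OnBuffers: fresh readings on
	// every call, because len() inspects the live channel.
	query := func() []BufferStatus {
		return []BufferStatus{
			{Stage: 0, Length: len(ch0), Capacity: cap(ch0)},
			{Stage: 1, Length: len(ch1), Capacity: cap(ch1)},
		}
	}

	fmt.Println(query()) // [{0 2 8} {1 0 4}]
	ch0 <- 3
	fmt.Println(query()) // readings change: [{0 3 8} {1 0 4}]
}
```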
Graceful drain
When WithDrain(timeout) is set, Run uses a two-phase shutdown:
sequenceDiagram
participant Caller
participant Monitor as monitor goroutine
participant Stages as pipeline stages
Caller->>Monitor: parentCtx cancelled
Note over Monitor: Phase 1
Monitor->>Stages: signalDone() — sources stop
Note over Stages: sources exit cleanly,\ndownstream stages drain
alt drains within timeout
Stages->>Monitor: drainCtx completes naturally
else timeout exceeded
Monitor->>Stages: drainCancel() — hard stop
end
The key design decision is that stages run on drainCtx, an independent
context derived from context.Background(), not from parentCtx. Cancelling
parentCtx therefore does not directly stop any stage. Only two events can
cancel drainCtx:
- The drain timeout fires (drainCancel() is called by the monitor).
- A stage returns an error (the errgroup cancels egCtx).
The monitor goroutine lives outside the errgroup. If it were inside, a
normal pipeline completion would leave the monitor blocking on
<-parentCtx.Done() forever, and eg.Wait() would never return. The
defer drainCancel() in runWithDrain unblocks the monitor after
eg.Wait() returns, allowing a clean exit.
Type safety
The current engine is fully typed end-to-end. Pipeline[T] carries the
item type as a Go type parameter; every channel is a concrete chan T; every
stage function closes over typed values directly. There is no any in the hot
path and no type assertions at item-processing time.
The only place any appears is in runCtx.chans map[int]any, which stores
channels type-erased for storage. Each build closure performs a single
type-assertion when retrieving its input channel, of the form
rc.getChan(upstreamID).(chan T).
This assertion happens once per stage per Run, not per item. It is safe
because the only code that sets a channel in rc.chans is the build closure for
that specific stage: the type is always correct by construction.
rc.refs.vals map[string]any similarly stores *Ref[T] values type-erased.
Again, the retrieval is a one-time assertion scoped to the build phase.
The practical effect is that type errors surface as compile errors at the API
level (wrong fn signature passed to Map), not as panics at run time.