# Roadmap Archive

Completed milestones, preserved for reference. Active work is in `roadmap.md`.
## Developer experience

- [x] `WithStore(store)` option for persistent inspector state (2026-04-11): `inspector.New(inspector.WithStore(store))` attaches an `InspectorStore` to the Inspector. State (graph topology, per-stage counters, log ring buffer) is restored on construction and saved every 250 ms and on `Close`. `inspector.NewMemoryInspectorStore(logTTL)` is the built-in in-process implementation with optional log TTL. A `StoreErr() error` accessor surfaces non-fatal store errors. See `inspector/store.go` and `doc/inspector.md`.
## Code organization

- [x] Split large implementation files by concern: `operator.go` (2118 lines), `state.go` (1924 lines), and `fan.go` (979 lines) split into 11 focused files. `operator.go` → `operator_map.go`, `operator_flatmap.go`, `operator_filter.go`, `operator_take.go`, `operator_transform.go`; `state.go` → `state_ref.go`, `state_with.go`, `state_withkey.go`; `fan.go` → `fan_out.go`, `fan_combine.go`. Shared helpers (`orDefault`, `itemContext`, `reportErr`) extracted to `helpers.go`. Empty `fusion.go` deleted. No public API changes; all symbols remain in the root package.
## Operators

- [x] `TapError`: side-effect on the error path without consuming or rerouting the error. Natural complement to `Tap`: observe errors for logging, metrics, or alerting while letting them propagate unchanged. Fires a callback `func(ctx, error)` and re-returns the original error. Context cancellation does not trigger the callback. Implemented via `Generate`/`ForEach` (same pattern as `Catch`) to observe the terminal error from the upstream run.
- [x] `Finally`: guaranteed cleanup hook that fires when a stage exits (completion, cancellation, or error). Useful for resource tracking, test assertions, and teardown logic that must run regardless of how the pipeline terminates. Implemented via `Generate`/`ForEach` (same pattern as `TapError` and `Catch`). On early consumer stop (e.g. downstream `Take`), `fn` receives nil.
- [x] `Using[T, R]`: acquire a resource, build a pipeline from it, release the resource on exit. Signature: `Using(acquire func(ctx) (R, error), build func(R) *Pipeline[T], release func(R))`. Eliminates boilerplate `defer` patterns around `Run` calls for resource-bound stages (DB connections, file handles, etc.). If acquire fails, release is not called; otherwise release is guaranteed exactly once via `defer`.
- [x] `ExpandMap[T]`: recursive BFS expansion. `fn(item)` returns a new `*Pipeline[T]`; all emitted items are fed back through `fn` again until the inner pipeline is empty. The defining primitive for tree traversal, recursive API pagination, and graph walks. Emits items at depth N before depth N+1. `fn` may return nil for leaf nodes. `ExpandMapFunc` is provided as a context-free convenience wrapper. Accepts `...StageOption`: `WithName` (stage label for hooks/metrics), `Buffer`, and `VisitedBy(keyFn)` (cycle detection: skips items whose key was already seen, along with their subtrees; defaults to `MemoryDedupSet`, overridable with `WithDedupSet`).
- [x] `KeyedBalance[T]`: content-based fan-out by consistent hash. Routes all items with the same key to the same downstream branch, enabling per-entity parallelism without lock contention. Complements `MapWithKey` for stateful workloads. Counterpart to the round-robin `Balance`.
- [x] `Share`: hot multicast without a fixed subscriber count. Unlike `Broadcast(n)`, `Share` lets downstream stages subscribe at any point before the run starts. Returns a factory (`func(...StageOption) *Pipeline[T]`); each call creates a new branch. All branches share one fan-out stage with synchronised blocking delivery (same semantics as `Broadcast`). Late-subscriber semantics: every registered branch receives items from the start of execution; no replay, no runtime subscription. Per-branch `Buffer` and `WithName` are supported; factory opts act as defaults. Calling the factory after `Run()` has started panics. A single subscriber is allowed (unlike `Broadcast`'s n ≥ 2 requirement).
- [x] `SwitchMap[I, O]`: cancels the active inner pipeline when a new upstream item arrives, then starts a fresh one. The defining pattern for search-as-you-type and any scenario where a new request supersedes the previous one.
- [x] `ExhaustMap[I, O]`: ignores new upstream items while an inner pipeline is still active. The defining pattern for "submit once, wait for completion" flows: form submissions, idempotent API calls, debounced writes.
- [x] `ConcatMap[I, O]`: sequential inner pipelines; each inner pipeline completes before the next starts.
- [x] `MapResult[I, O]`: returns `(*Pipeline[O], *Pipeline[ErrItem[I]])`; both branches share one stage and must be consumed together (same rule as `Partition`).
- [x] `MapRecover[I, O]`: `fn` is called for each item; `recover func(ctx, I, error) O` is called on error, producing a fallback value instead of propagating the error.
- [x] `Return[T]` error handler: `OnError(Return[T](val))` replaces a failed item with a caller-supplied default value instead of dropping it (`Skip`) or halting the pipeline (`Halt`). Essential for enrichment pipelines where a failed lookup should produce a sentinel rather than a gap. (Planned as a standalone `OnErrorReturn` `StageOption`; landed as the `Return[T]` `ErrorHandler` used via `OnError`.)
- [x] `MapBatch[I, O]`: collects up to `size` items, passes the slice to `fn`, flattens the results; supports `BatchTimeout`, `Concurrency`, `OnError`. Built on `Batch` + `FlatMap`.
- [x] `LookupBy[T, K, V]`: batches items, deduplicates keys, calls `Fetch(ctx, []K) (map[K]V, error)`, and emits `Pair{Item, Value}`; items whose key is absent receive the zero value for `V`. `BatchSize` defaults to 100.
- [x] `Enrich[T, K, V, O]`: like `LookupBy` but calls `Join func(T, V) O` to produce the output directly.
- [x] `SessionWindow`: gap-based window that closes after a configurable period of inactivity: `kitsune.SessionWindow(p, gap time.Duration)`. Produces `[]T` slices like `Window` and `SlidingWindow`.
- [x] `ChunkBy`/`ChunkWhile`: consecutive same-key grouping and consecutive predicate grouping, respectively.
- [x] `WindowByTime`: tumbling time window; emits a `[]T` slice per elapsed interval. Named `WindowByTime` to avoid collision with the count-based `Window(p, n)`.
- [x] `ConsecutiveDedup`/`ConsecutiveDedupBy`: emit only when the value (or extracted key) changes; stateless per-item comparison, not set-based. Safe by engine design at `Concurrency(1)`.
- [x] `CountBy`/`SumBy`: streaming operators that emit a full snapshot of the count/sum map after each item; run at `Concurrency(1)`. Compose with `Throttle` or `Debounce` for periodic snapshots.
- [x] `MapIntersperse[T, O]`: applies `fn` to each item, then inserts `sep` between consecutive mapped outputs.
- [x] `RateLimit`: token bucket via `golang.org/x/time/rate`; `RateLimitWait` (backpressure) and `RateLimitDrop` (skip excess) modes; `Burst(n)`, `RateMode(m)` options.
- [x] `CircuitBreaker`: Closed → Open → Half-Open state machine built on `Map`; `FailureThreshold(n)`, `CooldownDuration(d)`, `HalfOpenProbes(n)` options; emits `ErrCircuitOpen` when open.
- [x] `Pool[T]`/`MapPooled`: `NewPool[T](newFn)`, `Pooled[T]` with `Release()`; `MapPooled[I, O]` acquires a slot, calls `fn`, and emits `*Pooled[O]`; `ReleaseAll` bulk helper.
- [x] `TakeEvery`/`DropEvery`/`MapEvery`: operate on every Nth item.
- [x] `WithIndex`: tags each item with its 0-based position, emitting `Indexed[T]{Index, Value}`.
- [x] `Intersperse`: inserts a separator value between consecutive items.
- [x] `StartWith[T]`: prepends one or more items before a pipeline.
- [x] `DefaultIfEmpty[T]`: emits a default value when the upstream produces no items.
- [x] `Contains[T comparable]`: terminal; returns `true` if any item equals the given value; stops early on first match.
- [x] `ElementAt`: terminal; returns the item at a 0-based index, or `(zero, false, nil)` if the stream is shorter than the index. `ErrEmpty` kept for this operator.
- [x] `ToMap[T, K, V]`: terminal; collects the stream into `map[K]V`; for duplicate keys, the last value wins.
- [x] `SequenceEqual[T comparable]`: terminal; compares two finite pipelines item by item.
- [x] `Timestamp[T]`: tags each item with wall-clock time; emits `Timestamped[T]{Value, Time}`. Respects `WithClock`.
- [x] `TimeInterval[T]`: tags each item with the elapsed duration since the previous item; emits `TimedInterval[T]{Value, Elapsed}`; the first item has `Elapsed == 0`. Respects `WithClock`.
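The routing rule behind `KeyedBalance` can be sketched without the engine. This is a minimal illustration, not the library's implementation: the helper name `branchFor` and the choice of FNV-1a are assumptions; the point is only that a deterministic hash of the key, taken modulo the branch count, sends every item with the same key to the same branch.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// branchFor maps a key to one of n branches. Items sharing a key always
// land on the same branch, so per-key state needs no cross-branch locking.
func branchFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

func main() {
	for _, k := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> branch %d\n", k, branchFor(k, 4))
	}
	// "user-1" prints the same branch number both times.
}
```

The same idea generalises to consistent hashing (hash rings) when the branch count must change without reshuffling every key.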
## Sources & composition helpers

- [x] Push-based source: `Channel[T]` with `Send`, `TrySend`, `Close`; the canonical pattern for bridging external event streams into a pipeline.
- [x] Time sources: `Ticker`, `Timer`; respect `WithClock` for deterministic tests.
- [x] Generative sources: `Unfold`, `Iterate`, `Repeatedly`, `Cycle`; `Concat` (factory-based, strictly ordered).
- [x] `Amb[T]`: races multiple pipeline factories; forwards items exclusively from whichever factory emits first, cancelling all others. Useful for redundant sources, failover, and latency hedging.
- [x] `Stage[I, O]`: named function type (`func(*Pipeline[I]) *Pipeline[O]`); zero-cost pipeline transformer compatible with `Map`. `Then` composes two stages; `Pipeline.Through` is the method form. `Stage.Or(fallback)` tries the primary and, on no output, falls back with the same input.
- [x] `LiftPure`/`LiftFallible`: ergonomic wrappers for context-free, error-free (`func(I) O`) and fallible (`func(I) (O, error)`) functions; removes the most common friction point when writing simple transforms.
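The `Stage`/`Then` idea is ordinary function composition. As a hedged sketch (the `Stage` here operates on plain slices rather than pipelines, so the composition rule is visible without the engine; names beyond `Stage` and `Then` are illustrative):

```go
package main

import (
	"fmt"
	"strings"
)

// Stage mirrors the shape described above: a named transformer from one
// stream type to another, reduced here to a function over slices.
type Stage[I, O any] func([]I) []O

// Then composes two stages left to right: the output of a feeds b.
func Then[I, M, O any](a Stage[I, M], b Stage[M, O]) Stage[I, O] {
	return func(in []I) []O { return b(a(in)) }
}

func main() {
	upper := Stage[string, string](func(in []string) []string {
		out := make([]string, len(in))
		for i, s := range in {
			out[i] = strings.ToUpper(s)
		}
		return out
	})
	exclaim := Stage[string, string](func(in []string) []string {
		out := make([]string, len(in))
		for i, s := range in {
			out[i] = s + "!"
		}
		return out
	})
	fmt.Println(Then(upper, exclaim)([]string{"go"})) // [GO!]
}
```

Because a composed `Stage` is itself a `Stage`, chains built with `Then` nest arbitrarily with no wrapper cost.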
## Multi-graph and composition

- [x] Universal multi-stream operators: `Merge`, `Zip`, `ZipWith`, and `WithLatestFrom` all accept pipelines from independent graphs. Same-graph inputs use the engine-native node (fast path); independent inputs fall back to a `Generate`-based implementation that runs each pipeline concurrently.
- [x] `CombineLatest[A, B]`: symmetric counterpart to `WithLatestFrom`: either side emitting triggers an output, always paired with the latest value from the other side.
- [x] `Balance[T]`: round-robin fan-out; each item goes to exactly one of N output pipelines. Completes the fan-out vocabulary alongside `Broadcast` and `Partition`.
## State management

- [x] `Key[T]` + `Ref[T]` + `Store` model: `NewKey[T](name, initial, opts...)` declares a named piece of typed run-scoped state. `Ref[T]` is the concurrent-safe handle injected into stage functions; API: `Get`, `Set`, `Update`, `UpdateAndGet`, `GetOrSet`. In-memory by default (mutex); Store-backed when `WithStore` is configured.
- [x] `MapWith`/`FlatMapWith`: like `Map`/`FlatMap` but the function receives a `*Ref[S]` for mutable per-run state; runs at `Concurrency(1)`.
- [x] `MapWithKey`/`FlatMapWithKey`: variants where each item is routed to a per-item-key `Ref` shard via `itemKeyFn func(I) string`; enables per-user session tracking, per-device state machines, and per-entity rate limiting without users managing maps manually. The `Store` interface supports arbitrary string keys, so the implementation delta is entirely in the public API and the `Ref` lookup path.
- [x] State TTL / expiry: optional TTL parameter to `NewKey`: `kitsune.NewKey("session", SessionState{}, kitsune.StateTTL(30*time.Minute))`. `Ref.Get` returns the zero value and resets the slot when the TTL has elapsed. `MemoryStore` uses lazy expiry on read; distributed backends (Redis, DynamoDB) get this via their native TTL mechanisms.
- [x] `CacheBy` wired into `Map`: `rc.cache`/`rc.cacheTTL`/`rc.codec` threaded through `runCtx`; `Map`'s build closure wraps `fn` with cache lookup/write when `CacheBy` is set; stage-level `CacheBackend`/`CacheTTL` override runner defaults.
- [x] `WithDedupSet` wired into operators: `DistinctBy` and `DedupeBy` check `cfg.dedupSet`; when set, the external backend is used with string keys; for `DedupeBy`, the presence of a set upgrades consecutive dedup to global dedup.
- [x] `BloomDedupSet`: a probabilistic Bloom filter backend for `DedupSet`; bounded memory with approximate correctness, useful when exact dedup is not required and the key space is huge. Implemented in `internal/bloom.go` using `hash/maphash` double hashing (Kirsch-Mitzenmacher) with a `[]uint64` bit array. Constructor: `BloomDedupSet(expectedItems int, falsePositiveRate float64)`; panics on invalid input. Thread-safe via `sync.RWMutex`. No false negatives; the false-positive rate is bounded by the configured probability.
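The Kirsch-Mitzenmacher trick referenced above derives k bloom indexes from just two base hashes, `g_i = h1 + i·h2 mod m`, instead of computing k independent hashes. A minimal sketch (this is not the library's `internal/bloom.go`; it uses FNV-64 split into two halves rather than `hash/maphash`, and all names are illustrative):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// bloom: k bit positions per key, derived from two base hashes over a
// []uint64 bit array — the double-hashing scheme described above.
type bloom struct {
	bits []uint64
	m    uint64 // number of bits
	k    int    // number of derived hash functions
}

func newBloom(mBits uint64, k int) *bloom {
	return &bloom{bits: make([]uint64, (mBits+63)/64), m: mBits, k: k}
}

func (b *bloom) indexes(key string) []uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	sum := h.Sum64()
	h1, h2 := sum&0xffffffff, sum>>32
	idx := make([]uint64, b.k)
	for i := 0; i < b.k; i++ {
		idx[i] = (h1 + uint64(i)*h2) % b.m // g_i = h1 + i*h2 mod m
	}
	return idx
}

func (b *bloom) Add(key string) {
	for _, i := range b.indexes(key) {
		b.bits[i/64] |= 1 << (i % 64)
	}
}

func (b *bloom) MayContain(key string) bool {
	for _, i := range b.indexes(key) {
		if b.bits[i/64]&(1<<(i%64)) == 0 {
			return false // a zero bit proves the key was never added
		}
	}
	return true // all bits set: present, or a false positive
}

func main() {
	b := newBloom(1024, 4)
	b.Add("order-42")
	fmt.Println(b.MayContain("order-42")) // true — no false negatives
	fmt.Println(b.MayContain("order-43")) // almost certainly false
}
```

Sizing `m` and `k` from `expectedItems` and `falsePositiveRate` follows the standard formulas `m = -n·ln(p)/ln(2)²` and `k = (m/n)·ln(2)`.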
## Foundation & typed engine

- [x] `Pipeline[T]` blueprint model: `Pipeline[T]` is a lazy description carrying a `build func(*runCtx) chan T` closure; channels are allocated fresh on every `Run()` via a memoising `runCtx`, making pipelines reusable and diamond-graph-safe. Replaced the prior `chan any` engine entirely, eliminating all per-item boxing. Result: ~21 M items/s, 47 allocs/run (0/item) for the trivial Map → Filter → Drain benchmark.
- [x] `Runner`/`RunAsync`/`RunHandle`: `Runner.Run` blocks until the pipeline completes; `Runner.RunAsync` returns a `RunHandle` with `Pause()`, `Resume()`, `Wait()`, `Done()`, and `Err()`. `MergeRunners` combines forked terminals into a single run so all branches drain together.
- [x] Stage options: `Concurrency`, `Ordered`, `Buffer`, `Overflow`, `WithName`, `Timeout`, `OnError`, `Supervise`, `WithSampleRate`, `WithDedupSet`.
- [x] Run options: `WithHook`, `WithStore`, `WithCodec`, `WithCache`, `WithDrain`, `WithPauseGate`.
- [x] Error handlers & backoff: `Retry`, `RetryThen`, `Return`, `Skip`; `FixedBackoff`, `ExponentialBackoff`.
- [x] Supervision: `RestartAlways`, `RestartOnError`, `RestartOnPanic`.
- [x] Hooks: `GraphHook`, `BufferHook`, `SampleHook`, `SupervisionHook`, `OverflowHook`; `MultiHook`, `LogHook`; shared `github.com/zenbaku/go-kitsune/hooks` module so tails implement the interface once and satisfy both the engine and any tail-level consumer without conversion.
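The blueprint model above — a lazy build closure that allocates fresh channels per run — can be illustrated with a toy engine. This is a sketch of the idea only (no `runCtx` memoisation, no options, no error path; all names are illustrative), but it shows why one pipeline value is safely reusable across runs:

```go
package main

import "fmt"

// Pipeline is a lazy description: nothing runs until build is called,
// and every call allocates fresh channels and goroutines.
type Pipeline[T any] struct {
	build func() <-chan T
}

func FromSlice[T any](items []T) *Pipeline[T] {
	return &Pipeline[T]{build: func() <-chan T {
		out := make(chan T)
		go func() {
			defer close(out)
			for _, v := range items {
				out <- v
			}
		}()
		return out
	}}
}

func Map[I, O any](p *Pipeline[I], fn func(I) O) *Pipeline[O] {
	return &Pipeline[O]{build: func() <-chan O {
		in := p.build() // upstream channels exist only once a run starts
		out := make(chan O)
		go func() {
			defer close(out)
			for v := range in {
				out <- fn(v)
			}
		}()
		return out
	}}
}

// Collect triggers a run and drains it.
func (p *Pipeline[T]) Collect() []T {
	var res []T
	for v := range p.build() {
		res = append(res, v)
	}
	return res
}

func main() {
	doubled := Map(FromSlice([]int{1, 2, 3}), func(n int) int { return n * 2 })
	fmt.Println(doubled.Collect()) // [2 4 6]
	fmt.Println(doubled.Collect()) // reusable: a second, independent run
}
```

The real engine adds a memoising `runCtx` so that diamond graphs (two branches sharing one upstream) build each shared node exactly once per run instead of twice.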
## Error handling & reliability

- [x] Structured errors: errors returned by `Runner.Run` carry the originating stage name, attempt number, and underlying cause in a typed `StageError` struct.
- [x] Dead-letter routing: `DeadLetter[I, O]` wraps a function with retry; items that exhaust all retries route to a second `*Pipeline[ErrItem[I]]` branch. `DeadLetterSink` does the same for sink functions, returning the dead-letter pipeline and the runner separately so the caller can wire them before calling `Run`.
- [x] `ErrItem[I any]`: `struct { Item I; Err error }`, used by `DeadLetter`, `DeadLetterSink`, and `MapResult`.
## API correctness & completeness

- [x] `RestartOnPanic` must not restart on regular errors: `RestartOnPanic` and `RestartAlways` were identical; both restarted on errors and panics. Fixed by adding a `PanicOnly` guard to the supervision loop.
- [x] `MapBatch` opts double-application: `opts` were forwarded to both the internal `Batch` and `FlatMap` stages. Fixed by extracting only `BatchTimeout` for the collection stage and passing all opts to the processing stage.
- [x] `Dedupe` context and error propagation: `Dedupe` used `context.Background()` for store calls and silently discarded `Add` errors. Rewritten as a `Map` node so the pipeline context flows through and `Add` errors halt the pipeline.
- [x] Mutable-state closures made explicit: `SlidingWindow` and `Scan` closed over mutable state with no synchronization. Both now pass `Concurrency(1)` explicitly. `ConsecutiveDedup`/`ConsecutiveDedupBy` are safe by engine design.
- [x] `Min`/`Max`/`MinMax` are O(1) memory: rewritten using `Reduce` + an `optional[T]` accumulator. `MinBy`/`MaxBy` use a `byAcc[T, K]` variant. All return `(T, bool, error)`; `bool = false, err = nil` for an empty stream.
- [x] `MinMax` returns `Pair[T, T]`: fields named `First`/`Second`.
- [x] `MinBy`/`MaxBy`/`SortBy` restore the `less` parameter: `less func(a, b K) bool` restored; dropped the `cmp.Ordered` constraint on `K`.
- [x] `Scan` and `Dedupe` accept `StageOption`: `Scan` appends `Concurrency(1)` last to enforce sequential execution; `Dedupe` uses `WithDedupSet(s DedupSet)` to accept a custom backend.
- [x] `Frequencies`/`FrequenciesBy` concurrency guard: enforce `Concurrency(1)` explicitly. Terminal forms: `Frequencies(ctx, p) (map[T]int, error)` and `FrequenciesBy(ctx, p, keyFn) (map[K]int, error)`. Streaming variants kept as `FrequenciesStream`/`FrequenciesByStream`.
- [x] `GroupBy` terminal: `GroupBy(ctx, p, keyFn) (map[K][]T, error)`. A streaming, ordered-by-first-seen variant is available as `GroupByStream` (emits one `Group[K, T]` per distinct key when the source closes; was named `GroupByOrdered` during development).
- [x] Terminals as methods: `Collect`, `First`, `Last`, `Count`, `Any`, `All`, `Find`, `ReduceWhile` added as methods on `*Pipeline[T]` alongside the existing `ForEach`.
- [x] `MapResult` branching return: returns `(*Pipeline[O], *Pipeline[ErrItem[I]])`.
## Observability

- [x] Per-item span propagation (`ContextCarrier`): items implement `ContextCarrier` (`Context() context.Context`) to carry a trace span from their origin (HTTP request, queue message, etc.). The engine threads the item's context into stage function calls; cancellation still comes from the pipeline stage context, and the item context contributes values only, so stage functions call `tracer.Start(ctx, "my-work")` normally with no signature changes. Zero overhead for items that don't implement the interface. `kotel` documents the pattern; per-item child spans appear automatically in any OTel-compatible backend. Not yet covered: fan-in link propagation for `Batch`. When N items with individual span contexts are collected into a batch, OTel recommends creating the batch span with `trace.WithLinks(...)` referencing each item's span. This requires either a new `BatchHook` interface that `kotel` implements, called by the `Batch` stage with the collected items before forwarding the slice, or a convention where the `[]T` batch type itself implements `ContextCarrier` with a pre-merged context. Deferred until a concrete use case drives the design.
- [x] Configurable `SampleHook` rate: `OnItemSample` fired every 10th item, hard-coded. Added a `WithSampleRate(n int)` `RunOption`; pass a negative value to disable sampling entirely.
- [x] Structured stage metadata in `GraphNode`: exposes `BatchSize`, `Timeout`, `HasRetry`, and `HasSupervision` in addition to `Kind`, `Name`, `Concurrency`, `Buffer`, and `Overflow`.
- [x] `MetricsHook`: lock-free atomic counters per stage (processed, errors, dropped, restarts, total processing time); `Stage(name)`/`Snapshot()`/`Reset()`; `Snapshot().JSON()`; `AvgLatency()` on `StageMetrics`; implements `Hook + OverflowHook + SupervisionHook`.
- [x] Hook engine wiring: `OnStageStart`, `OnItem` (with timing), and `OnStageDone` called by all stage runners; `LogHook` and all other hooks fire correctly.
- [x] `Pipeline.Describe()`: returns a `GraphNode` tree representing the wired pipeline without executing it. Enables static validation that a pipeline is correctly assembled, and unit testing of graph structure, without needing a full `Run`. Complements the live `inspector` dashboard.
## Performance

- [x] `[]any` allocation pressure in FlatMap: `FlatMap` now takes a yield callback (`func(ctx, I, func(O) error) error`) that emits items one at a time, eliminating the intermediate slice allocation entirely. All internal operators were migrated to the new signature.
- [x] Pluggable cache and store serialization (previously JSON-only): added a `Codec` interface and `WithCodec` `RunOption`; a custom codec replaces `encoding/json` for both cache and store-backed `Ref` serialization. `JSONCodec` remains the default.
- [x] `Concurrency(1)` fast paths: for `Map`, `FlatMap`, `Filter`, and `Sink` at `Concurrency(1)` with no custom hook or error handler, a stripped-down loop bypasses `time.Now()`, `ProcessItem()` retry dispatch, hook dispatch, and outbox dispatch. Added `sync.Pool` reuse for ordered-concurrent slot structs, and `clear` + reslice batch-buffer reuse in `runBatch`.
- [x] Receive-side micro-batching: fast-path dispatchers drain up to 15 additional items non-blocking after each blocking channel receive, processing them as a local `[16]T` buffer. Trades 16 expensive goroutine-handoff receives for 1 expensive + 15 cheap non-blocking checks. Gated on `!n.Supervision.HasSupervision()` so pre-fetched items are never silently lost on supervision restart. Net result: MapLinear improved from 5,066 µs to 2,855 µs.
- [x] Stage fusion: consecutive fusible stages (Map, Filter, Sink at `Concurrency(1)` with default config and `NoopHook`) are detected at run time and collapsed into a single goroutine. At the typed engine layer, `fusionEntry func(*runCtx, func(ctx, T) error) stageFunc` is set at construction time; when `ForEach` detects a single-consumer fast-path chain it calls `fusionEntry` directly, composing the entire chain with zero channel hops and zero boxing. `consumerCount atomic.Int32` prevents fusion for shared (diamond) pipelines. Net result: the 3-stage comparison improved from ~2.96 M items/s to ~5.31 M items/s.
- [x] Concurrent fast paths: extended the fast-path pattern to `runMapConcurrent` and `runFlatMapConcurrent` (unordered). Result: `MapConcurrent` −26%, `MapOrdered`/unordered −22%.
- [x] Drain protocol (send-side select elimination): replaced `select { case ch <- v: case <-ctx.Done(): }` with a plain `ch <- v` in all fast-path stage runners. Safety is maintained by drain goroutines (`for range inCh {}`) deferred at stage exit. `nodeRunner` also defers drain goroutines for all non-Source stages. Net result: the 3-stage comparison improved from 189 ms to 75.7 ms (~2.16× faster than raw goroutines).
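The receive-side micro-batching above amortises goroutine handoffs: after one blocking receive, the dispatcher opportunistically drains whatever is already buffered with cheap non-blocking receives. A hedged sketch (the function `drainBatch` is an illustration of the pattern, not the library's dispatcher):

```go
package main

import "fmt"

// drainBatch performs one blocking receive, then drains up to
// cap(buf)-1 additional items with non-blocking receives. A burst of
// 16 items costs one goroutine handoff instead of sixteen.
// The second return value is false once the channel is closed.
func drainBatch(in <-chan int, buf []int) ([]int, bool) {
	buf = buf[:0]
	v, ok := <-in // one expensive blocking receive
	if !ok {
		return buf, false
	}
	buf = append(buf, v)
	for len(buf) < cap(buf) {
		select {
		case v, ok := <-in: // cheap non-blocking check
			if !ok {
				return buf, false
			}
			buf = append(buf, v)
		default:
			return buf, true // nothing more buffered right now
		}
	}
	return buf, true
}

func main() {
	in := make(chan int, 32)
	for i := 0; i < 20; i++ {
		in <- i
	}
	close(in)
	batch, _ := drainBatch(in, make([]int, 0, 16))
	fmt.Println(len(batch)) // 16: one blocking receive + 15 drained
}
```

The supervision gate mentioned above matters precisely because of this buffer: items pre-fetched into the local slice would be lost if the stage were restarted mid-batch.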
## Test coverage & correctness

- [x] Property-based testing: `pgregory.net/rapid` property tests for operator algebra invariants: commutativity and multiset preservation of `Merge`; `Take(n) ∘ Sort` yields the n smallest elements; `Broadcast` fan-out completeness (every branch receives all items in order); `Balance` item-count preservation and round-robin fairness (per-branch counts differ by at most 1). Tests live in `properties_test.go` behind the `property` build tag; run with `task test:property`. Catches classes of bugs that example-based tests miss.
- [x] `Pool.Put` and `Pool.Warmup`: `Pool.Put(*Pooled[T])` exports the previously internal `put` method, allowing callers to return a pooled object without going through `Pooled.Release`. `Pool.Warmup(n int)` pre-populates the pool by calling the factory `n` times and immediately returning the objects; reduces first-request latency for latency-sensitive start-up paths. No-op for `n ≤ 0`. Best-effort per `sync.Pool` semantics (the GC may evict objects at any time).
- [x] `CircuitBreaker` `HalfOpenTimeout`: new `HalfOpenTimeout(d time.Duration) CircuitBreakerOpt`; if the required number of successful probes has not been received within `d`, the circuit opens again (resetting the cooldown clock). Implemented in `circuitBreaker.allow()` by recording a `halfOpenDeadline` on each Open → Half-Open transition and checking it on every subsequent `allow()` call in the half-open state. Cleared when the circuit closes normally.
- [x] `RateLimit` accepts `StageOption`: signature changed from `opts ...RateLimitOpt` to `rlOpts []RateLimitOpt, stageOpts ...StageOption`, matching the `CircuitBreaker` API pattern. `WithName` and `Buffer` are applied to the stage metadata. All existing call sites updated.
- [x] Test coverage parity with v1: ported 51 missing test scenarios from the v1 archive across 10 test files: overflow (`DropNewest`, `DropOldest`, hook callbacks, broadcast, concurrent load), `WithDrain` (partial flush, hard stop, normal completion), `RunAsync`/pause-resume (7 scenarios), `WithPauseGate`, state TTL (4 scenarios), cache TTL/eviction, error handler combos (`ExponentialBackoff`, `RetryThen` fallback, `RestartAlways`), merge/broadcast edge cases, ordered concurrency edge cases, `Sort`/`SortBy`/`Unzip`, `Timeout` on `FlatMap`, CircuitBreaker (all gaps now closed), and RateLimit (all gaps now closed). All skip stubs removed.
- [x] Gate/Pause wired into sources: `WithPauseGate` set `cfg.gate` but the gate was never propagated to `runCtx` or checked by any source stage, making `Pause()`/`Resume()` a no-op. Fixed by adding `gate *internal.Gate` to `runCtx`, threading it through `Runner.Run`, and checking it in `sourceStage`, `Generate` (both paths), and `FromSlice`.
- [x] `WithDrain` two-phase shutdown: `runWithDrain` cancelled `drainCtx` to stop sources, but this simultaneously cancelled the processing context that downstream stages (e.g. `Batch`) needed to flush buffered items. Fixed by calling `rc.signalDone()` in Phase 1 (sources watch `rc.done` via a goroutine in `Generate` that cancels `stageCtx`), so sources stop cleanly while downstream stages retain a live drain context. Context errors from the hard-stop timeout are suppressed on return.
- [x] `RateLimit` swallowed preemptive context deadline errors: `rate.Limiter.Wait` returns a custom `"rate: Wait(n=1) would exceed context deadline"` error before the context deadline actually fires. The stage was doing `return ctx.Err()` (which is nil at that point) instead of `return err`. Fixed.
- [x] `kitsune/testkit` package: `MustCollect`, `CollectAndExpect`, `CollectAndExpectUnordered`, and `RecordingHook`. `RecordingHook` implements all hook interfaces and provides typed accessors (`Items`, `Errors`, `Drops`, `Restarts`, `Graph`, `Dones`).
- [x] Virtual time / `TestClock`: `Clock` interface threaded through time-sensitive operators; `testkit.TestClock` advances on demand:

  ```go
  clock := testkit.NewTestClock()
  w := kitsune.Window(source, 5*time.Second, kitsune.WithClock(clock))
  // ... send items ...
  clock.Advance(5 * time.Second) // triggers flush immediately, no sleep
  ```
## Benchmarks & performance evidence

- [x] Benchmark vs raw goroutines: 3-stage linear graph (Map → Filter → ForEach) at 1M items; comparisons against `sourcegraph/conc` and `reugn/go-streams`. Results published in `doc/benchmarks.md`.
- [x] Allocation benchmark suite: `AllocsPerOp` tracking for `Map`, `FlatMap`, `Batch`, `RateLimit`, and `CircuitBreaker`; zero-alloc hot paths verified and regressions caught in CI.
## Ecosystem: tails

- [x] RabbitMQ / AMQP tail (`kamqp`): source (consume queue) and sink (publish to exchange) for RabbitMQ and any AMQP 0-9-1 broker.
- [x] NATS JetStream tail (`kjetstream`): dedicated `tails/kjetstream` module covering patterns that `knats` cannot express: pull-batch consumers (`Fetch`, `FetchBytes`), ordered consumers with auto-recovery (`OrderedConsume`), key-value watch streams (`WatchKV`), and high-throughput async publish (`PublishAsync`, returns a sink + flush pair). KV upsert sink via `PutKV`. The user owns all NATS objects; kitsune never creates or closes them.
- [x] Azure Service Bus tail (`kazsb`): source and sink for Azure Service Bus queues and topics.
- [x] Azure Event Hubs tail (`kazeh`): consumer-group-based source for Azure Event Hubs.
- [x] Elasticsearch / OpenSearch tail (`kes`): bulk-index sink and scroll/search source.
- [x] Google Cloud Storage tail (`kgcs`): object source (list + read) and sink (upload).
## Developer experience

- [x] Expanded examples: worked examples for the patterns most often asked about: `MapWithKey` for per-user rate limiting, `MapWith` for running totals, `DeadLetter` for retry-with-fallback, and `Stage` composition via `Then`/`Or`.
## API: Go 1.23 iterator protocol

- [x] `Pipeline[T]` as `iter.Seq[T]`: `Iter(ctx, p, opts...)` exposes a pipeline as an `iter.Seq[T]` for range-over-func. Returns `(iter.Seq[T], func() error)`. Breaking out of the loop early cancels the pipeline and suppresses the resulting `context.Canceled`.
## API: dynamic flow control

- [x] `RunHandle.Pause()`/`RunHandle.Resume()`: a source-only `Gate` blocks sources from emitting while downstream stages continue draining in-flight items naturally. `RunAsync` creates a gate automatically; `WithPauseGate(g)` provides external gate control for synchronous `Run`. `Gate.Wait` is lock + nil-check only when open; zero channel ops on the fast path.
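The "lock + nil-check when open, channel only when paused" structure described above can be sketched directly. A hedged illustration (this is the pattern, not the library's `internal.Gate`; the names are assumptions):

```go
package main

import (
	"fmt"
	"sync"
)

// gate: open is represented by a nil channel, so Wait on the fast path
// is one mutex round-trip and a nil check — no channel operations.
type gate struct {
	mu     sync.Mutex
	paused chan struct{} // nil when open
}

func (g *gate) Pause() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.paused == nil {
		g.paused = make(chan struct{})
	}
}

// Resume releases every goroutine blocked in Wait by closing the channel.
func (g *gate) Resume() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.paused != nil {
		close(g.paused)
		g.paused = nil
	}
}

// Wait returns immediately when open and blocks until Resume when paused.
func (g *gate) Wait() {
	g.mu.Lock()
	ch := g.paused
	g.mu.Unlock()
	if ch != nil {
		<-ch // only the paused path touches a channel
	}
}

func main() {
	g := &gate{}
	g.Wait() // open: returns immediately

	g.Pause()
	done := make(chan struct{})
	go func() {
		g.Wait() // blocks until Resume
		close(done)
	}()
	g.Resume()
	<-done
	fmt.Println("resumed")
}
```

Because only sources call `Wait`, pausing stops new items at the head of the graph while everything already in flight drains through the downstream stages naturally.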
## Community & discoverability

- [x] `CONTRIBUTING.md`: dev setup, running tests and examples, PR workflow, commit message convention.
- [x] GitHub issue templates: separate templates for bug reports and feature requests.
- [x] GitHub Discussions: canonical place for "how do I…?" questions and design proposals.
- [x] Fuzz testing: `go test -fuzz` targets for the scheduler, overflow logic, and context cancellation paths; run as a nightly CI job.