# Roadmap
Completed milestones are preserved in `roadmap-archive.md`.
## Upcoming
### Correctness & safety
- [x] `runWithDrain` timer goroutine leak: `runWithDrain` creates a `time.After` inside a goroutine to enforce the drain timeout. If the pipeline finishes before the timeout fires, `drainCtx.Done()` exits the select — but the allocated `time.Timer` remains live until the duration elapses, keeping its goroutine and channel reachable and creating a temporary leak per `Run` call that uses `WithDrainTimeout`. Replace `time.After(drainTimeout)` with `time.NewTimer(drainTimeout)` and call `timer.Stop()` when `drainCtx` is cancelled first.
- [x] `RunHandle.errCh` single-consumer footgun: `Wait()` consumes from the buffered(1) `errCh`; `Err()` returns the same channel. Concurrent callers of `Wait()` and `<-handle.Err()` will have one block forever after the other consumes the single value. Redesign `RunHandle` to store the result in an atomic field written once and signal completion by closing the existing `done` channel, so both `Wait()` and channel-select callers are safe to use concurrently.
- [x] `kkafka.Consume` uncommitted last message on early exit: When `yield(v)` returns false (downstream closed via `Take` or `TakeWhile`), the function exits without calling `CommitMessages` for the last fetched message. On reconnect, that message redelivers. This is correct at-least-once behaviour but is undocumented. Add an explicit note to the `Consume` godoc stating that the last message before a pipeline boundary will redeliver on reconnect.
- [x] `Supervise` + `MapWithKey` silent state loss on restart: When a supervised `MapWithKey` stage restarts after a panic or error, the in-memory key map is re-initialized and all accumulated `Ref` state is silently discarded. With `MemoryStore`, state is unrecoverable; with an external Store, state survives only if the Ref was flushed before the failure. Document this interaction prominently in the `Supervise` godoc and in `doc/operators.md`: supervised stateful stages require an external Store to survive restarts.
- [x] `ExpandMap` has no depth or fanout bound: `ExpandMap` performs BFS on an arbitrary graph with no limit on expansion depth or total items. A graph with high branching factor produces `fan^depth` items, exhausting memory silently. Add `MaxDepth(n int)` and `MaxItems(n int)` options that stop expansion when the limit is reached. At minimum add a prominent godoc warning and recommend pairing with `Take(n)` downstream.
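The bound can be enforced directly in the BFS loop. A sketch under assumed semantics (`expand` and its bounds are illustrative stand-ins for `ExpandMap` with `MaxDepth`/`MaxItems`):

```go
package main

import "fmt"

// expand performs BFS from root, emitting every reached node, but stops
// once maxDepth levels have been expanded or maxItems nodes emitted.
// children is the user-supplied expansion function (as in ExpandMap);
// the bounds prevent fan^depth blow-up on high-branching graphs.
func expand(root int, children func(int) []int, maxDepth, maxItems int) []int {
	type entry struct{ node, depth int }
	out := []int{}
	queue := []entry{{root, 0}}
	for len(queue) > 0 && len(out) < maxItems {
		e := queue[0]
		queue = queue[1:]
		out = append(out, e.node)
		if e.depth >= maxDepth {
			continue // depth bound: do not expand this node further
		}
		for _, c := range children(e.node) {
			queue = append(queue, entry{c, e.depth + 1})
		}
	}
	return out
}

func main() {
	// Binary fan-out: node n expands to 2n+1 and 2n+2, unbounded in principle.
	children := func(n int) []int { return []int{2*n + 1, 2*n + 2} }
	// Depth 2 from node 0 yields 1 + 2 + 4 = 7 nodes; MaxItems caps earlier.
	fmt.Println(len(expand(0, children, 2, 100))) // 7
	fmt.Println(len(expand(0, children, 10, 5))) // 5
}
```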
### API and ergonomics
- [x] `MergeRunners` should accept a `Runnable` interface instead of `*Runner`: `Build()` currently leaks into user-facing code solely because `MergeRunners` requires `*Runner` arguments. Introduce a `Runnable` interface implemented by both `*ForEachRunner[T]` and `*Runner`, change `MergeRunners` to accept `...Runnable`, and add `RunAsync` directly to `ForEachRunner[T]`. `Build()` is retained for backwards compatibility but nothing in the new design requires it and the docs should stop recommending it. Added `Runnable` interface satisfied by `*Runner` and `*ForEachRunner[T]`; added `RunAsync` to `*ForEachRunner[T]` and `*DrainRunner[T]`; `Build()` retained as an identity on `*Runner` and unchanged on `*ForEachRunner[T]` for backwards compatibility.
- [x] Operator cohesiveness — cleanup pass: remove dead/duplicated symbols and standardise naming across the surface. Removed `ConsecutiveDedup`/`ConsecutiveDedupBy`, `DeadLetter`/`DeadLetterSink`, `WindowByTime`, `ElementAt`, `BroadcastN`, the `Skip()` alias, and the `Lift` alias. Renamed `SkipLast` → `DropLast`, `SkipUntil` → `DropUntil`, `WithLatestFrom` → `LatestFrom`, `FrequenciesStream` → `RunningFrequencies` (and converted to true running semantics). Replaced string-keyed `CountBy`/`SumBy` with generic `RunningCountBy`/`RunningSumBy`. Relocated `MapBatch`, `MapIntersperse`, `EndWith`, and `Stage.Or` from `compat.go` to their natural homes; deleted `compat.go` entirely. See cleanup plan.
- [x] Operator cohesiveness — API changes: redesign the operator surface for consistent flush triggers, dedup semantics, and grouping shape. Replace positional `Batch(p, size)` with option-driven flush triggers (`BatchCount`, `BatchMeasure`, `BatchTimeout`). Make `Dedupe`/`DedupeBy` global by default with a `DedupeWindow(n)` option (n=1 = consecutive, n=k = sliding). Drop `WithDedupSet` from `Distinct`/`DistinctBy`. Replace terminal `GroupBy` and `GroupByStream` with a single buffering pipeline operator returning `*Pipeline[map[K][]T]`. Reclassify `TakeRandom` from terminal to buffering pipeline operator. Add a `Single` terminal with `OrDefault`/`OrZero`. Add `Within` for applying pipeline stages to slice contents. Add `RandomSample` streaming probabilistic filter. See design and API plan.
- [x] `LookupBy`/`Enrich` batch timeout: Under low throughput, items accumulate in the internal `MapBatch` buffer until the full batch size is reached, introducing unbounded latency before `Fetch` is called. Add a `BatchTimeout time.Duration` field to `LookupConfig` and `EnrichConfig`, using the same semantics as `Batch`'s `BatchTimeout` option: flush a partial batch when the duration elapses with no new item.
- [x] Named result types to replace `Pair` proliferation: `LookupBy` returns `Pair[T,V]`, `Zip` returns `Pair[A,B]`, `Timestamp` wraps items in `Pair[T,time.Time]`, `WithIndex` wraps in `Pair[T,int]`. Users read `.First`/`.Second` everywhere without type context. Replace with named, self-documenting types: `Timestamped[T]{Value T; Time time.Time}`, `Indexed[T]{Value T; Index int}`, `Enriched[T,V]{Item T; Value V}`. `Pair` can remain as a utility but named types should be the canonical output of their respective operators.
- [x] `ConcatMap` should reject incompatible options at construction time: `ConcatMap` appends `Concurrency(1)` last, silently overriding any user-supplied `Concurrency(n)`. A user who passes `Concurrency(4)` gets `Concurrency(1)` with no warning. Validate at construction time and panic (with a clear message) or return an error when an incompatible concurrency option is detected.
- [x] Document `StageOption` last-write-wins semantics: Passing `Buffer(16), Buffer(512)` produces buffer 512 — last option wins. This applies to every option and is not stated anywhere in `doc/options.md` or the `StageOption` type godoc. Add one sentence to both locations.
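Last-write-wins falls out of the functional-options pattern itself: options are closures applied in order, so the sentence to document is simply this behaviour. An illustrative sketch (not kitsune's actual types):

```go
package main

import "fmt"

// stageConfig and Buffer mirror the usual functional-options shape:
// each option is a function mutating the config, and options are
// applied left to right, so a later Buffer(512) overwrites an earlier
// Buffer(16). Names here are illustrative, not the library's real types.
type stageConfig struct{ buffer int }

type stageOption func(*stageConfig)

// Buffer sets the stage's channel buffer size.
func Buffer(n int) stageOption {
	return func(c *stageConfig) { c.buffer = n }
}

// apply builds the config by running every option in order; the last
// write to any field wins.
func apply(opts ...stageOption) stageConfig {
	cfg := stageConfig{buffer: 16} // default
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	fmt.Println(apply(Buffer(16), Buffer(512)).buffer) // 512: last option wins
}
```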
- [x] `Generate` vs `Channel[T]` selection guide: Both bridge external code into a pipeline — `Generate` is pull-based (callback yields values), `Channel[T]` is push-based (send from any goroutine). Neither godoc mentions the other, leaving users to discover the difference by trial and error. Add a "When to use each" note to both godocs and include a comparison in `doc/operators.md`.
### Performance
- [x] Bypass codec serialization for `MemoryStore` Ref operations: Every `ref.Get()` and `ref.Set()` in `MapWith`/`MapWithKey` marshals to/from `[]byte` via the Codec even when the backing Store is `MemoryStore` (in-process). For latency-sensitive inner loops this is wasted allocation. Add a fast path: if the Store implements an `InProcessStore` marker interface (or is type-asserted to be `*memoryStore`), store the value as `any` directly, bypassing the codec.
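A sketch of the proposed fast path, with all names illustrative: the stateful stage type-asserts the store against the marker interface and skips the codec when it matches:

```go
package main

import "fmt"

// Store is a minimal byte-oriented store stand-in; InProcessStore is
// the marker interface from the item above. All names here are
// illustrative sketches of the proposal, not the library's actual API.
type Store interface {
	SetBytes(key string, val []byte)
	GetBytes(key string) []byte
}

// InProcessStore marks stores that can hold values without serialization.
type InProcessStore interface {
	SetAny(key string, val any)
	GetAny(key string) any
}

type memoryStore struct{ m map[string]any }

func newMemoryStore() *memoryStore { return &memoryStore{m: map[string]any{}} }

func (s *memoryStore) SetBytes(k string, v []byte) { s.m[k] = v }
func (s *memoryStore) GetBytes(k string) []byte    { b, _ := s.m[k].([]byte); return b }
func (s *memoryStore) SetAny(k string, v any)      { s.m[k] = v }
func (s *memoryStore) GetAny(k string) any         { return s.m[k] }

// setRef takes the no-codec path when the store is in-process, and
// falls back to (stubbed) codec marshalling otherwise. Returns whether
// the fast path was taken, for illustration.
func setRef(s Store, key string, val int) bool {
	if ips, ok := s.(InProcessStore); ok {
		ips.SetAny(key, val) // fast path: store the value directly
		return true
	}
	s.SetBytes(key, []byte(fmt.Sprint(val))) // slow path: codec stand-in
	return false
}

func main() {
	s := newMemoryStore()
	fast := setRef(s, "k", 42)
	fmt.Println(fast, s.GetAny("k")) // true 42
}
```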
- [x] `DrainChan` goroutine burst on mass teardown: Every transform stage does `defer func() { go internal.DrainChan(inCh) }()`. A `Take(1)` on a 20-stage pipeline launches 20 drain goroutines simultaneously at teardown. For pipelines that cycle frequently (via `Retry`), this creates sustained goroutine pressure. Investigate cooperative drain: pass a drain-ready signal down the topology so upstream stages can self-drain without spawning goroutines.
- [x] Cooperative drain: full operator rollout — complete conversion of all operators deferred from the prototype: `Map` concurrent/ordered paths and fused fast path; `Filter`; `Batch`, `BufferWith`, `ChunkBy`, `ChunkWhile`, `SlidingWindow`, `SessionWindow`; `FlatMap`, `ConcatMap`; `Merge`, `Amb`, `Zip`, `CombineLatest`, `LatestFrom`; `TakeUntil`, `DropUntil`; `GroupByStream`, `Balance`, `Partition`, `Broadcast`; `MapWith`, `MapWithKey`, `FlatMapWith`, `FlatMapWithKey`; `Scan`, `Reduce`, `Aggregate`; `LookupBy`, `Enrich`; all sources. Multi-consumer fan-out stages require explicit ref-count initialization using the actual downstream count. See plan.
- [x] Sharded `DropOldest` outbox: `dropOldestOutbox` holds a single mutex protecting drain-and-resend when the buffer is full. Under sustained backpressure with `Concurrency(n)`, all n workers serialize on this mutex; exactly the scenario `DropOldest` is designed for becomes its hot path. Implemented a sharded outbox (worker i uses shard i % n) to eliminate cross-worker contention when both `Overflow(DropOldest)` and `Concurrency(n > 1)` are active.
### Testing
- [x] Property tests for windowing operators: `Batch`, `BufferWith`, `SlidingWindow`, `SessionWindow`, `ChunkBy`, and `ChunkWhile` have no property-based tests. Laws to verify: `Batch(n)` completeness — every input item appears in exactly one batch; all batches except the last have exactly n items. `SlidingWindow(size, step)` — adjacent windows share exactly `size - step` elements. `SessionWindow(gap)` — items separated by more than gap appear in different sessions. `ChunkBy(keyFn)` — consecutive same-key items always cogroup; key boundaries produce new chunks. Use `testkit.NewTestClock()` for deterministic timing.
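The overlap law can be stated executably even without a property-testing framework. The sketch below checks it against a plain-slice reference model of `SlidingWindow` (not the library's implementation) over randomized inputs with a fixed seed:

```go
package main

import (
	"fmt"
	"math/rand"
)

// slidingWindow is a reference model of SlidingWindow(size, step):
// emit every full window of `size` items, advancing by `step`.
func slidingWindow(in []int, size, step int) [][]int {
	var out [][]int
	for i := 0; i+size <= len(in); i += step {
		w := make([]int, size)
		copy(w, in[i:i+size])
		out = append(out, w)
	}
	return out
}

// overlapLaw checks the invariant from the roadmap item: adjacent
// windows share exactly size-step elements (the tail of one window
// equals the head of the next).
func overlapLaw(ws [][]int, size, step int) bool {
	for i := 1; i < len(ws); i++ {
		shared := size - step
		for j := 0; j < shared; j++ {
			if ws[i-1][step+j] != ws[i][j] {
				return false
			}
		}
	}
	return true
}

func main() {
	rng := rand.New(rand.NewSource(1))
	for trial := 0; trial < 100; trial++ { // randomized inputs, fixed seed
		n, size, step := rng.Intn(50), 1+rng.Intn(5), 1+rng.Intn(5)
		in := make([]int, n)
		for i := range in {
			in[i] = rng.Int()
		}
		if !overlapLaw(slidingWindow(in, size, step), size, step) {
			panic("overlap law violated")
		}
	}
	fmt.Println("overlap law holds")
}
```

Running the same law against the real operator (with `testkit.NewTestClock()` for the time-driven variants) is the property test the item asks for.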
- [x] Property tests for `GroupByStream`: `GroupByStream` routes items to per-key sub-pipelines and is structurally one of the most complex operators in the library, with no property tests. Key law: for any input stream, items with key K must appear in arrival order in exactly the sub-pipeline rooted at K, with no cross-key contamination.
- [x] Test `Supervise` + `MapWithKey` state contract on restart: Add a test that panics a supervised `MapWithKey` stage mid-stream and verifies the exact post-restart state of per-key `Ref` values — zeroed with `MemoryStore`, preserved with an external Store. This test codifies the contract that is currently only implied by the documentation item above.
- [x] Verify `benchstat` regression baseline (re-open: marked done in roadmap but not confirmed): The roadmap marks the `benchstat` performance regression baseline as complete, but `testdata/bench/baseline.txt` and CI integration were not confirmed present. Verify the baseline file is committed and the CI diff step is active; if not, implement from scratch.
### Developer experience
- [x] Source selection guide (`doc/sources.md`): Fourteen source operators exist with overlapping use cases and no unified decision guide. Cover: `FromSlice` for in-memory data; `From` to wrap an existing channel; `Generate` for pull-based external sources; `Channel[T]` for push-based multi-sender bridging; `Ticker`/`Timer` for time-driven emission; `Unfold`/`Iterate` for mathematical sequences; `Concat` for sequential chaining; `Amb` for racing sources. Include the `Generate` vs `Channel[T]` comparison from the ergonomics item above.
- [x] `ContextCarrier` vs `WithContextMapper` decision guide: The two approaches for per-item trace propagation have meaningfully different trade-offs: `ContextCarrier` requires modifying the item type (impossible for third-party types); `WithContextMapper` is a stage option requiring no type changes. The comparison exists only as a one-line godoc mention. Add a section to `doc/operators.md` or a new `doc/tracing.md` with a comparison table and worked examples for both.
- [x] Tail godoc quality baseline and template: Audited all 27 tail packages against a standard template. Added missing caller-owns statements, worked examples, and delivery semantics declarations to each package godoc. Replaced all em dashes in godoc with colons or semicolons. Template codified in `doc/tails.md`.
### Ecosystem
- [x] Shared tail interface contract (`doc/tails.md`): Wrote `doc/tails.md` covering: naming conventions (primary verbs plus accepted domain-idiomatic alternatives), parameter order, connection lifecycle ownership, error propagation, delivery semantics, the package godoc template, and a tail matrix with all 27 tails. Existing tails were audited and brought into conformance.
- [x] `kkafka` batched commit variant: `kkafka.Consume` commits each Kafka message individually after `yield`, which serializes commit latency with processing latency. For high-throughput consumers, batched commits (flush N offsets at once, or on a timer) reduce broker round-trips significantly. Implemented as `BatchSize(n)` and `BatchTimeout(d)` options on `Consume` (variadic, backward-compatible) rather than a standalone function.
### Long-term
- [ ] Event-time / watermark support: All time-based operators (`Debounce`, `SessionWindow`, `Throttle`, etc.) use processing time — the wall clock when the item arrives. For Kafka, event sourcing, and log-replay workloads, items carry their own timestamps and windowing correctness requires event time. Add a `WithEventTime[T](fn func(T) time.Time)` option to windowing operators and a watermark mechanism (advancing a per-pipeline event-time frontier) that drives window closure based on event timestamps rather than `time.Now()`. This is the most significant remaining gap relative to production streaming systems (Flink, Kafka Streams).
- [ ] Checkpointing and fault-tolerant restart: Process restarts lose all `MapWithKey` state (unless an external Store is configured), all window accumulators, and all in-progress batch buffers. Add a `Checkpoint` mechanism: periodic serialization of operator state snapshots to the configured Store, with deterministic recovery on restart from the last checkpoint. Even a basic implementation for `MapWithKey` (flush the key map on a signal or interval) would make fault-tolerant pipelines possible without external state management overhead.
- [ ] First-class composable segment type: There is no way to define and name a reusable pipeline segment — a `*Pipeline[A] → *Pipeline[B]` transform that can be introspected, documented, and composed as a named unit. Users work around this with Go functions, which works but does not integrate with `Describe()`, `IsOptimized()`, or the inspector dashboard. A `Segment[A,B]` type wrapping a transform function with its own name and metadata would enable building reusable pipeline libraries on top of kitsune that remain observable.
## Active / Near-term
### Operators
- [x] `Retry[T]` standalone operator: first-class `Retry(p, policy)` stage that re-subscribes to the upstream on failure, independent of the `OnError(RetryMax(...))` handler path. The handler-based retry is per-item; standalone `Retry` re-runs the entire upstream pipeline, making it the right primitive for sources that should reconnect on drop (e.g. a websocket tail that disconnects). Policy controls max attempts, backoff, and which errors are retryable.
- [x] `SampleWith(p, sampler)`: emit the most recent item from `p` whenever the `sampler` pipeline fires. Distinct from `Throttle` (which limits emission rate) and `Debounce` (which waits for a gap): `SampleWith` is driven by an external pipeline signal. Useful for "poll latest value every N seconds" patterns without holding a reference to the latest value manually.
- [x] `IgnoreElements(p)`: drain `p` for side effects and emit nothing downstream. Currently requires `Filter(p, func(_ T) bool { return false })`, which reads as intent-obscuring. A named combinator is clearer and optimizable (no outbox allocation needed).
- [x] `Empty[T]()` and `Never[T]()`: named source primitives. `Empty()` completes immediately with no items; `Never()` blocks forever until context cancellation. Both are implied by existing combinators (`FromSlice(nil)`, a `Generate` that never yields) but unnamed, which makes pipeline algebra tests awkward. Used as identity elements in composition proofs.
- [x] `Materialize[T]`/`Dematerialize[T]`: `Materialize` wraps each item and the terminal error into a sum type `Notification[T]{Value T; Err error; Done bool}`; `Dematerialize` unwraps it. Enables passing error events through operators that only handle `T`, and makes error routing composable without needing `MapResult` at every stage.
- [x] `BufferWith(p, closingSelector)`: signal-driven buffering: collect items until the `closingSelector` pipeline fires, then emit the accumulated slice and reset. Generalizes `Batch(size)` and `BatchTimeout` to arbitrary boundary signals. The defining pattern for "accumulate until external trigger" (e.g. flush on heartbeat, flush on upstream signal). Named `BufferWith` to avoid collision with the `Buffer(n)` stage option.
### State
- [x] `WithKeyTTL(d)` for `MapWith`/`FlatMapWith`: evict per-key goroutines and their associated `Ref` state after `d` of inactivity. Without this, long-running pipelines keyed on high-cardinality fields (user IDs, session tokens) accumulate goroutines unboundedly. Eviction should be lazy (triggered on next access or by a background sweeper, not a hard timer per key) to avoid thundering-herd on periodic activity bursts.
- [x] `TTLDedupSet(ttl)`: a time-bounded `DedupSet` implementation that forgets keys after `ttl`. `MemoryDedupSet` grows unbounded on infinite streams; `BloomDedupSet` is bounded but cannot expire. `TTLDedupSet` enables safe `Distinct`/`Dedupe` on never-ending streams where "seen in the last N minutes" is the correct semantic. Implement with a ring buffer of `(key, expiry)` pairs and lazy eviction on `Contains`.
### API and ergonomics
- [x] `WithDefaultBuffer(n)` RunOption: set the channel buffer size for all stages in a run without annotating every operator individually. Currently every stage defaults to 16; users tuning for latency (smaller buffers, lower memory) or throughput (larger buffers, less scheduling) must annotate each stage. A run-level default would let a single option flip the entire pipeline's buffering posture, with per-stage `Buffer(n)` still taking precedence.
- [x] Consolidate `Ticker`/`Interval`: `Interval` removed. Use `Ticker` (which emits `time.Time`) and `Map` to derive a counter if needed.
- [x] Consolidate `Drain`/`ForEach`: `Drain` and `DrainRunner` are deprecated with a godoc note. `ForEach` is now the canonical terminal stage for all item processing, including the discard case.
- [x] Numeric type constraint on `Sum`: already implemented; `Sum[T Numeric]` uses the `Numeric` constraint defined in collect.go. `Min`, `Max`, and `MinMax` also use it.
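The constraint referenced above conventionally takes the following shape (a sketch; the actual definition lives in collect.go and may differ):

```go
package main

import "fmt"

// Numeric approximates the constraint described above: any integer or
// floating-point type, including named types via the ~ element.
type Numeric interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64
}

// sum is the terminal's core fold, usable for any Numeric element type.
func sum[T Numeric](items []T) T {
	var total T
	for _, v := range items {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sum([]int{1, 2, 3}))           // 6
	fmt.Println(sum([]float64{0.5, 1.5, 2.0})) // 4
}
```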
- [x] Error action naming audit: `ActionDrop()` is now the canonical name. `Skip()` is kept as a deprecated alias pointing to `ActionDrop()`.
- [x] Per-error-type retry control: `RetryIf(predicate func(error) bool, backoff)` and `RetryIfThen(predicate, backoff, fallback)` added. `RetryIf` retries when the predicate returns true and halts otherwise; `RetryIfThen` delegates to a fallback handler on non-retryable errors.
### Developer experience
- [x] "Choosing a concurrency model" guide: document when to reach for each of the four concurrency primitives: `Concurrency(n)` (embarrassingly parallel, order optional), `Ordered()` (parallel but preserve input order), `MapWith` key-sharding (per-entity sequential, no locks), `Balance` + `Partition` (explicit fan-out). Include a decision flowchart and worked examples for the most common patterns: per-user rate limiting, parallel enrichment with ordering, stateful aggregation.
- [x] Fast-path eligibility documentation: the fast-path and stage-fusion optimizations are entirely opaque. Users who add a `WithHook` for debugging or set `Concurrency(2)` don't know they've disabled the fast path, and can't diagnose the resulting throughput drop. Add a section in `doc/tuning.md` listing the exact conditions for fast-path eligibility and stage fusion, and expose `Pipeline.IsOptimized() bool` (or similar) for use in tests.
- [x] `WithInspectorStore(store)` for persistent inspector state: the live inspector dashboard holds all pipeline metrics in memory and loses them on restart. A `WithInspectorStore` option would let operators persist node snapshots and metric history to an external store (or even the existing `MemoryStore` equivalent with a longer TTL), enabling post-mortem analysis of pipeline behaviour after a crash or restart.
- [x] `benchstat` performance regression baseline: commit a `testdata/bench/baseline.txt` snapshot produced by `benchstat` from the main branch. Add a CI step that runs benchmarks on PRs and diffs against the baseline, failing if any benchmark regresses beyond a threshold (e.g. 10%). Prevents silent throughput regressions from landing unnoticed, especially around fast-path and fusion logic.
- [x] Property-based tests in the default test run: the `pgregory.net/rapid` property tests in `properties_test.go` are gated behind a `// +build property` tag and excluded from `task test`. They catch classes of bugs (operator algebra invariants, fan-out completeness, ordering guarantees) that example-based tests miss. Remove the build tag and include them in `task test`; if runtime is a concern, run them with a reduced number of iterations (`rapid.Settings{MaxRuns: 50}`) in the default run and the full count in `task test:all`.
- [x] Unified tail integration test matrix: the 27 tail packages each have their own test module, but there is no single CI step that reports their combined pass/fail status. `task test:ext` runs them sequentially but the output is scattered. Add a unified matrix report — a table of tail name, pass/fail, and skipped-reason (e.g. "no broker in CI") — so regressions across tails are visible at a glance rather than buried in individual log streams.
- [x] Supervision + error handler interaction documentation: it is not documented whether `OnError` and `Supervise` can be used together on the same stage, and if so, which takes precedence. Add a dedicated section to `doc/operators.md` (or a new `doc/error-handling.md`) that covers: the evaluation order of error handler → supervision policy, worked examples for common combinations (retry-then-restart, skip-unless-fatal-then-restart), and the distinction between per-item errors (`OnError`) and stage-level restart (`Supervise`).
### Correctness & safety
- [x] `Pooled.Value` use-after-release protection: `Pooled.Value` is a public field, and calling `Release()` followed by a read of `Value` is documented as undefined behaviour — but Go cannot enforce it. Under time pressure, a developer who releases and then reads `Value` in a logging call gets silent corruption. Add a `released atomic.Bool` guard and panic on `Value` access after release (opt-in via a build tag or constructor flag if the overhead is unacceptable in hot paths). At minimum, promote the warning to a prominent `// WARNING:` block in the type godoc rather than a single sentence in `Release`.
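A sketch of the guard, assuming `Value` can become an accessor method (in the real API it is a public field, so this would be a breaking change or an opt-in variant; all names are illustrative):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pooled sketches the guarded variant: Value becomes an accessor that
// panics after Release, turning silent use-after-release corruption
// into an immediate, diagnosable failure.
type pooled[T any] struct {
	value    T
	released atomic.Bool
}

// Value panics if called after Release.
func (p *pooled[T]) Value() T {
	if p.released.Load() {
		panic("pooled: Value accessed after Release")
	}
	return p.value
}

// Release marks the object as returned to the pool; subsequent Value
// calls panic instead of reading recycled memory.
func (p *pooled[T]) Release() {
	p.released.Store(true)
}

func main() {
	p := &pooled[int]{value: 7}
	fmt.Println(p.Value()) // 7
	p.Release()
	defer func() { fmt.Println("recovered:", recover() != nil) }()
	_ = p.Value() // panics: caught by the deferred recover above
}
```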
- [x] `globalIDSeq` truncation on 32-bit platforms: `nextPipelineID()` atomically increments an `int64` counter but casts the result to `int`. On 32-bit targets `int` is 32 bits; at 2^31 stage IDs the cast silently wraps and channel-memoisation in `runCtx.chans` starts colliding, producing incorrect DAG wiring. Either return `int64` throughout (stage IDs, `stageMeta.id`, `stageMeta.inputs`), or add a `//go:build !386 && !arm` constraint and document the limitation.
- [x] `refRegistry.get` should use a read lock: `refRegistry.get()` acquires a full `sync.Mutex` on every call. Because `init()` is called exactly once during the build phase — before any stage goroutine starts — all subsequent `get()` calls are read-only against a fully-initialised map. Switching to `sync.RWMutex` with `RLock()` in `get()` is more semantically correct and avoids unnecessary write-lock contention when many stage goroutines call `get()` at startup. Alternatively, replace the map with `sync.Map`, which is optimised for append-once, read-many workloads.
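A sketch of the `RWMutex` variant, with illustrative names: writes happen once in `register` during the build phase, and the hot `get` path takes only a read lock:

```go
package main

import (
	"fmt"
	"sync"
)

// registry sketches the RWMutex variant: writes happen once during the
// single-threaded build phase (register), then many stage goroutines
// read concurrently via RLock without contending on a write lock.
type registry struct {
	mu   sync.RWMutex
	refs map[string]int
}

func newRegistry() *registry { return &registry{refs: map[string]int{}} }

// register is called only during the build phase.
func (r *registry) register(key string, id int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.refs[key] = id
}

// get is the hot read path: a shared read lock lets all stage
// goroutines proceed in parallel.
func (r *registry) get(key string) (int, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	id, ok := r.refs[key]
	return id, ok
}

func main() {
	r := newRegistry()
	r.register("stage-1", 42)
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // concurrent readers, no write-lock contention
		wg.Add(1)
		go func() {
			defer wg.Done()
			if id, ok := r.get("stage-1"); !ok || id != 42 {
				panic("lookup failed")
			}
		}()
	}
	wg.Wait()
	fmt.Println("ok")
}
```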
### Testing
- [x] Property tests for windowing operators: `Batch`, `BufferWith`, `SlidingWindow`, `SessionWindow`, `ChunkBy`, and `ChunkWhile` have no property-based tests. These are the most stateful, most timing-sensitive operators in the library and the most likely to have subtle partitioning bugs. Laws to verify: `Batch(n)` completeness (every input item appears in exactly one batch; all batches except possibly the last have exactly `n` items); `BufferWith` partition (concatenation of all emitted slices equals the input in order); `SlidingWindow` overlap invariant (adjacent windows share exactly `size - 1` elements for stride 1); `SessionWindow` closure (items separated by more than the timeout appear in different sessions). The property tests already caught a real `Amb` bug missed by 27 example tests — windowing operators deserve the same treatment.
- [x] Fuzz targets: No fuzz tests exist. Add fuzz targets for: (1) operators that accept structured input where a malformed item could panic inside user-supplied functions (e.g. a `Map` fn that calls `json.Unmarshal`); (2) `BloomDedupSet`, to verify no panics or incorrect contains-results under adversarial key inputs. Even a minimal `FuzzFromSlice` that drives `FromSlice → Map(panicRecover) → Collect` provides a panic-safety baseline. Fuzz targets live alongside tests and run automatically with `go test -fuzz`.
### API and ergonomics
- [x] `Or` error discard documentation: when `primary` errors and `fallback` also errors, the primary error is silently discarded and only the fallback error is returned. This is the right default (most recent error wins), but it is not documented. Users who want both errors for logging or metrics will be surprised. Either document the discard explicitly in the godoc, wrap both errors with `errors.Join` (making them both visible), or accept an optional `onPrimaryError func(error)` callback so callers can observe the discarded error without changing the return value.
- [x] `ContextCarrier` non-interface alternative: tracing context propagation via `ContextCarrier` requires every item type to implement the interface. Third-party structs (Kafka messages, Protobuf-generated types, stdlib types) cannot be retrofitted without a wrapper. Add a `WithContextMapper[T](fn func(T) context.Context) RunOption` (or `StageOption`) that extracts a context from items by value rather than by interface. This makes per-item tracing a configuration choice rather than a type constraint, and removes the need for wrapper types in the common case where only one field carries the trace context.
### Developer experience
- [x] `IsOptimized()` should surface ineligibility reasons: `IsOptimized()` returns a per-stage boolean but does not say why a stage lost eligibility. A user who adds `OnError(Skip())` to a hot `Map` stage silently falls off the fast path with no feedback. `stageMeta` already carries `isFastPathCfg` and `supportsFastPath`; extend the return type to `[]OptimizationReport` (or equivalent) where each report names the failing condition: "OnError handler disables fast path", "Timeout set", "Hook active", "consumerCount > 1 disables fusion". This makes `IsOptimized` genuinely actionable rather than a binary indicator.
- [x] Document fusion boundaries in the tuning guide: stage fusion applies to `Map → Filter` chains ending at `ForEach`, but any operator that does not set `fusionEntry` (sources, `FlatMap`, `Batch`, all fan-out/fan-in operators) is a silent fusion boundary. After a `FlatMap`, even a long `Map → Filter → Map` chain will not fuse. The tuning guide explains fast-path conditions but does not list fusion boundaries. Add a table of operators that break fusion so users tuning a hot path know which operators to avoid or isolate.
- [x] Document `MapPooled` mutex contention under high concurrency: `Pool.Get()` acquires a `sync.Mutex`. With `Concurrency(8)`, eight goroutines call `pool.Get()` in the hot loop and all serialize on that lock. `sync.Pool` avoids this via per-P caches but does not offer LIFO semantics or no-eviction guarantees. Add a note to the `MapPooled` and `Pool` godoc explaining the contention behaviour at high worker counts, and consider providing a sharded pool variant (`ShardedPool[T]`) for use cases where allocation avoidance and high concurrency are both required.
- [x] Fix `Pool.Warmup` godoc: the current comment reads "Warmup is best-effort: sync.Pool may evict objects at any time (e.g. on GC)." This is copied from `sync.Pool` documentation and does not apply to the custom `Pool[T]` implementation, which never evicts. Rewrite the comment to accurately describe actual behaviour: objects pre-populated by `Warmup` remain in the pool until retrieved by `Get()` and not yet returned by `Release()`.
- [x] Document `DropOldest` behaviour under sustained load: `dropOldestOutbox` uses a fast lock-free send when the buffer has space but falls back to a mutex-protected drain-and-resend when full. In a pipeline where downstream is consistently slower than upstream — exactly the scenario `DropOldest` is designed for — the slow path becomes the hot path and all `Concurrency(n)` workers serialize on the mutex. Add a note to the `Overflow(DropOldest)` godoc and the tuning guide explaining this, so users can make an informed choice between `DropOldest`, `DropNewest`, and back-pressure.
### Long-term
- [x] Typed `ErrorHandler[T]`: `TypedReturn[O](val O) StageOption` added as a compile-time-safe alternative for the standalone case. A mismatched type in `Return` is documented and has a regression test. Full parameterization of `ErrorHandler[T]` across all handler combinators is deferred to v2.
- [x] Pull-based (iterator) execution path: deferred. `Iter(ctx, p)` exposes the pull interface externally. Internal engine adoption is a future concern.