Performance Tuning Guide

Kitsune pipelines run as a DAG of goroutines connected by bounded channels. The defaults work well for most pipelines: buffer size 16, single-goroutine stages, no batching. This guide explains when and how to deviate from those defaults.


Overview

Measure first

The default Buffer(16) + Concurrency(1) handles most I/O pipelines at >1 M items/sec. Profile with MetricsHook or the Inspector before tuning; most "obvious" improvements don't move the needle.

Every stage in a pipeline has an input channel. When a stage finishes processing an item, it writes to the next stage's channel. If that channel is full, the writer blocks, which is backpressure. When the channel has room, the writer proceeds without waiting.

This model means:

  • Slow downstream stages naturally slow down fast upstream stages (backpressure propagates).
  • No goroutines are started until Run() is called (lazy execution).
  • Context cancellation propagates to all stages cleanly.

The default buffer size of 16 (engine.DefaultBuffer) is intentionally modest. Most pipelines are I/O-bound and spend most of their time waiting, not queuing.


Fast path and stage fusion

Kitsune has two internal execution shortcuts that can dramatically increase throughput for serial, hook-free pipelines: the fast path and stage fusion. Both are applied automatically when the pipeline meets certain conditions. Understanding them helps you avoid accidentally disabling them — and helps you diagnose throughput drops when you do.

What the fast path is

By default, each item passes through a processing loop that:

  • calls time.Now() before and after user code to measure latency
  • wraps user code in ProcessItem, which handles error policies (retry, skip, return)
  • calls hook.OnItem(...) for every item
  • guards against context cancellation with a select on ctx.Done()

For serial, hook-free pipelines that use default error handling, all of this overhead is unnecessary. The fast path is a simplified loop that:

  • reads items in micro-batches of up to 16 (one blocking receive, then up to 15 non-blocking receives)
  • passes items directly to user code with no hook calls, no time.Now, and no ProcessItem wrapper
  • returns the first error immediately

Throughput improvements of 2–5x are common in CPU-bound pipelines. I/O-bound pipelines (HTTP, database) benefit less because I/O wait dominates.
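The micro-batched read loop described above can be sketched in plain Go (readBatch is a hypothetical illustration of the pattern, not the engine's actual code):

```go
package main

import "fmt"

// readBatch performs one blocking receive, then drains up to
// cap(buf)-1 more items without blocking — the micro-batching
// pattern the fast path uses. Returns false once the channel
// is closed and empty.
func readBatch(ch <-chan int, buf []int) ([]int, bool) {
	v, ok := <-ch // one blocking receive
	if !ok {
		return buf[:0], false
	}
	buf = append(buf[:0], v)
	for len(buf) < cap(buf) {
		select {
		case v, ok := <-ch: // non-blocking drain
			if !ok {
				return buf, true
			}
			buf = append(buf, v)
		default:
			return buf, true // channel empty: process what we have
		}
	}
	return buf, true
}

func main() {
	ch := make(chan int, 32)
	for i := 0; i < 20; i++ {
		ch <- i
	}
	close(ch)
	buf := make([]int, 0, 16)
	for {
		batch, ok := readBatch(ch, buf)
		if !ok {
			break
		}
		fmt.Println("batch of", len(batch)) // 16, then 4
	}
}
```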

What stage fusion is

When a Map → Filter → ForEach chain is serial and hook-free, Kitsune can go further: it composes all three stages into one goroutine with no inter-stage channel hops. Items flow from source through Map and Filter directly into ForEach without ever being written to an intermediate chan T. This eliminates two channel sends and two goroutine handoffs per item.

Fusion is only possible for single-consumer chains. If two operators both consume the output of a Map stage, fusion cannot be used for that stage.

Operators that support fusion: Map, Filter. FlatMap has a fast path but does not fuse. Batch, windowing operators, and multi-input operators never fuse.

Fusion boundaries

Fusion propagates upstream as long as every operator in the chain sets a fusionEntry. Only Map and Filter do this. Every other operator is a fusion boundary: it resets the fusion chain, and any Map/Filter stages downstream of it start a fresh fusion group (if they are themselves eligible).

Operator | Breaks fusion? | Notes
Map | No — participates | Sets fusionEntry; fuses with downstream Map, Filter, or ForEach
Filter | No — participates | Sets fusionEntry; fuses with downstream Map, Filter, or ForEach
ForEach | Terminal — ends chain | Triggers fused execution when upstream is a single-consumer fusable chain
FlatMap | Yes | Has its own fast path (drain + micro-batching) but never sets fusionEntry
Batch, Window, SlidingWindow, SessionWindow, BufferWith, ChunkBy, ChunkWhile | Yes | Windowing and batching operators do not set fusionEntry
Throttle, Debounce, Sample, SampleWith | Yes | Rate and timing operators do not set fusionEntry
Merge, Concat, Amb, Zip, CombineLatest, LatestFrom | Yes | Multi-input operators do not set fusionEntry
Partition, Broadcast, Balance, Share | Yes | Fan-out operators do not set fusionEntry
Sources (FromSlice, Generate, FromChan, Ticker, etc.) | Yes | Sources have no upstream to fuse with
Tap, TapError, Finally, IgnoreElements, ExpandMap | Yes | Utility operators do not set fusionEntry
MapWith, FlatMapWith, MapWithKey, FlatMapWithKey | Yes | Stateful key-sharding operators do not set fusionEntry
Any stage with Concurrency(n > 1), OnError, Overflow(DropOldest/DropNewest), Timeout, Supervise, CacheBy, or a run-time WithHook | Yes | Fast-path conditions not met; fusionEntry is not set or is discarded at run time

The practical rule: a fusion group starts at the first eligible Map or Filter after a boundary and ends at ForEach (or the next boundary). A FlatMap in the middle of an otherwise hot chain always introduces a channel hop and goroutine handoff. If you need to eliminate that hop, structure the pipeline so the FlatMap output feeds a fresh Map → Filter → ForEach sub-chain that is itself fusion-eligible.

Use IsOptimized() to confirm which stages are fused:

src      := kitsune.FromSlice(records)        // boundary (source)
flat     := kitsune.FlatMap(src, expand)       // boundary (FlatMap has fast path, no fusion)
mapped   := kitsune.Map(flat, transform)       // starts new fusion group
filtered := kitsune.Filter(mapped, isValid)    // continues fusion group
runner   := filtered.ForEach(store)            // ends fusion group — Map+Filter+ForEach fuse

for _, r := range mapped.IsOptimized() {
    fmt.Printf("%s: fused=%v fast=%v reasons=%v\n", r.Name, r.Fused, r.FastPath, r.Reasons)
}

Exact eligibility conditions

Both fast path and fusion require all of the following conditions to hold simultaneously:

Condition | How to disable it (unintentionally)
Serial execution (Concurrency(1)) | Concurrency(n) with n >= 2
No supervision | Supervise(...) on the stage
Default error handler | OnError(...) on the stage
Block overflow | Overflow(DropNewest) or Overflow(DropOldest)
No per-item timeout | Timeout(d) on the stage
No hook at run time | WithHook(...) on Runner.Run (any non-NoopHook)
No pipeline-level error strategy | WithErrorStrategy(...) on Runner.Run
No cache (Map only) | CacheBy(keyFn) on the Map stage
Single consumer (fusion only) | Passing the same pipeline to two operators, or using MergeRunners with two ForEach on the same upstream

Inspecting optimisation status with IsOptimized

Pipeline[T].IsOptimized(opts ...RunOption) returns an []OptimizationReport showing, for each stage, whether it would use the fast path and whether it would be fused. Call it after the full DAG (including the terminal ForEach) is constructed.

src    := kitsune.FromSlice(records)
mapped := kitsune.Map(src, enrich)
runner := mapped.Filter(isValid).ForEach(store)

// Assert the chain stays on the fast path in your test suite:
for _, r := range mapped.IsOptimized() {
    if r.SupportsFastPath && !r.FastPath {
        t.Errorf("stage %s left fast path: %v", r.Name, r.Reasons)
    }
}

Pipeline[T].IsFastPath(opts ...RunOption) bool is a convenience wrapper that returns false if any fast-path-capable stage would leave the fast path:

if !mapped.IsFastPath() {
    t.Error("expected pipeline to stay on the fast path")
}

Both methods accept the same RunOptions as Runner.Run, so you can test the run-time hook condition too:

// Verify the pipeline stays fast-path without a hook,
// but correctly reports the drop when LogHook is added.
assert.True(t, pipeline.IsFastPath())
assert.False(t, pipeline.IsFastPath(kitsune.WithHook(kitsune.LogHook(slog.Default()))))

IsOptimized is non-destructive: it allocates a temporary channel graph (like Describe) and returns before any goroutines are started.

Common pitfalls

Adding WithHook(LogHook(...)) for debugging silently disables the fast path across every Map and Filter stage in the run. If you need per-item logging without losing throughput, install the hook only in non-production builds, or use WithSampleRate(-1) to disable the sampling hooks while keeping stage lifecycle events.

Setting Concurrency(2) on a cheap CPU-bound Map is almost always slower than staying on the fast path. The goroutine scheduling overhead, channel synchronisation, and cache-line contention typically exceed the gains. Measure with pprof before reaching for concurrency on fast operations.

CacheBy on Map always disables the Map fast path. If you need caching on one stage but want the rest of the chain to be fast, put the cached Map first in the chain and let the downstream Maps stay cache-free.


Buffer Sizing (Buffer(n))

The channel buffer between two stages holds up to n items in memory; each stage sees at most n pending items at any time.

Increase the buffer when:

  • Your source is bursty, emitting many items in rapid succession before pausing.
  • Adjacent stages have variable latency (a slow stage occasionally falls behind, but catches up quickly).
  • You see goroutines blocking frequently under profiling and want to reduce that overhead.

Decrease the buffer when:

  • Items are large (structs with big fields, file contents, etc.) and memory is constrained.
  • You want strict backpressure so a slow consumer immediately slows the producer.

Buffer(0): synchronous channel:

pipe.Buffer(0)
A zero-size buffer makes every send block until the receiver is ready. Use it to exercise backpressure in tests or to enforce strict sequential hand-off between stages; it is not recommended for production throughput.

Rule of thumb: set buffer ≈ expected burst size, capped by what you can afford in memory. If your source emits 100 items in a burst every few seconds, Buffer(100) lets the source drain quickly while downstream processes at its own pace.


Concurrency (Concurrency(n))

By default each stage runs on a single goroutine. Concurrency(n) starts n goroutines, all reading from the same input channel.

Choosing a concurrency model

This section covers the performance knobs for Concurrency(n). If you are deciding which concurrency primitive to use (Concurrency, Ordered, MapWithKey, or Partition/Balance), see the Concurrency Guide.

Use for I/O-bound stages:

pipe.Concurrency(20) // HTTP enrichment, DB lookups, file reads
I/O-bound stages spend most of their time waiting: for a network response, a disk read, a lock release. Running 20 goroutines means 20 outstanding requests in flight simultaneously with no extra CPU cost.

CPU-bound stages rarely benefit beyond runtime.NumCPU(). Beyond that point you add goroutine scheduling overhead and GC pressure without gaining real parallelism.

Order is not preserved. With Concurrency(n) > 1, items are processed in whatever order goroutines happen to finish. If your pipeline requires deterministic ordering, keep the stage at Concurrency(1).

Ordered() concurrency trade-off

Ordered() with Concurrency(n) uses a slot-based resequencer. Peak throughput is ~10–15% lower than unordered, and a single slow item blocks the downstream channel (head-of-line blocking) until it completes.

Buffer interaction: the n goroutines all draw from the same input channel. If items arrive in bursts, consider increasing Buffer alongside Concurrency so workers stay busy between bursts:

pipe.Concurrency(20), pipe.Buffer(64)

Starting point: 10–20 for HTTP enrichment, then profile. CPU-bound: start at runtime.NumCPU().


Overflow strategies

By default, a stage's output channel applies backpressure: when the buffer is full, the sender blocks until space is available. The Overflow option changes this.

Strategy | Buffer full behaviour | Lock cost | Best for
Block (default) | Sender blocks until space is available | None | General-purpose; preserves backpressure
DropNewest | Incoming item is discarded; sender never blocks | Atomic counter only | Bursty sources where occasional loss is acceptable and the producer must never block
DropOldest | Oldest buffered item is evicted; sender never blocks | sync.Mutex on buffer-full path | Sources where recent data is most important and some loss is acceptable

DropOldest under sustained load

Overflow(DropOldest) is designed for pipelines where dropping stale data is preferable to blocking the producer. Its implementation has two send paths:

  • Fast path (buffer has space): a non-blocking select succeeds immediately — no lock acquired, effectively lock-free.
  • Slow path (buffer is full): a sync.Mutex is held while the oldest buffered item is drained and the new item is inserted. The lock prevents two concurrent goroutines from interleaving their drain and resend steps, which would corrupt buffer ordering.
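The two paths can be sketched in plain Go (sendDropOldest is an illustration of the pattern, not Kitsune's actual implementation):

```go
package main

import (
	"fmt"
	"sync"
)

var mu sync.Mutex

// sendDropOldest sketches the two send paths of DropOldest: a
// lock-free non-blocking send when the buffer has space, and a
// mutex-guarded evict-and-insert when it is full. Without the
// mutex, two concurrent senders could interleave their drain and
// resend steps and corrupt buffer ordering.
func sendDropOldest(ch chan int, v int) {
	select {
	case ch <- v: // fast path: buffer had space, no lock
		return
	default:
	}
	mu.Lock() // slow path: serialise evict + insert
	defer mu.Unlock()
	select {
	case <-ch: // evict the oldest buffered item
	default:
	}
	select {
	case ch <- v: // insert the new item
	default:
	}
}

func main() {
	ch := make(chan int, 3)
	for i := 0; i < 5; i++ {
		sendDropOldest(ch, i) // items 0 and 1 get evicted
	}
	close(ch)
	for v := range ch {
		fmt.Print(v, " ") // 2 3 4
	}
	fmt.Println()
}
```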

The slow path is the hot path under sustained backpressure. DropOldest is typically chosen precisely because downstream is consistently slower than upstream — meaning the buffer is full most of the time. In that scenario every Send call takes the slow path and acquires the mutex. With Concurrency(n), all n workers serialise on a single lock per item.

Mitigation: increase Buffer(n) alongside DropOldest. A larger buffer means more time on the fast path (no lock) and less time on the slow path. The trade-off is memory: each extra buffer slot holds one item.

// Prefer a larger buffer to reduce slow-path frequency with high concurrency.
kitsune.Map(src, fn,
    kitsune.Concurrency(8),
    kitsune.Buffer(256),
    kitsune.Overflow(kitsune.DropOldest),
)

When to prefer DropNewest instead: if you do not need the "keep the most recent item" guarantee, DropNewest achieves similar throughput with only an atomic counter — no mutex contention at any concurrency level. The difference is which item is discarded: DropNewest discards the incoming item (producer pays no extra cost), while DropOldest discards the oldest buffered item (consumer gets newer data, but at the cost of the mutex on the slow path).


Batch Sizing

Batch(p, size, opts...) collects up to size items before passing them downstream as a slice. This amortizes per-call overhead: a single bulk database insert of 100 rows is typically much cheaper than 100 individual inserts.

Larger batches:

  • Reduce per-call overhead (fewer round trips, better bulk API efficiency).
  • Increase memory usage (the batch is held in memory until it flushes).
  • Increase latency to first result.

Smaller batches:

  • Lower memory pressure and latency.
  • Higher per-call overhead.

BatchTimeout: preventing stalls:

pipe.BatchTimeout(500 * time.Millisecond)
Without a timeout, a partial batch sits in memory until it fills up. For near-real-time pipelines with variable or low volume, this can cause items to stall indefinitely. BatchTimeout flushes the partial batch after the specified duration, bounding worst-case latency.

Window(p, d): time-bucketed aggregation:

pipe.Window(p, 10*time.Second)
Window is Batch(p, MaxInt, BatchTimeout(d)): it collects all items that arrive within the window and flushes them together. Use it when you want time-bucketed aggregation, rather than size-bounded batching.


Memory

MemoryCache(maxSize)

Grows until it holds maxSize entries, then evicts the least-recently-used entry on each new insert. Size it to your working set of unique keys:

  • Too small: excessive eviction means cache misses, defeating the purpose.
  • Too large: memory grows unnecessarily.

If you don't know your working set size, start with an estimate and watch eviction rate via your Hook metrics.

MemoryDedupSet

Grows unboundedly with every unique key seen. Fine for batch jobs with a bounded input. For long-running pipelines that see many distinct keys over time, this will eventually exhaust memory. Switch to a Redis-backed dedup set (kredis.NewDedupSet) to bound memory:

dedup := kredis.NewDedupSet(redisClient, "pipeline:seen-keys")

Buffering operators

Several operators materialise the entire stream in memory before emitting any output. Use them only on bounded (finite) pipelines and size your heap accordingly:

Operator | Memory held
GroupBy | All items, grouped by key
ChunkBy, ChunkWhile | All items, then split into chunks
Sort, SortBy | All items, then sorted

If you are sorting or grouping a large dataset, consider pre-sorting upstream (e.g., a sorted database query or a pre-bucketed Kafka topic) so the pipeline can process records without buffering them all.

Large items

For pipelines that process large items (reading big files line by line, large API responses), prefer streaming with small buffers over materializing everything. A Buffer(4) with large items uses far less memory than Buffer(64).


Observability and Profiling

Name every stage:

pipe.WithName("enrich-user")
Stage names appear in Hook events. Without names, profiling output is hard to correlate back to your pipeline definition.

Use the Hook interface for metrics:

type Hook interface {
    OnStageStart(ctx context.Context, stage string)
    OnItem(ctx context.Context, stage string, dur time.Duration, err error)
    OnStageDone(ctx context.Context, stage string, processed int64, errors int64)
}
Implement Hook to collect per-stage timing, throughput, and error counts. See examples/metrics for a working example that writes to a Prometheus registry.

Quick debugging with LogHook:

err := runner.Run(ctx, kitsune.WithHook(kitsune.LogHook(slog.Default())))
LogHook logs stage start and done events with item counts to the provided slog.Logger. Useful for tracing where items are being lost or where a stage is slow.

Control per-item sampling rate:

SampleHook.OnItemSample fires once every 10 items by default. Adjust with WithSampleRate:

runner.Run(ctx,
    kitsune.WithHook(myHook),
    kitsune.WithSampleRate(100), // sample every 100th item
    // kitsune.WithSampleRate(-1), // disable sampling entirely
)
Lower rates reduce hook overhead in high-throughput pipelines.

CPU and memory profiling:

go test -bench=. -cpuprofile cpu.out -memprofile mem.out
go tool pprof cpu.out
go tool pprof mem.out
Run benchmarks with profiling enabled, then inspect with pprof. Look for stages with unexpectedly high CPU time or heap allocations.


Benchmarks

See doc/benchmarks.md for baseline throughput numbers measured on reference hardware.

Note that all benchmarks measure pipeline-construction cost plus execution cost together: each benchmark creates and runs a fresh pipeline. For a long-running production pipeline, the construction overhead is negligible; what matters is per-item throughput, which you can isolate by profiling a running process rather than relying solely on micro-benchmarks.