# Getting Started with Kitsune
This guide takes you from zero to a working pipeline in about 10 minutes. It covers the mental model, the key patterns you'll use daily, and where to go next.
## Mental model
A Kitsune pipeline is a directed acyclic graph (DAG) of processing stages. You assemble it by calling functions: no goroutines start, no channels are allocated. Everything is lazy. When you call Run (or Collect, or First), the runtime:
- validates the graph
- allocates bounded channels between every pair of stages
- launches one goroutine per stage inside an errgroup
- runs until the source is exhausted, the context is cancelled, or a stage returns an unhandled error
Backpressure is automatic. Each inter-stage channel has a bounded buffer (16 by default). A slow downstream stage blocks the upstream stage rather than allowing unbounded queuing.
```mermaid
flowchart LR
    S([Source<br/>FromSlice / Kafka / …])
    M[Map<br/>parseLog]
    F[Filter<br/>isCritical]
    B[Batch<br/>100 items]
    T([Terminal<br/>ForEach / Collect])

    S -->|"bounded chan (16)"| M
    M -->|"bounded chan (16)"| F
    F -->|"bounded chan (16)"| B
    B -->|"bounded chan (16)"| T

    T -.->|backpressure| B
    B -.->|backpressure| F
    F -.->|backpressure| M
    M -.->|backpressure| S

    classDef src fill:#ffe0b2,stroke:#e65100,color:#000;
    classDef term fill:#c8e6c9,stroke:#1b5e20,color:#000;
    class S src
    class T term
```
Solid arrows show item flow; dashed arrows show backpressure propagating upstream when a downstream stage is slow.
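The blocking behaviour behind those dashed arrows is ordinary Go channel semantics. A self-contained sketch, no Kitsune involved, using a capacity-2 buffer in place of the default 16:

```go
package main

import "fmt"

// trySend attempts a non-blocking send, reporting whether the buffer had room.
// A real pipeline stage does a blocking send, so "no room" means "stall".
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	// A capacity-2 channel stands in for a Kitsune inter-stage buffer.
	ch := make(chan int, 2)
	trySend(ch, 1) // room
	trySend(ch, 2) // room
	// Buffer full: a blocking producer would now stall. That is backpressure.
	fmt.Println("third send accepted:", trySend(ch, 3)) // third send accepted: false
	<-ch // the slow consumer finally drains one item
	fmt.Println("retry accepted:", trySend(ch, 3)) // retry accepted: true
}
```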
Context propagates everywhere. Cancelling the context stops all stages cleanly.
## Vertical style, not fluent chains
Go's type system requires a specific code style. Methods preserve the element type and can be chained. Free functions change the type and must be assigned to a new variable:
```go
lines := kitsune.FromSlice(rawLines)                        // *Pipeline[string]
parsed := kitsune.Map(lines, parseLog)                      // *Pipeline[LogEntry] — type changed: free function
critical := parsed.Filter(isCritical)                       // *Pipeline[LogEntry] — type preserved: method
batched := kitsune.Batch(critical, kitsune.BatchCount(100)) // *Pipeline[[]LogEntry] — type changed: free function
err := batched.ForEach(store).Run(ctx)
```
This is a Go language constraint: methods cannot introduce new type parameters. But the style is an asset: each variable name documents what's flowing, and the compiler checks every type transition.
Rule of thumb:
- Free functions (type may change, or extra type parameters required): Map, FlatMap, Batch, Unbatch, MapWith, FlatMapWith, Reject, ChunkBy, Sort, SortBy, ZipWith, Unzip, Enrich, …
- Methods (type-preserving, no extra type parameters): .Filter, .Tap, .Take, .Skip, .Through, .ForEach, .Drain
Not every operator fits neatly. Reject keeps the type but is a free function because the method form would be ambiguous with complex generics. When in doubt, look for it in both places; the operator catalog lists every operator with its exact call form.
## Your first pipeline

```go
package main

import (
	"context"
	"fmt"
	"strconv"

	kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
	// Source: emit each string from a slice
	input := kitsune.FromSlice([]string{"1", "2", "3", "4", "5"})

	// Transform: parse each string to int.
	// kitsune.LiftFallible wraps a context-free func(I) (O, error) for use with Map.
	parsed := kitsune.Map(input, kitsune.LiftFallible(strconv.Atoi))

	// Terminal: collect all results into a slice
	results, err := parsed.Filter(func(n int) bool { return n > 2 }).
		Collect(context.Background())
	if err != nil {
		panic(err)
	}

	fmt.Println(results) // [3 4 5]
}
```
FromSlice + Collect is the testing pattern too: deterministic, no goroutines, no infrastructure. See the basic example for the runnable version.
## Consuming output with Iter
Collect is the right terminal when you need the full result set: a test assertion, a bulk insert, or building a response. But sometimes you want to process items one-by-one as they arrive, without buffering the whole stream. That's what Iter is for.
Iter returns two values: an iter.Seq[T] for use with range, and an error function to call once the loop finishes:
```go
seq, errFn := kitsune.Map(events, enrich, kitsune.Concurrency(20)).Iter(ctx)

for event := range seq {
	fmt.Println(event)
}
if err := errFn(); err != nil {
	log.Fatal(err)
}
```
The pipeline starts in the background as soon as Iter is called. Items flow from the pipeline into the for range loop through a buffered channel; the loop blocks when the pipeline hasn't produced anything yet, and the pipeline backs off when the consumer is too slow.
The error function must always be called: it blocks until the pipeline finishes and reports any stage error. Think of it like rows.Close() or resp.Body.Close(): forgetting it leaks resources.
### Breaking out early
If you only need the first few results, break as usual. Kitsune detects the break, cancels the pipeline, and drains any in-flight items. The error function returns nil: the context.Canceled caused by the break is suppressed, since it was expected:
```go
seq, errFn := kitsune.FromIter(infiniteStream).Iter(ctx)

var first []Event
for e := range seq {
	first = append(first, e)
	if len(first) == 10 {
		break // pipeline is cancelled; resources are freed
	}
}
if err := errFn(); err != nil { // nil — break-induced cancellation is suppressed
	log.Fatal(err)
}
```
If the caller's own context is cancelled (timeout, shutdown signal), errFn returns context.Canceled as normal; only the break path suppresses it.
### When to use Iter vs Collect

| | Iter | Collect |
|---|---|---|
| Output size | Unbounded or large | Bounded, fits in memory |
| Processing | Item-by-item as they arrive | After all items are in hand |
| Early exit | Natural with break | Use .Take(n) then Collect |
| Composability | Works with slices.Collect, maps.Collect, any iter.Seq[T] consumer | Returns []T directly |
For the common case of "run the whole pipeline and give me a slice", Collect is more concise. Reach for Iter when streaming, when memory matters, or when you want to feed pipeline output directly into another iter.Seq[T]-based API.
## Pausing and resuming a pipeline
Sometimes you need to temporarily stop a pipeline without cancelling it: during a maintenance window, when a downstream system signals it is overloaded, or to implement rate-adaptive ingestion. Pause and Resume handle this without tearing down the pipeline or losing in-flight work.
RunAsync automatically creates a Gate and exposes it on the returned RunHandle:
```go
h := pipeline.ForEach(store).RunAsync(ctx)

// Later, from any goroutine:
h.Pause()  // sources stop emitting; in-flight items continue draining
h.Resume() // sources start again
```
While paused, sources block before sending each item into the pipeline. Downstream stages (Map, Filter, Batch, sinks) continue running and drain whatever is already in flight. No items are dropped and no goroutines exit.
### Checking pause state
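A minimal sketch, assuming the RunHandle exposes an IsPaused() accessor; the method name here is an assumption, so check the API reference for the exact form:

```go
// Hypothetical accessor: report pause state to a health endpoint.
if h.IsPaused() {
	log.Println("pipeline paused; deferring health check")
}
```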
### External gate for synchronous Run
RunHandle requires RunAsync. To pause a pipeline running under the blocking Runner.Run, create a Gate externally and attach it with WithPauseGate:
```go
gate := kitsune.NewGate()

go func() {
	time.Sleep(5 * time.Second)
	gate.Pause()
	performMaintenance()
	gate.Resume()
}()

// Blocking run, pauseable from the goroutine above.
if err := runner.Run(ctx, kitsune.WithPauseGate(gate)); err != nil {
	log.Fatal(err)
}
```
### Cancellation while paused
Cancelling the context while the pipeline is paused unblocks sources immediately and shuts the pipeline down cleanly. Pause does not interfere with shutdown.
### When not to use pause
Pause is a coarse control: it stops all sources simultaneously. For fine-grained flow control on a single stage, use RateLimit or the Overflow(DropOldest) buffer strategy instead. For stopping the pipeline entirely, cancel the context.
See the pause example for a runnable version.
## Adding concurrency
Stage functions receive a context.Context as their first argument:
```go
func enrichUser(ctx context.Context, id string) (User, error) {
	return db.GetUser(ctx, id) // real I/O
}
```
To run multiple requests in parallel, add Concurrency(n):
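For example, mirroring the Map call form used in the Iter section of this guide (ids is a placeholder pipeline):

```go
// 20 workers pull from the same input channel and call enrichUser
// concurrently; add the Ordered() option if output order must match input.
users := kitsune.Map(ids, enrichUser, kitsune.Concurrency(20))
```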
This starts 20 goroutines that all read from the same input channel. Output order is not preserved by default; goroutines finish in whatever order the I/O completes. Ordered() uses a slot-based resequencer: workers still run in parallel, but results are emitted in arrival order.
Starting point: 10–20 for HTTP or database calls; runtime.NumCPU() for CPU-bound work.
See the concurrent example for a runnable version with LogHook.
## Error handling
Every stage function returns (O, error). By default, any error halts the entire pipeline (context cancelled, Run returns the error). You can change this per stage with OnError:
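For example, using the Skip policy (the same form appears in the testing section later in this guide):

```go
// Skip() drops the item that errored and keeps the pipeline running.
parsed := kitsune.Map(input, parseLine, kitsune.OnError(kitsune.Skip()))
```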
For more advanced routing, send failures to a dead-letter queue instead of discarding them. Use MapResult, wrapping transform in your own retry loop first if you need transient-failure handling:
```go
// MapResult routes errored items to a second pipeline as ErrItem[Input].
ok, dlq := kitsune.MapResult(items, transform)
// ok is *Pipeline[Output]
// dlq is *Pipeline[ErrItem[Input]]
```
When Runner.Run returns an error, it is wrapped in a kitsune.StageError carrying the stage name, attempt count, and original cause:
```go
if err := runner.Run(ctx); err != nil {
	var se *kitsune.StageError
	if errors.As(err, &se) {
		fmt.Printf("stage %q failed on attempt %d: %v\n", se.Stage, se.Attempt, se.Cause)
	}
}
```
See the examples directory for more.
## Branching: fan-out and fan-in
Partition routes each item to one of two outputs based on a predicate:
```go
orders := kitsune.FromSlice(allOrders)
high, regular := kitsune.Partition(orders, func(o Order) bool { return o.Amount >= 100 })

vip := kitsune.Map(high, priorityProcess).ForEach(notifyVIP)
std := kitsune.Map(regular, standardProcess).ForEach(store)

// MergeRunners runs both branches, blocks until both complete
merged, err := kitsune.MergeRunners(vip, std)
if err != nil { /* handle */ }
err = merged.Run(ctx)
```
Broadcast copies every item to all N output pipelines (unlike Partition, where each item goes to exactly one):
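A hedged sketch of the call form; the count argument and slice return are assumptions here, so check the operator catalog for the real signature:

```go
// Hypothetical form: copy every event to three independent branches.
branches := kitsune.Broadcast(events, 3)
audit := kitsune.Map(branches[0], toAuditRecord)
```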
Merge fans multiple same-type pipelines back into one:
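Assuming a variadic form (branchA and branchB are hypothetical same-type pipelines; verify the exact signature in the operator catalog):

```go
// Hypothetical form: interleave two *Pipeline[Event] values into one stream.
all := kitsune.Merge(branchA, branchB)
```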
See the fanout and broadcast examples.
## Testing pipelines
Because pipelines are assembled lazily, you can test any fragment in isolation:
```go
func TestParseLine(t *testing.T) {
	input := kitsune.FromSlice([]string{"INFO: started", "WARN: slow", "ERROR: failed"})
	result := kitsune.Map(input, parseLine, kitsune.OnError(kitsune.Skip()))

	entries, err := result.Collect(context.Background())
	require.NoError(t, err)
	require.Len(t, entries, 3)
	assert.Equal(t, "ERROR", entries[2].Level)
}
```
FromSlice + Collect is the core test pattern: no goroutines to manage, no ports to open, fully deterministic output.
The kitsune/testkit package wraps this pattern with assertion helpers:
```go
import "github.com/zenbaku/go-kitsune/testkit"

testkit.CollectAndExpect(t, p, []int{2, 4, 6})
testkit.CollectAndExpectUnordered(t, p, []int{6, 2, 4})
got := testkit.MustCollect(t, p) // fails the test on error

// Inspect lifecycle events:
hook := &testkit.RecordingHook{}
runner.Run(ctx, kitsune.WithHook(hook))
hook.Errors()   // items that produced errors
hook.Restarts() // supervision restart events
```
For reusable pipeline fragments, define them as Stage[I,O] values and test each independently:
```go
var ParseStage kitsune.Stage[string, Event] = func(p *kitsune.Pipeline[string]) *kitsune.Pipeline[Event] { ... }
var EnrichStage kitsune.Stage[Event, Enriched] = func(p *kitsune.Pipeline[Event]) *kitsune.Pipeline[Enriched] { ... }

// Compose for production
var Pipeline = kitsune.Then(ParseStage, EnrichStage)

// Test each stage independently
events, _ := ParseStage.Apply(kitsune.FromSlice(testLines)).Collect(ctx)
enriched, _ := EnrichStage.Apply(kitsune.FromSlice(events)).Collect(ctx)
```
See the stages example for the full stage composition and testing pattern.
## Where to go next

Reference:
- Operator catalog: every operator with signature and description
- Testing guide: mocking clients, error paths, time-sensitive operators, and testkit reference
- Tuning guide: buffer sizing, concurrency, batching, memory trade-offs
- Benchmarks: throughput numbers on Apple M1

Deeper understanding:
- Internals: DAG construction, runtime compilation, concurrency models, supervision, graceful drain

External systems:
- Tails: connecting to Kafka, Redis, S3, Postgres, and 18 more systems

Live observability:
- Inspector: real-time web dashboard for running pipelines
Runnable examples: see the Examples page for all 20 examples with full source and Go Playground links.