Kitsune
Type-safe, concurrent data pipelines for Go.
Compose functions into stages: channels, goroutines, backpressure, and error routing are handled for you.
raw := kitsune.From(kafkaStream)
orders := kitsune.Map(raw, parseOrder)
enriched := kitsune.Map(orders, enrichWithCustomer, kitsune.Concurrency(20))
batched := kitsune.Batch(enriched, 500, kitsune.BatchTimeout(2*time.Second))
err := batched.ForEach(bulkInsert, kitsune.Concurrency(4)).Run(ctx)
What Kitsune handles for you
- Bounded channels between every stage. A slow consumer blocks its upstream: no unbounded queuing, no dropped items.
- Pipeline[T] carries its element type through the graph. Every stage transition is checked at compile time via Go generics.
- Add Concurrency(20) to any stage to spin up parallel workers. Preserve arrival order with Ordered().
- Per-stage OnError: Skip, Retry with exponential backoff, RetryThen, Return. Use MapResult to route errored items to a separate pipeline. Errors are values, not panics.
- MetricsHook, LogHook (structured slog), and a live inspector dashboard. OTel, Prometheus, and Datadog via tails.
- Kafka, NATS, RabbitMQ, Postgres, Redis, S3, MongoDB, ClickHouse, SQS, Kinesis, Pub/Sub, and more. Each a separate module via tails.
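
To make the backpressure claim concrete, here is a minimal sketch of what bounded channels between stages look like in plain Go. It uses only the standard library and is not Kitsune's implementation; the helper names produce, mapStage, and collect are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// produce emits 0..n-1 on a bounded channel and then closes it.
// The send blocks whenever the buffer is full, so a slow consumer
// slows the producer down instead of letting a queue grow.
func produce(n, buffer int) <-chan int {
	out := make(chan int, buffer)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- i // blocks while the buffer is full
		}
	}()
	return out
}

// mapStage applies fn to each item and forwards the result on its own
// bounded channel. The element types are checked at compile time.
func mapStage[T, U any](in <-chan T, buffer int, fn func(T) U) <-chan U {
	out := make(chan U, buffer)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect[T any](in <-chan T) []T {
	var items []T
	for v := range in {
		items = append(items, v)
	}
	return items
}

func main() {
	squares := mapStage(produce(5, 2), 2, func(i int) int { return i * i })
	fmt.Println(collect(squares)) // prints [0 1 4 9 16]
}
```

Each stage holds at most its buffer's worth of pending items, and nothing is dropped: when the downstream stage stops reading, every upstream send eventually blocks.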
Live inspector

Add one line to any running pipeline to open a real-time web dashboard with a live DAG, per-stage metrics, and stop/restart controls. See the inspector guide →
Where to go next
- Mental model, first pipeline, concurrency, error handling, branching, and testing in ~10 minutes.
- Every source, transform, terminal, and option, with exact signatures and descriptions.
- Connect pipelines to Kafka, Redis, S3, Postgres, NATS, SQS, MongoDB, and 15+ more systems.
- Buffer sizing, concurrency settings, batching strategies, and GC-pressure trade-offs.
- When to reach for Concurrency(n), Ordered(), MapWithKey, or Partition: decision flowchart and worked examples.
- 20 runnable examples with full source code and Go Playground links.
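
As a rough illustration of the trade-off behind the Concurrency(n) and Ordered() choice, the sketch below runs a function across several workers while keeping results in input order. It uses only the standard library, works on a slice rather than a stream, and is not how Kitsune implements ordering; orderedMap is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// orderedMap applies fn to every input using up to `workers` goroutines
// and returns the results in input order. Order is preserved by writing
// each result to the slot that matches its input index.
func orderedMap[T, U any](inputs []T, workers int, fn func(T) U) []U {
	if workers < 1 {
		workers = 1 // at least one worker, otherwise the sends below would block forever
	}
	results := make([]U, len(inputs))
	indexes := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indexes {
				results[i] = fn(inputs[i]) // each index is handled by exactly one worker
			}
		}()
	}

	for i := range inputs {
		indexes <- i
	}
	close(indexes)
	wg.Wait()
	return results
}

func main() {
	out := orderedMap([]string{"a", "b", "c", "d"}, 3, strings.ToUpper)
	fmt.Println(out) // prints [A B C D]
}
```

Workers may finish in any order, but the caller only sees results once all of them are in place. A streaming version would typically need a reorder buffer, which is the extra cost that makes ordering an explicit choice rather than a default.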