Skip to content

Examples

All examples use only the core go-kitsune package, and most run in the Go Playground. Run any locally with:

go run ./examples/<name>

At a glance


Getting started

basic

A minimal linear pipeline. Trims and uppercases strings, then collects squared numbers.

Demonstrates: FromSlice, Map, Filter, ForEach, Collect

Run in Playground View source

Full source
// Example: basic — a minimal linear pipeline.
//
// Demonstrates: FromSlice, Map, Filter, ForEach, Collect
package main

import (
    "context"
    "fmt"
    "strings"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()

    // --- ForEach: emit lines as we go ---

    words := []string{"  hello  ", "  world  ", "  kitsune  ", "  go  "}

    trimmed := kitsune.Map(kitsune.FromSlice(words),
        func(_ context.Context, s string) (string, error) {
            return strings.TrimSpace(s), nil
        })

    long := kitsune.Filter(trimmed,
        func(_ context.Context, s string) (bool, error) {
            return len(s) > 3, nil
        })

    upper := kitsune.Map(long,
        func(_ context.Context, s string) (string, error) {
            return strings.ToUpper(s), nil
        })

    err := upper.ForEach(func(_ context.Context, s string) error {
        fmt.Println(s)
        return nil
    }).Run(ctx)
    if err != nil {
        panic(err)
    }

    // --- Collect: materialise results into a slice ---

    nums := kitsune.Map(kitsune.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8}),
        func(_ context.Context, n int) (int, error) { return n * n, nil })

    evens := kitsune.Filter(nums,
        func(_ context.Context, n int) (bool, error) { return n%2 == 0, nil })

    squares, err := kitsune.Collect(ctx, evens)
    if err != nil {
        panic(err)
    }
    fmt.Println("even squares:", squares)
}

concurrent

Parallel processing with and without ordering guarantees.

Demonstrates: Concurrency, Ordered, Buffer, WithName

Run in Playground View source

Full source
// Example: concurrent — parallel processing with and without ordering guarantees.
//
// Demonstrates: Concurrency, Ordered, Buffer, WithName
package main

import (
    "context"
    "fmt"
    "time"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()
    items := make([]int, 20)
    for i := range items {
        items[i] = i
    }

    simulate := func(_ context.Context, n int) (string, error) {
        time.Sleep(10 * time.Millisecond)
        return fmt.Sprintf("item-%02d", n), nil
    }

    // --- Unordered: fastest, output arrives in completion order ---

    fmt.Println("=== Unordered (completion order) ===")
    start := time.Now()
    unordered, err := kitsune.Collect(ctx,
        kitsune.Map(kitsune.FromSlice(items), simulate,
            kitsune.Concurrency(5),
            kitsune.Buffer(32),
            kitsune.WithName("parallel"),
        ))
    if err != nil {
        panic(err)
    }
    fmt.Printf("  processed %d items in %v\n\n", len(unordered), time.Since(start).Round(time.Millisecond))

    // --- Ordered: same concurrency, output in original input order ---

    fmt.Println("=== Ordered (input order preserved) ===")
    start = time.Now()
    ordered, err := kitsune.Collect(ctx,
        kitsune.Map(kitsune.FromSlice(items), simulate,
            kitsune.Concurrency(5),
            kitsune.Ordered(),
            kitsune.WithName("parallel-ordered"),
        ))
    if err != nil {
        panic(err)
    }
    fmt.Printf("  processed %d items in %v\n", len(ordered), time.Since(start).Round(time.Millisecond))
    fmt.Printf("  first=%s last=%s\n", ordered[0], ordered[len(ordered)-1])
}

pause

Temporarily stop a running pipeline without cancelling it. Sources block; in-flight items drain normally. Resume restarts emission with no data loss.

Demonstrates: RunAsync, RunHandle.Pause, RunHandle.Resume, RunHandle.Paused, NewGate, WithPauseGate

Run in Playground View source

Full source
// Example: pause — temporarily stop a running pipeline without cancelling it.
//
// Demonstrates:
//   - RunAsync + RunHandle.Pause / Resume / Paused
//   - NewGate + WithPauseGate for use with blocking Runner.Run
//   - Behaviour during pause: sources block, in-flight items drain normally
package main

import (
    "context"
    "fmt"
    "time"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()

    // --- RunAsync: pause/resume via RunHandle ---

    fmt.Println("=== RunAsync: pause and resume via RunHandle ===")

    // Push-based source with a buffer of 4; each item takes ~10ms to process.
    src := kitsune.NewChannel[int](4)
    processed := kitsune.Map(src.Source(), func(_ context.Context, n int) (string, error) {
        time.Sleep(10 * time.Millisecond)
        return fmt.Sprintf("item-%02d", n), nil
    }, kitsune.WithName("process"))

    // received is appended from the pipeline goroutine and read from main;
    // the sleeps and the final Wait are what order those accesses.
    var received []string
    handle := processed.ForEach(func(_ context.Context, s string) error {
        received = append(received, s)
        return nil
    }).RunAsync(ctx)

    // Feed the first batch, then give the pipeline time to drain it.
    for i := 1; i <= 5; i++ {
        src.Send(ctx, i) //nolint
    }
    time.Sleep(100 * time.Millisecond)

    handle.Pause()
    fmt.Printf("paused: %v\n", handle.Paused())

    // Producers keep sending while paused; Send blocks once the buffer
    // fills, so this batch runs in its own goroutine.
    go func() {
        for i := 6; i <= 8; i++ {
            src.Send(ctx, i) //nolint
        }
    }()

    time.Sleep(80 * time.Millisecond)
    fmt.Printf("processed while paused: %d items total\n", len(received))

    handle.Resume()
    fmt.Printf("resumed: %v\n", handle.Paused())
    time.Sleep(100 * time.Millisecond)

    // Close the source to end the stream, then wait for the run to finish.
    src.Close()
    if err := handle.Wait(); err != nil {
        panic(err)
    }
    fmt.Printf("total processed: %d items\n", len(received))

    // --- Gate + WithPauseGate: pause a blocking Runner.Run ---

    fmt.Println("\n=== Gate + WithPauseGate: pause a blocking Run ===")

    // A Gate provides pause/resume for the blocking Run variant.
    gate := kitsune.NewGate()
    nums := kitsune.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
    pipeline := kitsune.Map(nums, func(_ context.Context, n int) (string, error) {
        time.Sleep(5 * time.Millisecond)
        return fmt.Sprintf("n=%d", n), nil
    }, kitsune.WithName("map"))

    var out []string
    r := pipeline.ForEach(func(_ context.Context, s string) error {
        out = append(out, s)
        return nil
    })

    // Pause the gate mid-run from a separate goroutine, then resume it.
    go func() {
        time.Sleep(30 * time.Millisecond)
        gate.Pause()
        fmt.Println("gate paused")
        time.Sleep(50 * time.Millisecond)
        gate.Resume()
        fmt.Println("gate resumed")
    }()

    // Run blocks until the source is exhausted, honouring the gate.
    if err := r.Run(ctx, kitsune.WithPauseGate(gate)); err != nil {
        panic(err)
    }
    fmt.Printf("total: %d items\n", len(out))
}

Fan-out & Fan-in

fanout

Split a stream into two typed branches and run them concurrently.

Demonstrates: Partition, MergeRunners

Run in Playground View source

Full source
// Example: fanout — split a stream and run each branch concurrently.
//
// Demonstrates: Partition, MergeRunners
package main

import (
    "context"
    "fmt"
    "sync"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()

    nums := kitsune.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})

    // Partition splits the stream into two typed pipelines based on a predicate.
    evens, odds := kitsune.Partition(nums, func(n int) bool { return n%2 == 0 })

    var mu sync.Mutex
    var evenResults, oddResults []int

    // MergeRunners starts both branches from the same shared source and waits
    // for both to finish. All branches must complete before Run returns.
    merged, err := kitsune.MergeRunners(
        evens.ForEach(func(_ context.Context, n int) error {
            mu.Lock()
            evenResults = append(evenResults, n)
            mu.Unlock()
            return nil
        }),
        odds.ForEach(func(_ context.Context, n int) error {
            mu.Lock()
            oddResults = append(oddResults, n)
            mu.Unlock()
            return nil
        }),
    )
    if err != nil {
        panic(err)
    }
    if err := merged.Run(ctx); err != nil {
        panic(err)
    }

    fmt.Println("evens:", evenResults)
    fmt.Println("odds: ", oddResults)
}

broadcast

Fan a single stream out to N independent consumers; each sees every item.

Demonstrates: Broadcast, MergeRunners

Run in Playground View source

Full source
// Example: broadcast — fan-out a single stream to N independent consumers.
//
// Demonstrates: Broadcast, MergeRunners
package main

import (
    "context"
    "fmt"
    "sync"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()

    events := kitsune.FromSlice([]string{"login", "purchase", "logout", "search", "purchase"})

    // Broadcast fans the stream out to 3 independent channels. Each consumer
    // sees every item. The source is consumed at the speed of the slowest consumer.
    branches := kitsune.Broadcast(events, 3)

    var mu sync.Mutex
    counts := make([]int, 3)

    runners := make([]kitsune.Runnable, 3)
    for i, branch := range branches {
        i, branch := i, branch
        runners[i] = branch.ForEach(func(_ context.Context, s string) error {
            mu.Lock()
            counts[i]++
            mu.Unlock()
            return nil
        })
    }

    merged, err := kitsune.MergeRunners(runners...)
    if err != nil {
        panic(err)
    }
    if err := merged.Run(ctx); err != nil {
        panic(err)
    }

    for i, c := range counts {
        fmt.Printf("consumer %d received %d items\n", i, c)
    }
}

share

Multicast a stream to a dynamically-built subscriber list — consumers registered one at a time with independent options.

Demonstrates: Share, per-branch Buffer and WithName, MergeRunners

Run in Playground View source

Full source
// Example: share — multicast a stream to a dynamically-built subscriber list.
//
// Share lets you register consumers one at a time, each with its own options.
// Unlike Broadcast, the number of consumers doesn't need to be fixed upfront.
package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"

    kitsune "github.com/zenbaku/go-kitsune"
)

type OrderEvent struct {
    ID     int
    Amount float64
}

func main() {
    ctx := context.Background()

    orders := kitsune.FromSlice([]OrderEvent{
        {ID: 1, Amount: 49.99},
        {ID: 2, Amount: 1250.00},
        {ID: 3, Amount: 7.50},
        {ID: 4, Amount: 3400.00},
        {ID: 5, Amount: 22.00},
    })

    subscribe := kitsune.Share(orders)

    audit   := subscribe(kitsune.WithName("audit"), kitsune.Buffer(256))
    metrics := subscribe(kitsune.WithName("metrics"))
    fraud   := subscribe(kitsune.WithName("fraud-detection"))

    var auditLog []string
    var mu sync.Mutex
    auditRunner := audit.ForEach(func(_ context.Context, o OrderEvent) error {
        mu.Lock()
        auditLog = append(auditLog, fmt.Sprintf("order #%d: $%.2f", o.ID, o.Amount))
        mu.Unlock()
        return nil
    })

    var totalRevenue atomic.Value
    totalRevenue.Store(0.0)
    var eventCount atomic.Int64
    metricsRunner := metrics.ForEach(func(_ context.Context, o OrderEvent) error {
        eventCount.Add(1)
        for {
            old := totalRevenue.Load().(float64)
            if totalRevenue.CompareAndSwap(old, old+o.Amount) {
                break
            }
        }
        return nil
    })

    var flagged atomic.Int64
    fraudRunner := fraud.ForEach(func(_ context.Context, o OrderEvent) error {
        if o.Amount > 1000 {
            flagged.Add(1)
            fmt.Printf("fraud alert: order #%d amount $%.2f\n", o.ID, o.Amount)
        }
        return nil
    })

    merged, _ := kitsune.MergeRunners(auditRunner, metricsRunner, fraudRunner)
    merged.Run(ctx)

    fmt.Println("\n--- audit log ---")
    for _, entry := range auditLog {
        fmt.Println(" ", entry)
    }
    fmt.Printf("\nevents: %d  revenue: $%.2f  flagged: %d\n",
        eventCount.Load(), totalRevenue.Load().(float64), flagged.Load())
}

Composition

stages

Define reusable, composable pipeline fragments with Stage[I,O].

Demonstrates: Stage[I,O], Then, Through, Stage.Or

Run in Playground View source

Full source
// Example: stages — composable, reusable pipeline transformers.
//
// Demonstrates: Stage[I,O], Then, Through, Stage.Or
package main

import (
    "context"
    "fmt"
    "strconv"
    "strings"

    kitsune "github.com/zenbaku/go-kitsune"
)

// ParseInt converts each trimmed string item to an int; a failed parse
// surfaces strconv.Atoi's error into the pipeline.
var ParseInt kitsune.Stage[string, int] = func(p *kitsune.Pipeline[string]) *kitsune.Pipeline[int] {
    return kitsune.Map(p, func(_ context.Context, s string) (int, error) {
        return strconv.Atoi(strings.TrimSpace(s))
    })
}

// Double multiplies each int by two.
var Double kitsune.Stage[int, int] = func(p *kitsune.Pipeline[int]) *kitsune.Pipeline[int] {
    return kitsune.Map(p, func(_ context.Context, n int) (int, error) { return n * 2, nil })
}

// Stringify renders each int as a "result=N" string.
var Stringify kitsune.Stage[int, string] = func(p *kitsune.Pipeline[int]) *kitsune.Pipeline[string] {
    return kitsune.Map(p, func(_ context.Context, n int) (string, error) {
        return fmt.Sprintf("result=%d", n), nil
    })
}

// Uppercase maps each string to its upper-case form.
var Uppercase kitsune.Stage[string, string] = func(p *kitsune.Pipeline[string]) *kitsune.Pipeline[string] {
    return kitsune.Map(p, func(_ context.Context, s string) (string, error) {
        return strings.ToUpper(s), nil
    })
}

func main() {
    ctx := context.Background()
    input := kitsune.FromSlice([]string{"1", "2", "3", "4", "5"})

    // Then: compose two stages into one
    pipeline := kitsune.Then(kitsune.Then(ParseInt, Double), Stringify)
    results, _ := kitsune.Collect(ctx, pipeline(input))
    fmt.Println("ParseInt → Double → Stringify:", strings.Join(results, "  "))

    // Through: apply a Stage[T,T] as a method
    out, _ := kitsune.Collect(ctx,
        kitsune.FromSlice([]string{"hello", "world"}).Through(Uppercase))
    fmt.Println("Uppercase:", out)

    // Or: primary with fallback
    // primary fails for even numbers, so Or substitutes fallback's result
    // for exactly those items.
    var primary kitsune.Stage[int, string] = func(p *kitsune.Pipeline[int]) *kitsune.Pipeline[string] {
        return kitsune.Map(p, func(_ context.Context, n int) (string, error) {
            if n%2 == 0 {
                return "", fmt.Errorf("primary failed for %d", n)
            }
            return fmt.Sprintf("primary:%d", n), nil
        })
    }
    var fallback kitsune.Stage[int, string] = func(p *kitsune.Pipeline[int]) *kitsune.Pipeline[string] {
        return kitsune.Map(p, func(_ context.Context, n int) (string, error) {
            return fmt.Sprintf("fallback:%d", n), nil
        })
    }
    orResults, _ := kitsune.Collect(ctx, primary.Or(fallback)(kitsune.FromSlice([]int{1, 2, 3, 4, 5})))
    fmt.Println("Or:", strings.Join(orResults, "  "))
}

Error handling

circuitbreaker

Protect a flaky dependency: after N consecutive failures the circuit opens, fast-failing subsequent items without calling the backend.

Demonstrates: CircuitBreaker, FailureThreshold, CooldownDuration, HalfOpenProbes, ErrCircuitOpen

Run in Playground View source

Full source
// Example: circuitbreaker — protect a flaky dependency with a circuit breaker.
package main

import (
    "context"
    "errors"
    "fmt"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()

    callCount := 0
    backend := func(_ context.Context, n int) (string, error) {
        callCount++
        if n >= 3 && n <= 7 {
            return "", fmt.Errorf("backend error on item %d", n)
        }
        return fmt.Sprintf("ok-%d", n), nil
    }

    items := kitsune.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
    out := kitsune.CircuitBreaker(items, backend,
        []kitsune.CircuitBreakerOpt{
            kitsune.FailureThreshold(3),
            kitsune.CooldownDuration(0),
            kitsune.HalfOpenProbes(1),
        },
        kitsune.OnError(kitsune.Skip()),
        kitsune.WithName("backend"),
    )

    results, err := kitsune.Collect(ctx, out)
    if err != nil {
        panic(err)
    }

    fmt.Println("results:", results)
    fmt.Printf("backend called %d times (circuit blocked some calls)\n", callCount)

    errBackend := func(_ context.Context, n int) (string, error) {
        return "", errors.New("always fails")
    }
    err = kitsune.CircuitBreaker(kitsune.FromSlice([]int{1, 2, 3}), errBackend,
        []kitsune.CircuitBreakerOpt{kitsune.FailureThreshold(2)},
    ).Drain().Run(ctx)

    if errors.Is(err, kitsune.ErrCircuitOpen) {
        fmt.Println("pipeline stopped with ErrCircuitOpen")
    }
}

timeout

Enforce per-item deadlines on slow stages. Timed-out items can be skipped or replaced with a default value.

Demonstrates: Timeout, OnError(Skip()), OnError(Return(...))

Run in Playground View source

Full source
// Example: timeout — enforce per-item deadlines on slow stages.
package main

import (
    "context"
    "fmt"
    "time"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()

    delays := []time.Duration{
        5 * time.Millisecond, 50 * time.Millisecond,
        5 * time.Millisecond, 80 * time.Millisecond,
        5 * time.Millisecond, 30 * time.Millisecond,
        5 * time.Millisecond,
    }

    slow := func(ctx context.Context, d time.Duration) (string, error) {
        select {
        case <-time.After(d):
            return fmt.Sprintf("done in %v", d), nil
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }

    // Skip timed-out items
    results, _ := kitsune.Collect(ctx,
        kitsune.Map(kitsune.FromSlice(delays), slow,
            kitsune.Timeout(20*time.Millisecond),
            kitsune.OnError(kitsune.Skip()),
        ))
    fmt.Printf("Skip: received %d of %d items\n", len(results), len(delays))

    // Replace timed-out items with a default value
    results2, _ := kitsune.Collect(ctx,
        kitsune.Map(kitsune.FromSlice(delays), slow,
            kitsune.Timeout(20*time.Millisecond),
            kitsune.OnError(kitsune.Return("timed out")),
        ))
    fmt.Printf("Return: received %d items\n", len(results2))
    for _, r := range results2 {
        fmt.Println(" ", r)
    }
}

Stateful processing

runningtotal

Accumulate a running total across a stream using MapWith — one shared Ref for the lifetime of the run.

Demonstrates: NewKey, MapWith, Ref.UpdateAndGet

Run in Playground View source

Full source
// Example: runningtotal — shared mutable state with MapWith.
package main

import (
    "context"
    "fmt"

    kitsune "github.com/zenbaku/go-kitsune"
)

type Tx struct {
    ID     int
    Amount int
}

type Summary struct {
    ID, Amount, Total, Max, Count int
}

type totals struct{ sum, max, count int }

var runningTotalsKey = kitsune.NewKey[totals]("totals", totals{})

func buildPipeline(txns []Tx) *kitsune.Pipeline[Summary] {
    return kitsune.MapWith(
        kitsune.FromSlice(txns),
        runningTotalsKey,
        func(ctx context.Context, ref *kitsune.Ref[totals], tx Tx) (Summary, error) {
            s, _ := ref.UpdateAndGet(ctx, func(t totals) (totals, error) {
                t.sum += tx.Amount
                t.count++
                if tx.Amount > t.max {
                    t.max = tx.Amount
                }
                return t, nil
            })
            return Summary{tx.ID, tx.Amount, s.sum, s.max, s.count}, nil
        },
    )
}

func main() {
    ctx := context.Background()
    txns := []Tx{{1, 120}, {2, 45}, {3, 300}, {4, 80}, {5, 210}}

    results, _ := kitsune.Collect(ctx, buildPipeline(txns))
    for _, s := range results {
        fmt.Printf("tx#%d amount=%-4d  total=%-5d max=%-4d count=%d\n",
            s.ID, s.Amount, s.Total, s.Max, s.Count)
    }
}

keyedstate

Per-entity stateful processing. Items are routed by key hash so each entity's state never crosses goroutine boundaries — the in-process actor model.

Demonstrates: MapWithKey, Concurrency with key-sharded routing, Ref.UpdateAndGet

Run in Playground View source

Full source
// Example: keyedstate — per-entity stateful processing with key-sharded concurrency.
package main

import (
    "context"
    "fmt"
    "time"

    kitsune "github.com/zenbaku/go-kitsune"
)

type Event struct {
    UserID string
    Amount int
}

var totalKey = kitsune.NewKey[int]("user_total", 0)

func main() {
    ctx := context.Background()
    events := []Event{
        {"alice", 100}, {"bob", 200}, {"carol", 50},
        {"dave", 300}, {"alice", 150}, {"bob", 100},
        {"carol", 200}, {"alice", 75}, {"dave", 50},
    }

    keyFn := func(e Event) string { return e.UserID }
    mapFn := func(ctx context.Context, ref *kitsune.Ref[int], e Event) (string, error) {
        total, _ := ref.UpdateAndGet(ctx, func(t int) (int, error) {
            return t + e.Amount, nil
        })
        return fmt.Sprintf("%-8s total=%d", e.UserID, total), nil
    }

    // Serial: one goroutine
    start := time.Now()
    results, _ := kitsune.Collect(ctx, kitsune.MapWithKey(
        kitsune.FromSlice(events), keyFn, totalKey, mapFn))
    fmt.Printf("Serial: %d events in %v\n", len(results), time.Since(start).Round(time.Microsecond))
    for _, r := range results {
        fmt.Println(" ", r)
    }

    // Concurrent(4): 4 workers, items routed by hash(userID) % 4
    start = time.Now()
    results2, _ := kitsune.Collect(ctx, kitsune.MapWithKey(
        kitsune.FromSlice(events), keyFn, totalKey, mapFn,
        kitsune.Concurrency(4)))
    fmt.Printf("\nConcurrent(4): %d events in %v\n", len(results2), time.Since(start).Round(time.Microsecond))
}

caching

Skip redundant work with CacheBy. Cache misses call the function; hits return the stored result immediately.

Demonstrates: CacheBy, CacheTTL, MemoryCache, WithCache

Run in Playground View source

Full source
// Example: caching — skip redundant work with CacheBy + MemoryCache.
package main

import (
    "context"
    "fmt"
    "time"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()

    calls := 0
    expensive := func(_ context.Context, id string) (string, error) {
        calls++
        time.Sleep(5 * time.Millisecond)
        return "data-for-" + id, nil
    }

    items := kitsune.FromSlice([]string{"a", "b", "a", "c", "b", "a"})
    results := kitsune.Map(items, expensive,
        kitsune.CacheBy(func(s string) string { return s },
            kitsune.CacheTTL(5*time.Minute),
            kitsune.CacheBackend(kitsune.MemoryCache(128)),
        ),
        kitsune.WithName("fetch"),
    )

    out, _ := kitsune.Collect(ctx, results)
    fmt.Println("results:", out)
    fmt.Printf("fn called %d times for %d items (%d cache hits)\n",
        calls, len(out), len(out)-calls)
}

Rate limiting

ratelimit

Throttle pipeline throughput with a token bucket. RateLimitWait applies backpressure; RateLimitDrop silently discards excess items.

Demonstrates: RateLimit, RateLimitWait, RateLimitDrop, Burst

Run in Playground View source

Full source
// Example: ratelimit — throttle pipeline throughput with a token bucket.
package main

import (
    "context"
    "fmt"
    "time"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()
    items := make([]int, 20)
    for i := range items { items[i] = i }

    // RateLimitWait: backpressure — blocks until a token is available
    start := time.Now()
    results, _ := kitsune.Collect(ctx,
        kitsune.RateLimit(kitsune.FromSlice(items[:10]), 200,
            []kitsune.RateLimitOpt{kitsune.Burst(5)},
        ))
    fmt.Printf("Wait: processed %d items in %v\n", len(results), time.Since(start).Round(time.Millisecond))

    // RateLimitDrop: excess items are silently discarded
    fast := kitsune.FromSlice(items)
    dropped, _ := kitsune.Collect(ctx, kitsune.RateLimit(fast, 5,
        []kitsune.RateLimitOpt{kitsune.RateMode(kitsune.RateLimitDrop), kitsune.Burst(3)},
    ))
    fmt.Printf("Drop: received %d of %d items (%d dropped)\n",
        len(dropped), len(items), len(items)-len(dropped))
}

perkeyratelimit

Per-entity rate limiting: each user gets an independent token budget. Key-sharded routing ensures per-user state never crosses goroutine boundaries — no mutex required.

Demonstrates: MapWithKey, Ref.UpdateAndGet, Concurrency with key sharding

Run in Playground View source

Full source
// Example: perkeyratelimit — per-entity rate limiting with MapWithKey.
//
// Each user is allowed at most 3 requests per window (every 5 ticks).
package main

import (
    "context"
    "fmt"
    "sort"

    kitsune "github.com/zenbaku/go-kitsune"
)

type Request struct{ UserID string; Tick int }
type Result  struct{ UserID string; Tick, Window int; Status string }
type bucket  struct{ windowStart, count int; lastAccepted bool }

const (windowSize = 5; rateLimit = 3)

var rateLimitKey = kitsune.NewKey[bucket]("rate_limit", bucket{})

func main() {
    ctx := context.Background()

    requests := []Request{
        {"alice", 0}, {"alice", 1}, {"alice", 2}, {"alice", 3},
        {"bob", 1}, {"bob", 2},
        {"carol", 0},
        {"alice", 5}, {"alice", 6},
        {"bob", 5}, {"bob", 6}, {"bob", 7}, {"bob", 8}, {"bob", 9},
    }

    results, _ := kitsune.Collect(ctx, kitsune.MapWithKey(
        kitsune.FromSlice(requests),
        func(r Request) string { return r.UserID },
        rateLimitKey,
        func(ctx context.Context, ref *kitsune.Ref[bucket], r Request) (Result, error) {
            window := r.Tick / windowSize
            b, _ := ref.UpdateAndGet(ctx, func(b bucket) (bucket, error) {
                if window != b.windowStart {
                    b = bucket{windowStart: window}
                }
                if b.count < rateLimit {
                    b.count++; b.lastAccepted = true
                } else {
                    b.lastAccepted = false
                }
                return b, nil
            })
            st := "rejected"
            if b.lastAccepted { st = "accepted" }
            return Result{r.UserID, r.Tick, window, st}, nil
        },
        kitsune.Concurrency(4),
    ))

    sort.Slice(results, func(i, j int) bool {
        if results[i].UserID != results[j].UserID {
            return results[i].UserID < results[j].UserID
        }
        return results[i].Tick < results[j].Tick
    })

    for _, r := range results {
        fmt.Printf("  %-6s tick=%-2d window=%d  %s\n", r.UserID, r.Tick, r.Window, r.Status)
    }
}

Push sources

channel

Create a push-based source with NewChannel. External producers call Send while the pipeline runs in the background.

Demonstrates: NewChannel, Channel.Send, Channel.Close, RunAsync, RunHandle

Run in Playground View source

Full source
// Example: channel — push-based source with external producers.
package main

import (
    "context"
    "fmt"
    "sync"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()

    src := kitsune.NewChannel[int](16)
    doubled := kitsune.Map(src.Source(),
        func(_ context.Context, n int) (int, error) { return n * 2, nil })

    var mu sync.Mutex
    var results []int

    handle := doubled.ForEach(func(_ context.Context, n int) error {
        mu.Lock(); results = append(results, n); mu.Unlock()
        return nil
    }).RunAsync(ctx)

    for i := 1; i <= 10; i++ {
        if err := src.Send(ctx, i); err != nil {
            panic(err)
        }
    }
    src.Close()

    if err := handle.Wait(); err != nil {
        panic(err)
    }
    fmt.Println("results:", results)
}

Time-based

ticker

Time-based sources: periodic ticks and one-shot timers.

Demonstrates: Ticker, Timer, Take, Map

View source

Full source
// Example: ticker — time-based sources and limiting.
package main

import (
    "context"
    "fmt"
    "time"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()

    // Ticker: emits time.Time, take first 5
    ticks, _ := kitsune.Collect(ctx,
        kitsune.Map(
            kitsune.Take(kitsune.Ticker(20*time.Millisecond), 5),
            func(_ context.Context, t time.Time) (string, error) {
                return t.Format("15:04:05.000"), nil
            }))
    fmt.Println("ticks:", ticks)

    // Ticker as counter: map time.Time to a monotonic index
    var i int64
    counts, _ := kitsune.Collect(ctx,
        kitsune.Map(
            kitsune.Take(kitsune.Ticker(20*time.Millisecond), 4),
            func(_ context.Context, _ time.Time) (int64, error) {
                n := i; i++; return n, nil
            }))
    fmt.Println("counts:", counts)

    // Timer: one-shot after a delay
    msg, _ := kitsune.Collect(ctx,
        kitsune.Timer(20*time.Millisecond, func() string { return "fired!" }))
    fmt.Println("timer:", msg)
}

switchmap

Cancel in-progress work when a newer item arrives. Models type-ahead search or live-reload: only the last item's result is emitted.

Demonstrates: SwitchMap cancellation semantics vs FlatMap

Run in Playground View source

Full source
// Example: switchmap — cancel in-progress work when a newer item arrives.
package main

import (
    "context"
    "fmt"
    "time"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()
    queries := kitsune.FromSlice([]string{"g", "go", "gol", "golang"})

    search := func(ctx context.Context, query string, emit func(string) error) error {
        select {
        case <-time.After(30 * time.Millisecond):
            return emit(fmt.Sprintf("results for %q", query))
        case <-ctx.Done():
            return nil // cancelled by a newer query
        }
    }

    // SwitchMap: only the last query completes
    results, _ := kitsune.Collect(ctx, kitsune.SwitchMap(queries, search))
    fmt.Println("SwitchMap:", results)

    // FlatMap: all queries complete (no cancellation)
    allResults, _ := kitsune.Collect(ctx,
        kitsune.FlatMap(kitsune.FromSlice([]string{"g", "go", "gol", "golang"}),
            func(_ context.Context, query string, emit func(string) error) error {
                time.Sleep(5 * time.Millisecond)
                return emit(fmt.Sprintf("results for %q", query))
            }))
    fmt.Println("FlatMap:", allResults)
}

Deduplication

bloomdedup

Probabilistic global deduplication with a Bloom filter: bounded memory regardless of key-space size, with a configurable false-positive rate.

Demonstrates: BloomDedupSet, WithDedupSet, DistinctBy, DedupeBy

Run in Playground View source

Full source
// Example: bloomdedup — probabilistic deduplication with a Bloom filter.
package main

import (
    "context"
    "fmt"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()

    // DistinctBy with a Bloom filter (1% FP rate, 1000 expected keys)
    events := kitsune.FromSlice([]string{
        "login:alice", "purchase:bob", "login:alice",
        "logout:carol", "purchase:bob",
        "login:dave",
    })

    set := kitsune.BloomDedupSet(1_000, 0.01)
    unique := kitsune.DistinctBy(events, func(e string) string { return e },
        kitsune.WithDedupSet(set),
    )

    results, _ := kitsune.Collect(ctx, unique)
    fmt.Println("distinct events:", results)

    // Shared filter: keys from run1 are remembered in run2
    sharedSet := kitsune.BloomDedupSet(10_000, 0.01)
    kitsune.Collect(ctx, kitsune.DistinctBy(kitsune.FromSlice([]int{1, 2, 3}),
        func(n int) int { return n }, kitsune.WithDedupSet(sharedSet)))

    newOnly, _ := kitsune.Collect(ctx, kitsune.DistinctBy(
        kitsune.FromSlice([]int{2, 3, 4, 5}),
        func(n int) int { return n },
        kitsune.WithDedupSet(sharedSet),
    ))
    fmt.Println("new items in run 2:", newOnly)
}

Observability

hooks

Attach a MetricsHook and LogHook to a pipeline to collect per-stage throughput, error counts, and average latency.

Demonstrates: WithHook, LogHook, MetricsHook, MultiHook, MetricsSnapshot

Run in Playground View source

Full source
// Example: hooks — observability with LogHook and MetricsHook.
package main

import (
    "context"
    "fmt"
    "log/slog"
    "os"
    "time"

    kitsune "github.com/zenbaku/go-kitsune"
)

func main() {
    ctx := context.Background()

    metrics := kitsune.NewMetricsHook()
    logger  := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))

    slow := func(_ context.Context, n int) (int, error) {
        time.Sleep(2 * time.Millisecond)
        return n * n, nil
    }

    items    := kitsune.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8})
    squared  := kitsune.Map(items, slow, kitsune.WithName("square"))
    filtered := kitsune.Filter(squared,
        func(_ context.Context, n int) (bool, error) { return n > 10, nil },
        kitsune.WithName("filter"),
    )

    results, _ := kitsune.Collect(ctx, filtered,
        kitsune.WithHook(kitsune.MultiHook(metrics, kitsune.LogHook(logger))),
    )
    fmt.Println("\nresults:", results)

    fmt.Println("\n=== Stage Metrics ===")
    for _, s := range metrics.Snapshot().Stages {
        fmt.Printf("  %-12s  processed=%d  errors=%d  avgLatency=%v\n",
            s.Stage, s.Processed, s.Errors, s.AvgLatency().Round(time.Microsecond))
    }
}

inspector

A live web dashboard showing the pipeline DAG, per-stage metrics, and buffer fill levels. Add one line to any pipeline.

Interactive — run locally

The inspector starts an HTTP server and runs indefinitely. It is excluded from automated tests.

go run ./examples/inspector

View source