Beyond Goroutines: Advanced Concurrency Patterns in Go

One of the reasons I fell in love with Go is its elegant approach to concurrency. While most developers get comfortable with basic goroutines and channels, the true power of Go’s concurrency model reveals itself when you start applying advanced patterns to solve complex problems.

In this post, I’ll walk through some of the more sophisticated concurrency patterns I’ve used in production systems that process millions of records daily.

The Limitations of “Naïve Concurrency”

First, let’s acknowledge the elephant in the room: throwing goroutines at every problem isn’t always the answer. I learned this the hard way when a seemingly innocent line of code brought down an entire service:

// DON'T do this with unbounded input!
for _, item := range hugeListOfItems {
    go processItem(item)
}

This “fire-and-forget” approach might work in demos, but in production, it can:

  1. Exhaust system resources (each goroutine consumes memory)
  2. Overwhelm downstream services
  3. Make error handling virtually impossible
  4. Provide no backpressure mechanism

Let’s look at patterns that address these issues.

Worker Pools: Controlled Concurrency

Worker pools allow you to limit the number of concurrent operations while still processing items in parallel:

func processItems(items []Item) error {
    numWorkers := 10
    itemCh := make(chan Item)
    errCh := make(chan error, len(items))

    // Start worker pool
    var wg sync.WaitGroup
    wg.Add(numWorkers)
    for i := 0; i < numWorkers; i++ {
        go func() {
            defer wg.Done()
            for item := range itemCh {
                if err := processItem(item); err != nil {
                    errCh <- err
                }
            }
        }()
    }

    // Send items to workers
    for _, item := range items {
        itemCh <- item
    }
    close(itemCh)

    // Wait for workers to finish
    wg.Wait()
    close(errCh)

    // Collect errors
    var errs []error
    for err := range errCh {
        errs = append(errs, err)
    }

    if len(errs) > 0 {
        return fmt.Errorf("encountered %d errors: %v", len(errs), errs)
    }
    return nil
}

This pattern gives you control over concurrency levels and proper error collection, and it ensures all work completes before moving on.

The Context Package: Managing Cancellation

The context package is crucial for controlling the lifecycle of concurrent operations. Here’s a pattern I use for operations that need timeout controls:

func processWithTimeout(item Item) error {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    resultCh := make(chan result, 1)
    errCh := make(chan error, 1)

    go func() {
        res, err := processItemWithResult(item)
        if err != nil {
            errCh <- err
            return
        }
        resultCh <- res
    }()

    select {
    case res := <-resultCh:
        return handleResult(res)
    case err := <-errCh:
        return err
    case <-ctx.Done():
        return fmt.Errorf("processing timed out: %w", ctx.Err())
    }
}

What I love about this pattern is its composability - you can pass the context through layers of function calls, allowing cancellation to propagate gracefully through your call stack.
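
Here's a minimal sketch of that propagation (handleRequest, loadOrder, and queryOrderRows are hypothetical layers standing in for real code): the same ctx flows through every call, so a timeout set at the top cancels work at the bottom.

func handleRequest(ctx context.Context, orderID string) error {
    // Attach a deadline at the top layer; everything below inherits it.
    ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
    defer cancel()
    return loadOrder(ctx, orderID)
}

func loadOrder(ctx context.Context, orderID string) error {
    // Middle layer: just pass the context straight through.
    return queryOrderRows(ctx, orderID)
}

func queryOrderRows(ctx context.Context, orderID string) error {
    // Bottom layer: respect cancellation while doing (simulated) work.
    select {
    case <-time.After(100 * time.Millisecond):
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}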

The Fan-Out, Fan-In Pattern

When you need to process data through multiple stages in parallel, the fan-out, fan-in pattern shines:

func processData(data []Data) ([]Result, error) {
    // Fan-out stage
    processCh := make(chan Data)
    transformCh := make(chan IntermediateResult)
    resultCh := make(chan Result)

    // Step 1: Process (fan out to 5 workers)
    var processWg sync.WaitGroup
    processWg.Add(5)
    for i := 0; i < 5; i++ {
        go func() {
            defer processWg.Done()
            for d := range processCh {
                ir, err := processOne(d) // processes a single item (helper not shown)
                if err != nil {
                    // Error handling omitted for brevity
                    continue
                }
                transformCh <- ir
            }
        }()
    }

    // Step 2: Transform (fan out to 3 workers)
    var transformWg sync.WaitGroup
    transformWg.Add(3)
    for i := 0; i < 3; i++ {
        go func() {
            defer transformWg.Done()
            for ir := range transformCh {
                r := transformData(ir)
                resultCh <- r
            }
        }()
    }

    // Close channels when the previous stage completes
    go func() {
        for _, d := range data {
            processCh <- d
        }
        close(processCh)
        processWg.Wait()
        close(transformCh)
        transformWg.Wait()
        close(resultCh)
    }()

    // Fan-in: collect results
    var results []Result
    for r := range resultCh {
        results = append(results, r)
    }

    return results, nil
}

This pattern creates a pipeline of concurrent operations, where each stage can operate at different rates. I’ve used this extensively for data processing workflows where different stages have different computational requirements.

Rate Limiting with a Token Bucket

When working with external APIs or services that enforce rate limits, a token bucket is incredibly useful: the bucket holds a fixed number of tokens, each request consumes one, and tokens are replenished at a steady rate, so short bursts are allowed while sustained throughput stays capped:

type RateLimiter struct {
    rate       time.Duration
    maxBuckets int
    buckets    chan struct{}
}

func NewRateLimiter(rps int, burstLimit int) *RateLimiter {
    r := &RateLimiter{
        rate:       time.Second / time.Duration(rps),
        maxBuckets: burstLimit,
        buckets:    make(chan struct{}, burstLimit),
    }

    // Fill the bucket initially
    for i := 0; i < burstLimit; i++ {
        r.buckets <- struct{}{}
    }

    // Refill the bucket at the defined rate
    go func() {
        ticker := time.NewTicker(r.rate)
        defer ticker.Stop()

        for range ticker.C {
            select {
            case r.buckets <- struct{}{}:
                // Bucket refilled
            default:
                // Bucket full, skip
            }
        }
    }()

    return r
}

func (r *RateLimiter) Wait(ctx context.Context) error {
    select {
    case <-r.buckets:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

I use this pattern when calling external APIs that enforce rate limits. It smooths out traffic spikes and prevents HTTP 429 errors, which are the bane of any integration project.
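
Usage then looks something like this (Request and callExternalAPI are hypothetical placeholders for a real client call); for production code, the golang.org/x/time/rate package provides a well-tested limiter with the same token-bucket semantics:

// Every outbound call waits for a token first, so traffic spikes get smoothed
// out to the configured rate instead of hammering the API.
func callAll(ctx context.Context, limiter *RateLimiter, reqs []Request) error {
    for _, req := range reqs {
        if err := limiter.Wait(ctx); err != nil {
            return err // context cancelled or timed out while waiting
        }
        if err := callExternalAPI(ctx, req); err != nil {
            return err
        }
    }
    return nil
}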

Work Stealing for Dynamic Workloads

For workloads where some items take significantly longer to process than others, a work-stealing pattern can improve throughput:

func processWithWorkStealing(items []Item) []Result {
    numWorkers := runtime.GOMAXPROCS(0)

    // Create queues for each worker
    queues := make([]chan Item, numWorkers)
    for i := range queues {
        queues[i] = make(chan Item, len(items)/numWorkers+1)
    }

    // Distribute work initially
    for i, item := range items {
        queueIdx := i % numWorkers
        queues[queueIdx] <- item
    }

    // Close all queues to signal no more initial work
    for _, q := range queues {
        close(q)
    }

    // Results channel
    resultCh := make(chan Result, len(items))

    var wg sync.WaitGroup
    wg.Add(numWorkers)

    // Start workers
    for workerID, queue := range queues {
        go func(id int, myQueue chan Item) {
            defer wg.Done()

            // Process own queue first
            for item := range myQueue {
                result := processItem(item)
                resultCh <- result
            }

            // Then try to steal work from other queues
            for i := 1; i < numWorkers; i++ {
                otherQueueIdx := (id + i) % numWorkers
                otherQueue := queues[otherQueueIdx]

                // Try to steal any remaining work
                for item := range otherQueue {
                    result := processItem(item)
                    resultCh <- result
                }
            }
        }(workerID, queue)
    }

    // Wait for all workers to finish
    go func() {
        wg.Wait()
        close(resultCh)
    }()

    // Collect results
    var results []Result
    for r := range resultCh {
        results = append(results, r)
    }

    return results
}

This pattern shines when processing heterogeneous workloads. Workers that finish their assigned tasks early can “steal” work from other queues, improving overall throughput and resource utilization.

Semaphores for Resource Control

Sometimes you need to limit concurrent access to a specific resource, such as database connections or file handles, where the constraint isn't CPU utilization. A counting semaphore is perfect for this (the golang.org/x/sync/semaphore package also provides a weighted variant if different operations consume different amounts of the resource):

type Semaphore struct {
    permits chan struct{}
}

func NewSemaphore(maxConcurrent int) *Semaphore {
    return &Semaphore{
        permits: make(chan struct{}, maxConcurrent),
    }
}

func (s *Semaphore) Acquire(ctx context.Context) error {
    select {
    case s.permits <- struct{}{}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (s *Semaphore) Release() {
    <-s.permits
}

// Usage example
func accessLimitedResource(tasks []Task) {
    // Allow max 10 concurrent DB connections
    dbSemaphore := NewSemaphore(10)

    var wg sync.WaitGroup
    for _, task := range tasks {
        wg.Add(1)
        go func(t Task) {
            defer wg.Done()

            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            defer cancel()

            // Acquire a permit or give up when the context expires
            if err := dbSemaphore.Acquire(ctx); err != nil {
                log.Printf("Failed to acquire DB connection: %v", err)
                return
            }
            defer dbSemaphore.Release()

            // Now access the database
            result, err := queryDatabase(t)
            if err != nil {
                log.Printf("query failed: %v", err)
                return
            }
            _ = result // ... use the result
        }(task)
    }
    wg.Wait()
}

I’ve used this pattern extensively to manage connections to external systems, preventing connection exhaustion and improving stability.

Error Groups for Parallel Operations

The errgroup package from golang.org/x/sync provides a clean way to manage errors across concurrent operations:

func fetchAllUserData(ctx context.Context, userIDs []string) (map[string]UserData, error) {
    g, ctx := errgroup.WithContext(ctx)

    userDataMap := sync.Map{}

    // Limit concurrency to avoid overwhelming the system
    sem := make(chan struct{}, 10)

    for _, id := range userIDs {
        id := id // Capture for goroutine

        sem <- struct{}{} // Acquire
        g.Go(func() error {
            defer func() { <-sem }() // Release

            data, err := fetchUserData(ctx, id)
            if err != nil {
                return fmt.Errorf("error fetching data for user %s: %w", id, err)
            }

            userDataMap.Store(id, data)
            return nil
        })
    }

    // Wait for all fetches to complete or for an error
    if err := g.Wait(); err != nil {
        return nil, err
    }

    // Convert sync.Map to regular map
    result := make(map[string]UserData)
    userDataMap.Range(func(key, value interface{}) bool {
        result[key.(string)] = value.(UserData)
        return true
    })

    return result, nil
}

What I love about error groups is that they propagate cancellation automatically when any goroutine returns an error, preventing unnecessary work when something has already failed.
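
One more note: recent versions of errgroup also provide SetLimit, which caps the number of in-flight goroutines, so the manual semaphore channel above can be dropped. Here's a sketch of the same loop using it:

// Same fetch loop as above, but with errgroup's built-in limit instead of
// a hand-rolled semaphore channel. g.Go blocks once 10 goroutines are active.
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10)

for _, id := range userIDs {
    id := id
    g.Go(func() error {
        data, err := fetchUserData(ctx, id)
        if err != nil {
            return fmt.Errorf("error fetching data for user %s: %w", id, err)
        }
        userDataMap.Store(id, data)
        return nil
    })
}
if err := g.Wait(); err != nil {
    return nil, err
}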

The Power of Pipelines

Go’s channels are perfect for creating processing pipelines. Here’s a pattern I’ve used for data processing:

func processItems(ctx context.Context, items []Item) ([]Result, error) {
    // Create pipeline stages
    stage1 := validateItems(ctx, items)
    stage2 := enrichItems(ctx, stage1)
    stage3 := transformItems(ctx, stage2)
    results := collectResults(ctx, stage3)

    return results, nil
}

func validateItems(ctx context.Context, items []Item) <-chan Item {
    out := make(chan Item)

    go func() {
        defer close(out)
        for _, item := range items {
            if ctx.Err() != nil {
                return
            }

            if valid(item) {
                out <- item
            }
        }
    }()

    return out
}

func enrichItems(ctx context.Context, in <-chan Item) <-chan EnrichedItem {
    out := make(chan EnrichedItem)

    go func() {
        defer close(out)
        for item := range in {
            if ctx.Err() != nil {
                return
            }

            enriched, err := addMetadata(item)
            if err != nil {
                // Handle error or skip
                continue
            }

            out <- enriched
        }
    }()

    return out
}

// Additional stages follow similar pattern...

func collectResults(ctx context.Context, in <-chan ProcessedItem) []Result {
    var results []Result

    for item := range in {
        if ctx.Err() != nil {
            break
        }

        results = append(results, toResult(item))
    }

    return results
}

This pattern separates concerns cleanly, making the code more maintainable and testable. Each stage does one thing and can be tested in isolation.

Performance Considerations

When using these patterns, keep these performance considerations in mind:

  1. Channel Size: Unbuffered channels cause synchronization overhead. For performance-critical code, benchmark different buffer sizes.

  2. Goroutine Overhead: While goroutines are lightweight, they’re not free. Each goroutine consumes ~2KB of stack space initially. Create them judiciously.

  3. Work Batching: Sometimes batching work is more efficient than processing individual items, especially when dealing with I/O or network operations (see the sketch after this list).

  4. CPU-Bound vs. I/O-Bound: For CPU-bound tasks, limit concurrency to GOMAXPROCS. For I/O-bound tasks, you can typically use higher concurrency levels.

  5. Memory Pressure: Watch for high allocations in concurrent code, as this can trigger more frequent garbage collection pauses.
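
To make point 3 concrete, here's a minimal batching sketch (flushBatch is a hypothetical stand-in for whatever bulk operation you're calling, such as a bulk insert or a batched API request):

// Batching sketch: drain the channel into fixed-size batches so each
// downstream call handles many items at once instead of one at a time.
func batchWorker(in <-chan Item, batchSize int) {
    batch := make([]Item, 0, batchSize)
    for item := range in {
        batch = append(batch, item)
        if len(batch) == batchSize {
            flushBatch(batch) // hypothetical bulk operation
            batch = batch[:0]
        }
    }
    if len(batch) > 0 {
        flushBatch(batch) // flush the remainder
    }
}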

Debugging Concurrent Code

Debugging concurrent code can be challenging. Here are some tools I’ve found invaluable:

  1. Race Detector: Always run tests with -race to catch data races early:

    go test -race ./...
    
  2. pprof: For profiling concurrent performance issues:

    import _ "net/http/pprof"

    // In your main()
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
    
  3. Execution Tracing: Go’s execution tracer provides deep insights:

    // import "runtime/trace"
    f, err := os.Create("trace.out")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    err = trace.Start(f)
    if err != nil {
        log.Fatal(err)
    }
    defer trace.Stop()

    // Your code here
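
Once you've collected the data, the go tool subcommands do the analysis:

    # Open the execution trace viewer in a browser
    go tool trace trace.out

    # Pull a 30-second CPU profile from the pprof endpoint above
    go tool pprof http://localhost:6060/debug/pprof/profile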
    

Conclusion

Go’s concurrency primitives might seem simple at first, but they can be combined in powerful ways to solve complex problems. The patterns I’ve shared come from real-world experiences building systems that handle substantial load reliably.

Remember, concurrency isn’t just about making things faster—it’s about designing systems that can handle multiple tasks efficiently and gracefully. The right pattern depends on your specific needs:

  • Use worker pools to control concurrency levels
  • Use context for cancellation and timeouts
  • Use fan-out, fan-in for parallel processing pipelines
  • Use rate limiters for controlled access to resources
  • Use work stealing for heterogeneous workloads
  • Use semaphores for resource control
  • Use error groups for parallel operations with clean error handling
  • Use pipelines for multi-stage processing

I hope these patterns help you write more efficient, reliable concurrent code in Go. What’s your favorite Go concurrency pattern? Let me know in the comments!


This post is based on my experience building data processing systems that handle millions of records daily. The code examples are simplified for clarity but represent real patterns I’ve used in production systems.