Beyond Goroutines: Advanced Concurrency Patterns in Go
One of the reasons I fell in love with Go is its elegant approach to concurrency. While most developers get comfortable with basic goroutines and channels, the true power of Go’s concurrency model reveals itself when you start applying advanced patterns to solve complex problems.
In this post, I’ll walk through some of the more sophisticated concurrency patterns I’ve used in production systems that process millions of records daily.
The Limitations of “Naïve Concurrency”
First, let’s acknowledge the elephant in the room: throwing goroutines at every problem isn’t always the answer. I learned this the hard way when a seemingly innocent line of code brought down an entire service:
```go
// DON'T do this with unbounded input!
for _, item := range hugeListOfItems {
	go processItem(item)
}
```
This “fire-and-forget” approach might work in demos, but in production, it can:
- Exhaust system resources (each goroutine consumes memory)
- Overwhelm downstream services
- Make error handling virtually impossible
- Provide no backpressure mechanism
Let’s look at patterns that address these issues.
Worker Pools: Controlled Concurrency
Worker pools allow you to limit the number of concurrent operations while still processing items in parallel:
```go
func processItems(items []Item) error {
	numWorkers := 10
	itemCh := make(chan Item)
	errCh := make(chan error, len(items))

	// Start worker pool
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer wg.Done()
			for item := range itemCh {
				if err := processItem(item); err != nil {
					errCh <- err
				}
			}
		}()
	}

	// Send items to workers
	for _, item := range items {
		itemCh <- item
	}
	close(itemCh)

	// Wait for workers to finish
	wg.Wait()
	close(errCh)

	// Collect errors
	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}

	if len(errs) > 0 {
		return fmt.Errorf("encountered %d errors: %v", len(errs), errs)
	}
	return nil
}
```
This pattern gives you control over the concurrency level, collects errors properly, and ensures all work completes before moving on.
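As a side note, on Go 1.20 or later I'd swap the fmt.Errorf at the end for the standard library's errors.Join, which keeps every collected error intact for errors.Is and errors.As checks:

```go
// Aggregate with errors.Join (Go 1.20+) instead of flattening into a string.
if len(errs) > 0 {
	return errors.Join(errs...)
}
return nil
```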
The Context Package: Managing Cancellation
The context package is crucial for controlling the lifecycle of concurrent operations. Here’s a pattern I use for operations that need timeout controls:
```go
func processWithTimeout(item Item) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Buffered (capacity 1) so the goroutine can send and exit
	// even if we've already timed out — no goroutine leak.
	resultCh := make(chan result, 1)
	errCh := make(chan error, 1)

	go func() {
		res, err := processItemWithResult(item)
		if err != nil {
			errCh <- err
			return
		}
		resultCh <- res
	}()

	select {
	case res := <-resultCh:
		return handleResult(res)
	case err := <-errCh:
		return err
	case <-ctx.Done():
		return fmt.Errorf("processing timed out: %w", ctx.Err())
	}
}
```
What I love about this pattern is its composability: you can pass the context through layers of function calls, letting cancellation propagate gracefully through your call stack.
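To make that concrete, here's a minimal sketch of a context flowing through call layers; fetchAndStore, fetch, and store are hypothetical stand-ins:

```go
func handleRequest(ctx context.Context, item Item) error {
	// Derive a tighter deadline for this subtree of work; cancelling
	// the parent ctx cancels this one too.
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	return fetchAndStore(ctx, item)
}

func fetchAndStore(ctx context.Context, item Item) error {
	// Just pass ctx through; this layer doesn't care who set the deadline.
	data, err := fetch(ctx, item)
	if err != nil {
		return err
	}
	return store(ctx, data)
}
```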
The Fan-Out, Fan-In Pattern
When you need to process data through multiple stages in parallel, the fan-out, fan-in pattern shines:
```go
func processData(data []Data) ([]Result, error) {
	// Channels connecting the pipeline stages
	processCh := make(chan Data)
	transformCh := make(chan IntermediateResult)
	resultCh := make(chan Result)

	// Step 1: Process (fan out to 5 workers)
	var processWg sync.WaitGroup
	processWg.Add(5)
	for i := 0; i < 5; i++ {
		go func() {
			defer processWg.Done()
			for d := range processCh {
				// processOne handles a single Data value (defined elsewhere)
				ir, err := processOne(d)
				if err != nil {
					// Error handling omitted for brevity
					continue
				}
				transformCh <- ir
			}
		}()
	}

	// Step 2: Transform (fan out to 3 workers)
	var transformWg sync.WaitGroup
	transformWg.Add(3)
	for i := 0; i < 3; i++ {
		go func() {
			defer transformWg.Done()
			for ir := range transformCh {
				r := transformData(ir)
				resultCh <- r
			}
		}()
	}

	// Close each channel once the stage feeding it has finished
	go func() {
		for _, d := range data {
			processCh <- d
		}
		close(processCh)
		processWg.Wait()
		close(transformCh)
		transformWg.Wait()
		close(resultCh)
	}()

	// Fan-in: collect results
	var results []Result
	for r := range resultCh {
		results = append(results, r)
	}

	return results, nil
}
```
This pattern creates a pipeline of concurrent operations, where each stage can operate at different rates. I’ve used this extensively for data processing workflows where different stages have different computational requirements.
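One building block I often pair with this: when each worker writes to its own output channel instead of a shared one, you need an explicit fan-in step. Here's a small generic merge helper in the style of the Go blog's pipelines post (the name merge is mine; generics require Go 1.18+):

```go
// merge fans in any number of input channels into a single output channel.
func merge[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)

	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(c <-chan T) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}

	// Close the output only after every input is drained.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
```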
Rate Limiting with a Token Bucket
When working with external APIs or services that enforce rate limits, a token bucket (often loosely called a leaky bucket) is incredibly useful: tokens accumulate at a fixed rate up to a burst limit, and each operation spends one:
```go
type RateLimiter struct {
	rate   time.Duration
	burst  int
	tokens chan struct{}
}

func NewRateLimiter(rps int, burstLimit int) *RateLimiter {
	r := &RateLimiter{
		rate:   time.Second / time.Duration(rps),
		burst:  burstLimit,
		tokens: make(chan struct{}, burstLimit),
	}

	// Fill the bucket initially
	for i := 0; i < burstLimit; i++ {
		r.tokens <- struct{}{}
	}

	// Refill the bucket at the defined rate.
	// Note: this goroutine runs forever; a production version would add a Stop method.
	go func() {
		ticker := time.NewTicker(r.rate)
		defer ticker.Stop()

		for range ticker.C {
			select {
			case r.tokens <- struct{}{}:
				// Token added
			default:
				// Bucket full, skip
			}
		}
	}()

	return r
}

func (r *RateLimiter) Wait(ctx context.Context) error {
	select {
	case <-r.tokens:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```
I use this pattern when calling external APIs that enforce rate limits. It smooths out traffic spikes and prevents HTTP 429 errors, which are the bane of any integration project.
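Usage is just a Wait call in front of each request. A sketch, where Request and callAPI are stand-ins:

```go
func callAllAPIs(ctx context.Context, requests []Request) error {
	limiter := NewRateLimiter(50, 10) // ~50 requests/sec, bursts of up to 10

	for _, req := range requests {
		// Block until a token is available, or bail out if ctx ends first.
		if err := limiter.Wait(ctx); err != nil {
			return err
		}
		if err := callAPI(ctx, req); err != nil {
			log.Printf("request failed: %v", err)
		}
	}
	return nil
}
```

In production I'd usually reach for golang.org/x/time/rate, which implements the same token-bucket idea with more care around timing.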
Work Stealing for Dynamic Workloads
For workloads where some items take significantly longer to process than others, a work-stealing pattern can improve throughput:
```go
func processWithWorkStealing(items []Item) []Result {
	numWorkers := runtime.GOMAXPROCS(0)

	// Create a queue for each worker
	queues := make([]chan Item, numWorkers)
	for i := range queues {
		queues[i] = make(chan Item, len(items)/numWorkers+1)
	}

	// Distribute work initially
	for i, item := range items {
		queueIdx := i % numWorkers
		queues[queueIdx] <- item
	}

	// Close all queues to signal no more initial work
	for _, q := range queues {
		close(q)
	}

	// Results channel
	resultCh := make(chan Result, len(items))

	var wg sync.WaitGroup
	wg.Add(numWorkers)

	// Start workers
	for workerID, queue := range queues {
		go func(id int, myQueue chan Item) {
			defer wg.Done()

			// Process own queue first
			for item := range myQueue {
				resultCh <- processItem(item)
			}

			// Then try to steal work from other queues. Because every
			// queue is closed, range simply exits once a queue is drained.
			for i := 1; i < numWorkers; i++ {
				otherQueue := queues[(id+i)%numWorkers]

				for item := range otherQueue {
					resultCh <- processItem(item)
				}
			}
		}(workerID, queue)
	}

	// Wait for all workers to finish
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	// Collect results
	var results []Result
	for r := range resultCh {
		results = append(results, r)
	}

	return results
}
```
This pattern shines when processing heterogeneous workloads. Workers that finish their assigned tasks early can “steal” work from other queues, improving overall throughput and resource utilization.
Semaphores for Resource Control
Sometimes you need to limit concurrent access to a specific resource that isn't about CPU utilization, like database connections or file handles. A counting semaphore is perfect for this (the weighted variant from golang.org/x/sync is shown at the end of this section):
```go
type Semaphore struct {
	permits chan struct{}
}

func NewSemaphore(maxConcurrent int) *Semaphore {
	return &Semaphore{
		permits: make(chan struct{}, maxConcurrent),
	}
}

func (s *Semaphore) Acquire(ctx context.Context) error {
	select {
	case s.permits <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (s *Semaphore) Release() {
	<-s.permits
}

// Usage example
func accessLimitedResource(tasks []Task) {
	// Allow max 10 concurrent DB connections
	dbSemaphore := NewSemaphore(10)

	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		go func(t Task) {
			defer wg.Done()

			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			// Acquire a permit or give up at the timeout
			if err := dbSemaphore.Acquire(ctx); err != nil {
				log.Printf("Failed to acquire DB connection: %v", err)
				return
			}
			defer dbSemaphore.Release()

			// Now access the database
			if _, err := queryDatabase(t); err != nil {
				log.Printf("query failed: %v", err)
			}
		}(task)
	}
	wg.Wait()
}
```
I’ve used this pattern extensively to manage connections to external systems, preventing connection exhaustion and improving stability.
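As for the weighted variant mentioned above: rather than extending this implementation, I'd use golang.org/x/sync/semaphore, which lets heavy operations reserve more than one unit of capacity. A minimal sketch (doBigJob is a stand-in):

```go
import (
	"context"

	"golang.org/x/sync/semaphore"
)

var sem = semaphore.NewWeighted(10) // 10 units of total capacity

func runBigJob(ctx context.Context) error {
	// A heavy job reserves 3 units; lighter jobs would acquire 1.
	if err := sem.Acquire(ctx, 3); err != nil {
		return err
	}
	defer sem.Release(3)

	return doBigJob(ctx)
}
```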
Error Groups for Parallel Operations
The errgroup package from golang.org/x/sync provides a clean way to manage errors across concurrent operations:
```go
func fetchAllUserData(ctx context.Context, userIDs []string) (map[string]UserData, error) {
	g, ctx := errgroup.WithContext(ctx)

	userDataMap := sync.Map{}

	// Limit concurrency to avoid overwhelming the system
	sem := make(chan struct{}, 10)

	for _, id := range userIDs {
		id := id // Capture for the goroutine (unnecessary as of Go 1.22)

		sem <- struct{}{} // Acquire
		g.Go(func() error {
			defer func() { <-sem }() // Release

			data, err := fetchUserData(ctx, id)
			if err != nil {
				return fmt.Errorf("error fetching data for user %s: %w", id, err)
			}

			userDataMap.Store(id, data)
			return nil
		})
	}

	// Wait for all fetches to complete or for the first error
	if err := g.Wait(); err != nil {
		return nil, err
	}

	// Convert sync.Map to a regular map
	result := make(map[string]UserData)
	userDataMap.Range(func(key, value interface{}) bool {
		result[key.(string)] = value.(UserData)
		return true
	})

	return result, nil
}
```
What I love about error groups is that they propagate cancellation automatically when any goroutine returns an error, preventing unnecessary work when something has already failed.
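Worth noting: newer releases of golang.org/x/sync ship a built-in limiter on the group itself, so the hand-rolled semaphore above can collapse into a single call. A sketch of the same loop:

```go
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // at most 10 goroutines in flight; g.Go blocks when full

for _, id := range userIDs {
	id := id
	g.Go(func() error {
		data, err := fetchUserData(ctx, id)
		if err != nil {
			return fmt.Errorf("error fetching data for user %s: %w", id, err)
		}
		userDataMap.Store(id, data)
		return nil
	})
}
```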
The Power of Pipelines
Go’s channels are perfect for creating processing pipelines. Here’s a pattern I’ve used for data processing:
```go
func processItems(ctx context.Context, items []Item) ([]Result, error) {
	// Create pipeline stages
	stage1 := validateItems(ctx, items)
	stage2 := enrichItems(ctx, stage1)
	stage3 := transformItems(ctx, stage2)
	results := collectResults(ctx, stage3)

	return results, nil
}

func validateItems(ctx context.Context, items []Item) <-chan Item {
	out := make(chan Item)

	go func() {
		defer close(out)
		for _, item := range items {
			if ctx.Err() != nil {
				return
			}

			if valid(item) {
				out <- item
			}
		}
	}()

	return out
}

func enrichItems(ctx context.Context, in <-chan Item) <-chan EnrichedItem {
	out := make(chan EnrichedItem)

	go func() {
		defer close(out)
		for item := range in {
			if ctx.Err() != nil {
				return
			}

			enriched, err := addMetadata(item)
			if err != nil {
				// Handle error or skip
				continue
			}

			out <- enriched
		}
	}()

	return out
}

// Additional stages follow the same pattern...

func collectResults(ctx context.Context, in <-chan ProcessedItem) []Result {
	var results []Result

	for item := range in {
		if ctx.Err() != nil {
			break
		}

		results = append(results, toResult(item))
	}

	return results
}
```
This pattern separates concerns cleanly, making the code more maintainable and testable. Each stage does one thing and can be tested in isolation.
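For instance, validateItems can be exercised on its own, with no pipeline around it. A sketch, where makeTestItems is a hypothetical helper for building fixtures:

```go
func TestValidateItems(t *testing.T) {
	items := makeTestItems(5) // hypothetical fixture helper

	out := validateItems(context.Background(), items)

	var got []Item
	for item := range out {
		got = append(got, item)
	}

	// Everything that comes out of the stage must pass the same predicate.
	for _, item := range got {
		if !valid(item) {
			t.Errorf("invalid item passed validation: %+v", item)
		}
	}
}
```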
Performance Considerations
When using these patterns, keep these performance considerations in mind:
- Channel size: Unbuffered channels cause synchronization overhead. For performance-critical code, benchmark different buffer sizes (see the sketch after this list).
- Goroutine overhead: While goroutines are lightweight, they're not free. Each goroutine starts with roughly 2KB of stack space. Create them judiciously.
- Work batching: Sometimes batching work is more efficient than processing individual items, especially for I/O or network operations.
- CPU-bound vs. I/O-bound: For CPU-bound tasks, limit concurrency to GOMAXPROCS. For I/O-bound tasks, you can typically use higher concurrency levels.
- Memory pressure: Watch for heavy allocation in concurrent code, as it can trigger more frequent garbage-collection pauses.
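On the channel-size point, a benchmark shape like this makes the comparison concrete (the consume loop is a stand-in for representative work):

```go
func BenchmarkChannelBuffer(b *testing.B) {
	for _, size := range []int{0, 1, 64, 1024} {
		b.Run(fmt.Sprintf("buf-%d", size), func(b *testing.B) {
			ch := make(chan int, size)
			go func() {
				for i := 0; i < b.N; i++ {
					ch <- i
				}
				close(ch)
			}()
			for range ch {
				// Consume; real code would do representative work here.
			}
		})
	}
}
```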
Debugging Concurrent Code
Debugging concurrent code can be challenging. Here are some tools I’ve found invaluable:
Race Detector: Always run tests with -race to catch data races early:

```sh
go test -race ./...
```

pprof: For profiling concurrent performance issues:

```go
import _ "net/http/pprof"

// In your main()
go func() {
	log.Println(http.ListenAndServe("localhost:6060", nil))
}()
```

Execution Tracing: Go's execution tracer provides deep insights:

```go
f, err := os.Create("trace.out")
if err != nil {
	log.Fatal(err)
}
defer f.Close()

err = trace.Start(f)
if err != nil {
	log.Fatal(err)
}
defer trace.Stop()

// Your code here
```
Conclusion
Go’s concurrency primitives might seem simple at first, but they can be combined in powerful ways to solve complex problems. The patterns I’ve shared come from real-world experiences building systems that handle substantial load reliably.
Remember, concurrency isn’t just about making things faster—it’s about designing systems that can handle multiple tasks efficiently and gracefully. The right pattern depends on your specific needs:
- Use worker pools to control concurrency levels
- Use context for cancellation and timeouts
- Use fan-out, fan-in for parallel processing pipelines
- Use rate limiters for controlled access to resources
- Use work stealing for heterogeneous workloads
- Use semaphores for resource control
- Use error groups for parallel operations with clean error handling
- Use pipelines for multi-stage processing
I hope these patterns help you write more efficient, reliable concurrent code in Go. What’s your favorite Go concurrency pattern? Let me know in the comments!
This post is based on my experience building data processing systems that handle millions of records daily. The code examples are simplified for clarity but represent real patterns I’ve used in production systems.
