← Back
后端开发 2026.03.06

Programming Basics 0006_Advanced Concurrency_sync Package and Context

后端开发

I. Detailed Explanation of the sync Package

1. sync.Mutex and sync.RWMutex

// Mutex: Mutual exclusion lock, only one goroutine can hold it at a time
var mu sync.Mutex
var count int

func increment() {
    mu.Lock()
    defer mu.Unlock()
    count++
}

// RWMutex: Read-write lock, allows multiple reads, but writes are exclusive
var rwmu sync.RWMutex
var data map[string]string

func read(key string) string {
    rwmu.RLock()         // Read lock, multiple goroutines can hold simultaneously
    defer rwmu.RUnlock()
    return data[key]
}

func write(key, val string) {
    rwmu.Lock()          // Write lock, exclusive
    defer rwmu.Unlock()
    data[key] = val
}

When to use RWMutex? In scenarios with many reads and few writes (e.g., caches, configurations). If reads and writes are roughly equal, Mutex is sufficient, as RWMutex has additional overhead.

2. sync.Once

Ensures that a certain operation is executed only once, commonly used for singleton initialization.

var (
    instance *Database
    once     sync.Once
)

func GetDB() *Database {
    once.Do(func() {
        // No matter how many goroutines call simultaneously, it executes only once
        instance = &Database{
            conn: connectDB(),
        }
        fmt.Println("数据库初始化完成")
    })
    return instance
}

func main() {
    // Concurrent calls, initializes only once
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            db := GetDB()
            _ = db
        }()
    }
    wg.Wait()
}

3. sync.Map

Concurrent-safe Map, no need for additional locking.

func main() {
    var m sync.Map

    // Store
    m.Store("name", "Alice")
    m.Store("age", 30)

    // Load
    val, ok := m.Load("name")
    if ok {
        fmt.Println(val) // Alice
    }

    // Load or Store (store if key does not exist)
    actual, loaded := m.LoadOrStore("name", "Bob")
    fmt.Println(actual, loaded) // Alice true (already exists, not stored)

    actual2, loaded2 := m.LoadOrStore("city", "Beijing")
    fmt.Println(actual2, loaded2) // Beijing false (newly stored)

    // Delete
    m.Delete("age")

    // Iterate
    m.Range(func(key, value any) bool {
        fmt.Printf("%s: %v\n", key, value)
        return true // Return false to stop iteration
    })

    // LoadAndDelete: Load and delete (Go 1.15+)
    val3, loaded3 := m.LoadAndDelete("city")
    fmt.Println(val3, loaded3) // Beijing true
}

sync.Map vs map+Mutex:

ScenarioRecommendation
Keys are relatively fixed, many reads, few writessync.Map
Frequent addition/deletion of keysmap + Mutex/RWMutex
Requires len() or iteration performancemap + Mutex/RWMutex
Different goroutines operate on different keyssync.Map

4. sync.Pool

Temporary object pool, reduces memory allocation and GC pressure. Objects may be reclaimed by GC at any time.

var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer) // Create a new object when the pool is empty
    },
}

func processRequest(data string) string {
    // Get from pool
    buf := bufPool.Get().(*bytes.Buffer)
    buf.Reset() // Reset state! Very important

    // Use
    buf.WriteString("处理: ")
    buf.WriteString(data)
    result := buf.String()

    // Return to pool
    bufPool.Put(buf)

    return result
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result := processRequest(fmt.Sprintf("请求%d", id))
            _ = result
        }(i)
    }
    wg.Wait()
}

Notes: - Must Reset object state after Get - Do not assume that an object Put back will necessarily be Get-able next time (GC may clear the Pool) - Suitable for frequently created temporary objects (e.g., buffers, temporary slices) - The standard library fmt package heavily uses sync.Pool

5. sync.Cond

Condition variable, used for multiple goroutines to wait for a certain condition to be met.

type Queue struct {
    items []int
    cond  *sync.Cond
}

func NewQueue() *Queue {
    return &Queue{
        cond: sync.NewCond(&sync.Mutex{}),
    }
}

// Producer
func (q *Queue) Put(item int) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.items = append(q.items, item)
    q.cond.Signal() // Wake up one waiter (Broadcast wakes all)
}

// Consumer
func (q *Queue) Get() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.items) == 0 {
        q.cond.Wait() // Release lock and wait, re-acquire lock when awakened
    }
    item := q.items[0]
    q.items = q.items[1:]
    return item
}

func main() {
    q := NewQueue()

    // Consumer
    go func() {
        for {
            item := q.Get()
            fmt.Println("消费:", item)
        }
    }()

    // Producer
    for i := 0; i < 10; i++ {
        q.Put(i)
        time.Sleep(200 * time.Millisecond)
    }
    time.Sleep(time.Second)
}

In real-world projects, channels are more commonly used than sync.Cond, but understanding Cond helps in understanding concurrency primitives.

6. sync.WaitGroup Advanced

func main() {
    var wg sync.WaitGroup

    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.baidu.com",
    }

    results := make([]int, len(urls))

    for i, url := range urls {
        wg.Add(1)
        go func(idx int, u string) {
            defer wg.Done()
            resp, err := http.Get(u)
            if err != nil {
                results[idx] = -1
                return
            }
            defer resp.Body.Close()
            results[idx] = resp.StatusCode
        }(i, url)
    }

    wg.Wait()
    for i, url := range urls {
        fmt.Printf("%s -> %d\n", url, results[i])
    }
}

Common errors:

// Error 1: Add inside the goroutine
go func() {
    wg.Add(1) // Might execute after Wait!
    defer wg.Done()
}()
wg.Wait()

// Correct: Add before starting the goroutine
wg.Add(1)
go func() {
    defer wg.Done()
}()
wg.Wait()

// Error 2: Forgetting Done leads to eternal blocking
// Use defer wg.Done() to ensure execution

II. Context

1. What is Context?

Context is used to pass cancellation signals, timeout control, and request-scoped data between goroutines.

type Context interface {
    Deadline() (deadline time.Time, ok bool) // Deadline
    Done() <-chan struct{}                    // Cancellation signal channel
    Err() error                               // Reason for Done closing
    Value(key any) any                        // Request-scoped data
}

2. context.Background() and context.TODO()

// Background: Root context, never canceled, no values, no deadline
// Typically used in main functions, initialization, testing
ctx := context.Background()

// TODO: Placeholder when it's unclear which context to use
// If you see TODO during code review, it indicates a need for improvement
ctx := context.TODO()

3. context.WithCancel

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go func(ctx context.Context) {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("worker received cancellation signal:", ctx.Err())
                return
            default:
                fmt.Println("Working...")
                time.Sleep(500 * time.Millisecond)
            }
        }
    }(ctx)

    time.Sleep(2 * time.Second)
    cancel() // Send cancellation signal
    time.Sleep(100 * time.Millisecond)
    // Output: worker received cancellation signal: context canceled
}

4. context.WithTimeout and WithDeadline

// WithTimeout: Specify timeout duration
func fetchWithTimeout() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel() // Even if not timed out, call cancel to release resources

    req, _ := http.NewRequestWithContext(ctx, "GET", "https://httpbin.org/delay/5", nil)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        fmt.Println("Request failed:", err) // context deadline exceeded
        return
    }
    defer resp.Body.Close()
    fmt.Println("Status code:", resp.StatusCode)
}

// WithDeadline: Specify a deadline time
func fetchWithDeadline() {
    deadline := time.Now().Add(2 * time.Second)
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()
    // Usage is the same as WithTimeout
    _ = ctx
}

5. context.WithValue

type contextKey string

const (
    keyUserID    contextKey = "user_id"
    keyRequestID contextKey = "request_id"
)

func middleware(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Extract information from the request, put into context
        ctx := r.Context()
        ctx = context.WithValue(ctx, keyRequestID, generateID())
        ctx = context.WithValue(ctx, keyUserID, r.Header.Get("X-User-ID"))
        next(w, r.WithContext(ctx))
    }
}

func handler(w http.ResponseWriter, r *http.Request) {
    // Get values from context
    reqID := r.Context().Value(keyRequestID).(string)
    userID := r.Context().Value(keyUserID).(string)
    fmt.Fprintf(w, "Request: %s, User: %s", reqID, userID)
}

Important: Keys should use custom unexported types (e.g., contextKey) to avoid key collisions between different packages.

6. Context Applications in Real-World Projects

// Database query with timeout
func queryUser(ctx context.Context, db *sql.DB, id int) (*User, error) {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    row := db.QueryRowContext(ctx, "SELECT id, name FROM users WHERE id = ?", id)
    var user User
    if err := row.Scan(&user.ID, &user.Name); err != nil {
        return nil, err
    }
    return &user, nil
}

// gRPC service automatically passes context
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
    // ctx automatically carries timeout and cancellation signals
    user, err := s.repo.FindByID(ctx, req.Id)
    if err != nil {
        return nil, err
    }
    return toProto(user), nil
}

// Cascading cancellation: When parent context is canceled, all child contexts are automatically canceled
func processOrder(ctx context.Context, orderID string) error {
    // Child tasks inherit parent context
    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error { return checkInventory(ctx, orderID) })
    g.Go(func() error { return chargePayment(ctx, orderID) })
    g.Go(func() error { return sendNotification(ctx, orderID) })

    return g.Wait() // Any failure automatically cancels others
}

7. Context Best Practices

RuleDescription
As the first parameterfunc DoSomething(ctx context.Context, ...)
Do not store in a structContext should be passed between functions, not as a field
Do not pass nilUse context.Background() or context.TODO()
WithValue only for request-scoped datae.g., request ID, user info, do not pass business parameters
Always call cancelEven if timed out, use defer cancel() to release resources
Do not pass the same cancel to multiple goroutinesWhoever creates it, cancels it

Context Propagation Chain Diagram

HTTP Request enters


context.Background() + WithValue(requestID)

    ├──► WithTimeout(5s) ──► Query database

    ├──► WithTimeout(3s) ──► Call gRPC service
    │                            │
    │                            ├──► Sub-query 1
    │                            └──► Sub-query 2

    └──► WithCancel() ──► Send notification (can be manually canceled)

// If any layer times out or is canceled, all downstream operations are automatically canceled