← 返回目录

第四章:并发编程

goroutine、channel 与并发模式

1. goroutine

goroutine 是 Go 的核心并发原语——一种由 Go 运行时管理的轻量级线程。启动一个 goroutine 只需在函数调用前加 go 关键字。

package main

import (
    "fmt"
    "time"
)

func sayHello(name string) {
    for i := 0; i < 3; i++ {
        fmt.Printf("[%s] 第 %d 次: Hello!\n", name, i+1)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    // 启动 goroutine(异步执行)
    go sayHello("goroutine-1")
    go sayHello("goroutine-2")

    // 匿名函数 goroutine
    go func(msg string) {
        fmt.Println(msg)
    }("异步消息")

    // main 函数结束时,所有 goroutine 立即终止
    // 实际项目中应使用 WaitGroup 或 channel 等待
    time.Sleep(500 * time.Millisecond)
    fmt.Println("main 结束")
}

goroutine vs 线程:

  • • goroutine 初始栈仅 2KB(线程通常 1-8MB),可轻松创建数十万个
  • • 由 Go 运行时调度,而非 OS 内核调度,上下文切换成本极低
  • • M:N 调度模型:M 个 goroutine 映射到 N 个 OS 线程上

🔄 并发模型对比

语言 并发单元 内存开销
Go goroutine ~2KB
Java Thread ~1MB
Python threading (受 GIL 限制) ~8MB
Node.js 事件循环 + Worker 单线程

2. channel

channel 是 goroutine 之间通信的管道。Go 的并发哲学是:"不要通过共享内存来通信,而要通过通信来共享内存。"

package main

import "fmt"

func main() {
    // 无缓冲 channel(同步通信)
    ch := make(chan string)

    go func() {
        ch <- "Hello from goroutine" // 发送(阻塞直到有接收者)
    }()

    msg := <-ch // 接收(阻塞直到有发送者)
    fmt.Println(msg)

    // 有缓冲 channel(异步通信)
    buffered := make(chan int, 3) // 缓冲区大小为 3
    buffered <- 1
    buffered <- 2
    buffered <- 3
    // buffered <- 4 // 缓冲区满,会阻塞

    fmt.Println(<-buffered) // 1
    fmt.Println(<-buffered) // 2

    // 关闭 channel
    jobs := make(chan int, 5)
    done := make(chan bool)

    // 消费者
    go func() {
        for {
            job, ok := <-jobs
            if !ok {
                fmt.Println("所有任务完成")
                done <- true
                return
            }
            fmt.Printf("处理任务: %d\n", job)
        }
    }()

    // 生产者
    for i := 1; i <= 3; i++ {
        jobs <- i
    }
    close(jobs) // 关闭 channel,通知消费者

    <-done // 等待消费者完成
}

// range 遍历 channel
func rangeDemo() {
    ch := make(chan int, 5)
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i * i
        }
        close(ch) // 必须 close 才能结束 range
    }()

    // range 自动在 channel 关闭时退出
    for val := range ch {
        fmt.Println(val) // 0, 1, 4, 9, 16
    }
}

channel 规则:

  • • 向 nil channel 发送/接收会永远阻塞
  • • 向已关闭的 channel 发送会 panic
  • • 从已关闭的 channel 接收会立即返回零值
  • • 只有发送方应该关闭 channel,接收方不关闭

3. select

select 用于同时等待多个 channel 操作,类似于 switch 但用于 channel。

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(100 * time.Millisecond)
        ch1 <- "来自 ch1"
    }()

    go func() {
        time.Sleep(200 * time.Millisecond)
        ch2 <- "来自 ch2"
    }()

    // select 等待多个 channel
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("收到:", msg1)
        case msg2 := <-ch2:
            fmt.Println("收到:", msg2)
        }
    }

    // 超时处理
    slowCh := make(chan string)
    go func() {
        time.Sleep(2 * time.Second)
        slowCh <- "慢响应"
    }()

    select {
    case result := <-slowCh:
        fmt.Println(result)
    case <-time.After(500 * time.Millisecond):
        fmt.Println("超时!")
    }

    // default 非阻塞模式
    ch := make(chan int, 1)
    select {
    case val := <-ch:
        fmt.Println("收到:", val)
    default:
        fmt.Println("没有数据可读(非阻塞)")
    }
}

// 使用 done channel 优雅退出
func worker(done <-chan struct{}, tasks <-chan int) {
    for {
        select {
        case <-done:
            fmt.Println("收到退出信号")
            return
        case task := <-tasks:
            fmt.Printf("处理任务: %d\n", task)
        }
    }
}

4. sync 包

除了 channel,Go 还提供了传统的同步原语。适合不需要通信、只需要同步的场景。

WaitGroup

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    urls := []string{
        "https://example.com",
        "https://golang.org",
        "https://github.com",
    }

    for _, url := range urls {
        wg.Add(1) // 计数器 +1
        go func(u string) {
            defer wg.Done() // 计数器 -1
            fmt.Printf("正在抓取: %s\n", u)
            // 模拟网络请求...
        }(url)
    }

    wg.Wait() // 等待所有 goroutine 完成
    fmt.Println("所有请求完成")
}

Mutex 互斥锁

package main

import (
    "fmt"
    "sync"
)

// 线程安全的计数器
type SafeCounter struct {
    mu    sync.Mutex
    count map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count[key]++
}

func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count[key]
}

func main() {
    counter := SafeCounter{count: make(map[string]int)}

    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc("key")
        }()
    }
    wg.Wait()
    fmt.Println("最终计数:", counter.Value("key")) // 1000
}

Once 与 RWMutex

// sync.Once 确保函数只执行一次(常用于初始化)
var (
    instance *Database
    once     sync.Once
)

func GetDB() *Database {
    once.Do(func() {
        instance = connectDB()
        fmt.Println("数据库连接已创建")
    })
    return instance
}

// sync.RWMutex 读写锁(读多写少场景优化)
type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()         // 读锁(多个读可并发)
    defer c.mu.RUnlock()
    val, ok := c.data[key]
    return val, ok
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()          // 写锁(独占)
    defer c.mu.Unlock()
    c.data[key] = value
}

channel vs Mutex 选择:需要传递数据所有权时用 channel,需要保护共享状态时用 Mutex。Go 谚语:"用 channel 编排,用 Mutex 序列化。"

5. 并发模式

Fan-out / Fan-in

package main

import (
    "fmt"
    "sync"
)

// Fan-out: 一个 channel 分发给多个 goroutine
// Fan-in: 多个 channel 汇聚到一个 channel
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                merged <- val
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

Worker Pool

package main

import (
    "fmt"
    "sync"
)

func workerPool() {
    const numWorkers = 3
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // 启动 worker
    var wg sync.WaitGroup
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                result := job * job // 模拟处理
                fmt.Printf("Worker %d: %d -> %d\n", id, job, result)
                results <- result
            }
        }(w)
    }

    // 发送任务
    for i := 1; i <= 10; i++ {
        jobs <- i
    }
    close(jobs)

    // 等待所有 worker 完成后关闭 results
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    for result := range results {
        fmt.Println("结果:", result)
    }
}

Pipeline 管道

package main

import "fmt"

// 生成器
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// 过滤偶数
func filterEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
        close(out)
    }()
    return out
}

func main() {
    // Pipeline: generate -> square -> filterEven
    ch := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squared := square(ch)
    even := filterEven(squared)

    for val := range even {
        fmt.Println(val) // 4, 16, 36, 64, 100
    }
}

context 取消

package main

import (
    "context"
    "fmt"
    "time"
)

func longRunningTask(ctx context.Context, name string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("[%s] 收到取消信号: %v\n", name, ctx.Err())
            return
        default:
            fmt.Printf("[%s] 工作中...\n", name)
            time.Sleep(200 * time.Millisecond)
        }
    }
}

func main() {
    // 带超时的 context
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    go longRunningTask(ctx, "worker-1")
    go longRunningTask(ctx, "worker-2")

    // 等待超时
    <-ctx.Done()
    time.Sleep(100 * time.Millisecond) // 等 goroutine 打印取消日志
    fmt.Println("所有任务已取消")

    // 手动取消
    ctx2, cancel2 := context.WithCancel(context.Background())
    go longRunningTask(ctx2, "worker-3")
    time.Sleep(300 * time.Millisecond)
    cancel2() // 手动取消
    time.Sleep(100 * time.Millisecond)
}

🔄 并发模式对比

模式 用途
Fan-out/Fan-in 一对多分发,多对一汇聚
Worker Pool 固定数量 worker 处理任务队列
Pipeline 多阶段流水线处理
Context 超时控制、取消传播、请求级数据

6. 本章要点

📌 goroutine

go func() 启动轻量级协程,初始栈仅 2KB。由 Go 运行时调度,可轻松创建数十万个。

📌 channel

goroutine 间的类型安全管道。无缓冲同步通信,有缓冲异步通信。发送方负责关闭。

📌 select

多路复用等待多个 channel。配合 time.After 实现超时,default 实现非阻塞。

📌 sync 包

WaitGroup 等待多个 goroutine,Mutex 保护共享状态,Once 确保只初始化一次。

📌 并发模式

Worker Pool 控制并发度,Pipeline 构建流水线,Fan-in/Fan-out 分发汇聚数据。

📌 context

请求级别的超时控制和取消传播。WithTimeoutWithCancel 是 HTTP 服务中的标配。