1. goroutine
goroutine 是 Go 的核心并发原语——一种由 Go 运行时管理的轻量级线程。启动一个 goroutine 只需在函数调用前加 go 关键字。
package main
import (
"fmt"
"time"
)
func sayHello(name string) {
for i := 0; i < 3; i++ {
fmt.Printf("[%s] 第 %d 次: Hello!\n", name, i+1)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
// 启动 goroutine(异步执行)
go sayHello("goroutine-1")
go sayHello("goroutine-2")
// 匿名函数 goroutine
go func(msg string) {
fmt.Println(msg)
}("异步消息")
// main 函数结束时,所有 goroutine 立即终止
// 实际项目中应使用 WaitGroup 或 channel 等待
time.Sleep(500 * time.Millisecond)
fmt.Println("main 结束")
}
goroutine vs 线程:
- • goroutine 初始栈仅 2KB(线程通常 1-8MB),可轻松创建数十万个
- • 由 Go 运行时调度,而非 OS 内核调度,上下文切换成本极低
- • M:N 调度模型:M 个 goroutine 映射到 N 个 OS 线程上
🔄 并发模型对比
| 语言 | 并发单元 | 内存开销 |
| Go | goroutine | ~2KB |
| Java | Thread | ~1MB |
| Python | threading (受 GIL 限制) | ~8MB |
| Node.js | 事件循环 + Worker | 单线程 |
2. channel
channel 是 goroutine 之间通信的管道。Go 的并发哲学是:"不要通过共享内存来通信,而要通过通信来共享内存。"
package main
import "fmt"
func main() {
// 无缓冲 channel(同步通信)
ch := make(chan string)
go func() {
ch <- "Hello from goroutine" // 发送(阻塞直到有接收者)
}()
msg := <-ch // 接收(阻塞直到有发送者)
fmt.Println(msg)
// 有缓冲 channel(异步通信)
buffered := make(chan int, 3) // 缓冲区大小为 3
buffered <- 1
buffered <- 2
buffered <- 3
// buffered <- 4 // 缓冲区满,会阻塞
fmt.Println(<-buffered) // 1
fmt.Println(<-buffered) // 2
// 关闭 channel
jobs := make(chan int, 5)
done := make(chan bool)
// 消费者
go func() {
for {
job, ok := <-jobs
if !ok {
fmt.Println("所有任务完成")
done <- true
return
}
fmt.Printf("处理任务: %d\n", job)
}
}()
// 生产者
for i := 1; i <= 3; i++ {
jobs <- i
}
close(jobs) // 关闭 channel,通知消费者
<-done // 等待消费者完成
}
// range 遍历 channel
func rangeDemo() {
ch := make(chan int, 5)
go func() {
for i := 0; i < 5; i++ {
ch <- i * i
}
close(ch) // 必须 close 才能结束 range
}()
// range 自动在 channel 关闭时退出
for val := range ch {
fmt.Println(val) // 0, 1, 4, 9, 16
}
}
channel 规则:
- • 向 nil channel 发送/接收会永远阻塞
- • 向已关闭的 channel 发送会 panic
- • 从已关闭的 channel 接收会立即返回零值
- • 只有发送方应该关闭 channel,接收方不关闭
3. select
select 用于同时等待多个 channel 操作,类似于 switch 但用于 channel。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "来自 ch1"
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "来自 ch2"
}()
// select 等待多个 channel
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("收到:", msg1)
case msg2 := <-ch2:
fmt.Println("收到:", msg2)
}
}
// 超时处理
slowCh := make(chan string)
go func() {
time.Sleep(2 * time.Second)
slowCh <- "慢响应"
}()
select {
case result := <-slowCh:
fmt.Println(result)
case <-time.After(500 * time.Millisecond):
fmt.Println("超时!")
}
// default 非阻塞模式
ch := make(chan int, 1)
select {
case val := <-ch:
fmt.Println("收到:", val)
default:
fmt.Println("没有数据可读(非阻塞)")
}
}
// 使用 done channel 优雅退出
func worker(done <-chan struct{}, tasks <-chan int) {
for {
select {
case <-done:
fmt.Println("收到退出信号")
return
case task := <-tasks:
fmt.Printf("处理任务: %d\n", task)
}
}
}
4. sync 包
除了 channel,Go 还提供了传统的同步原语。适合不需要通信、只需要同步的场景。
WaitGroup
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
urls := []string{
"https://example.com",
"https://golang.org",
"https://github.com",
}
for _, url := range urls {
wg.Add(1) // 计数器 +1
go func(u string) {
defer wg.Done() // 计数器 -1
fmt.Printf("正在抓取: %s\n", u)
// 模拟网络请求...
}(url)
}
wg.Wait() // 等待所有 goroutine 完成
fmt.Println("所有请求完成")
}
Mutex 互斥锁
package main
import (
"fmt"
"sync"
)
// 线程安全的计数器
type SafeCounter struct {
mu sync.Mutex
count map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.count[key]++
}
func (c *SafeCounter) Value(key string) int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count[key]
}
func main() {
counter := SafeCounter{count: make(map[string]int)}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Inc("key")
}()
}
wg.Wait()
fmt.Println("最终计数:", counter.Value("key")) // 1000
}
Once 与 RWMutex
// sync.Once 确保函数只执行一次(常用于初始化)
var (
instance *Database
once sync.Once
)
func GetDB() *Database {
once.Do(func() {
instance = connectDB()
fmt.Println("数据库连接已创建")
})
return instance
}
// sync.RWMutex 读写锁(读多写少场景优化)
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock() // 读锁(多个读可并发)
defer c.mu.RUnlock()
val, ok := c.data[key]
return val, ok
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁(独占)
defer c.mu.Unlock()
c.data[key] = value
}
channel vs Mutex 选择:需要传递数据所有权时用 channel,需要保护共享状态时用 Mutex。Go 谚语:"用 channel 编排,用 Mutex 序列化。"
5. 并发模式
Fan-out / Fan-in
package main
import (
"fmt"
"sync"
)
// Fan-out: 一个 channel 分发给多个 goroutine
// Fan-in: 多个 channel 汇聚到一个 channel
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for val := range c {
merged <- val
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
Worker Pool
package main
import (
"fmt"
"sync"
)
func workerPool() {
const numWorkers = 3
jobs := make(chan int, 100)
results := make(chan int, 100)
// 启动 worker
var wg sync.WaitGroup
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
result := job * job // 模拟处理
fmt.Printf("Worker %d: %d -> %d\n", id, job, result)
results <- result
}
}(w)
}
// 发送任务
for i := 1; i <= 10; i++ {
jobs <- i
}
close(jobs)
// 等待所有 worker 完成后关闭 results
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Println("结果:", result)
}
}
Pipeline 管道
package main
import "fmt"
// 生成器
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 过滤偶数
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if n%2 == 0 {
out <- n
}
}
close(out)
}()
return out
}
func main() {
// Pipeline: generate -> square -> filterEven
ch := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(ch)
even := filterEven(squared)
for val := range even {
fmt.Println(val) // 4, 16, 36, 64, 100
}
}
context 取消
package main
import (
"context"
"fmt"
"time"
)
func longRunningTask(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Printf("[%s] 收到取消信号: %v\n", name, ctx.Err())
return
default:
fmt.Printf("[%s] 工作中...\n", name)
time.Sleep(200 * time.Millisecond)
}
}
}
func main() {
// 带超时的 context
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
go longRunningTask(ctx, "worker-1")
go longRunningTask(ctx, "worker-2")
// 等待超时
<-ctx.Done()
time.Sleep(100 * time.Millisecond) // 等 goroutine 打印取消日志
fmt.Println("所有任务已取消")
// 手动取消
ctx2, cancel2 := context.WithCancel(context.Background())
go longRunningTask(ctx2, "worker-3")
time.Sleep(300 * time.Millisecond)
cancel2() // 手动取消
time.Sleep(100 * time.Millisecond)
}
🔄 并发模式对比
| 模式 | 用途 |
| Fan-out/Fan-in | 一对多分发,多对一汇聚 |
| Worker Pool | 固定数量 worker 处理任务队列 |
| Pipeline | 多阶段流水线处理 |
| Context | 超时控制、取消传播、请求级数据 |
6. 本章要点
📌 goroutine
go func() 启动轻量级协程,初始栈仅 2KB。由 Go 运行时调度,可轻松创建数十万个。
📌 channel
goroutine 间的类型安全管道。无缓冲同步通信,有缓冲异步通信。发送方负责关闭。
📌 select
多路复用等待多个 channel。配合 time.After 实现超时,default 实现非阻塞。
📌 sync 包
WaitGroup 等待多个 goroutine,Mutex 保护共享状态,Once 确保只初始化一次。
📌 并发模式
Worker Pool 控制并发度,Pipeline 构建流水线,Fan-in/Fan-out 分发汇聚数据。
📌 context
请求级别的超时控制和取消传播。WithTimeout、WithCancel 是 HTTP 服务中的标配。