并发编程

什么是并发编程

并发编程的核心目标,是让多个任务在同一时间段内共同推进。这里的”共同推进”并不一定表示真正同时运行,而是指多个任务可以通过调度机制交替执行

通过这种方式,程序可以在等待 I/O、处理用户事件或执行后台任务时保持响应,从而提高吞吐能力和资源利用率

并发编程真正关注的对象不是进程、线程或协程本身,而是 任务(Task)。任务表示程序中可以被独立调度、暂停或继续执行的一段逻辑,例如一次网络请求、一次文件读写、一次计算过程或一次用户事件处理

进程、线程和协程则是承载任务的不同执行抽象。它们分别对应不同层级的资源隔离、调度方式和运行开销

并发与并行

并发和并行容易混淆,但二者关注点不同

并发(Concurrency) 强调任务组织方式:系统允许多个任务在同一时间段内交替推进。即使只有一个 CPU 核心,也可以通过任务切换实现并发

并行(Parallelism) 强调任务执行状态:多个任务在同一时刻真正同时运行,通常依赖多核 CPU 或多处理器硬件支持

因此,并发是程序设计层面的概念,而并行是硬件执行层面的能力。并发程序可以在多核 CPU 上并行运行,但并发本身并不等于并行

任务与执行单元

并发编程中的任务需要依附某种执行单元运行。常见的执行单元包括 进程、线程和协程

它们之间的关系可以理解为:

  • 任务是要完成的逻辑工作
  • 进程、线程、协程是推进任务执行的不同载体

不同执行单元的差异主要体现在三个方面:

执行单元主要特点调度层级适用场景
进程地址空间独立,隔离性强操作系统多程序隔离、服务进程
线程共享进程资源,切换较轻操作系统CPU 密集型并行任务
协程用户态调度,轻量级运行时 / 事件循环高并发 I/O 任务

进程

process

一个进程通常对应一个正在运行的程序实例。它既包含用户态的虚拟地址空间,也包含内核态维护的进程控制信息,例如文件描述符表、调度状态、内存映射信息等

其中,进程控制块(PCB,Process Control Block)可以理解为操作系统管理进程的核心数据结构。操作系统通过 PCB 保存进程状态,并在调度切换时恢复对应的运行上下文

不同进程之间拥有相互独立的虚拟地址空间。因此,一个进程中的内存错误通常不会直接破坏另一个进程的内存空间

这种隔离机制提高了系统的安全性和稳定性,但也带来了更高的创建、切换和通信成本

进程之间如果需要交换数据,通常要借助进程间通信机制,例如管道、消息队列、共享内存、Socket 等。相比线程内部共享内存,进程间通信更复杂,但隔离性也更强

// 进程控制块(PCB)简化结构:用于保存进程的运行上下文以及相关管理信息
pub struct ProcessControlBlock {
    pid: u16,
    state: ProcessState,

    // CPU 上下文(Context):用于在进程切换(context switch)时
    // 保存和恢复当前进程的执行现场,确保恢复后能够从中断位置继续执行
    program_counter: u16,
    general_purpose_registers: [u8; 4],
    instruction_register: u8,
    flags: [u1, 3],
    stack_pointer: u16,
    index_registers: [u16; 2],
    ...

    // 内存管理信息:描述进程的虚拟地址空间边界,
    // 用于实现进程间的内存隔离,保证各进程地址空间相互独立
    memory_limits: [u16; 2],
    
    ...
    io_devices: Vec<IO_Device>, 
    open_files: Vec<File>, 
    
    parent: Arc<ProcessControlBlock>, 
    children: Vec<Arc<ProcessControlBlock>>
}

pub enum ProcessState {
	New, 
	Running, 
	Ready,
	Waiting, 
	Terminated
}

线程

thread

线程是进程内部的执行流,也是操作系统进行 CPU 调度的重要单位。一个进程可以包含多个线程,这些线程共享所属进程的虚拟地址空间、打开的文件以及其他进程级资源

线程自身只保留少量私有运行状态,例如:

  • 程序计数器
  • 寄存器上下文
  • 线程栈
  • 调度状态

因此,可以在原有的进程控制结构上加入线程信息,为一个进程维护多个可调度的执行流。线程切换通常比进程切换更轻量,因为它不需要完整切换进程地址空间

多个线程可以在同一进程内执行相同的代码逻辑,只是它们的执行位置、调用栈和运行状态彼此独立

这也是很多语言在线程库中要求提供 线程入口函数、闭包或任务对象 的原因:创建线程时,本质上需要告诉系统这个新线程从哪里开始执行、执行什么逻辑

不过,线程共享进程资源也带来了新的问题。当多个线程同时访问同一份可变数据时,就可能产生竞态条件、数据不一致、死锁等并发错误

因此,线程编程通常需要借助锁、原子操作、条件变量等同步机制来保护共享资源

pub struct ProcessControlBlock {
	pid: u16, 
	memory_limits: [u16; 2], 
	io_devices: Vec<IO_Device>, 
	open_files: Vec<File>, 
	parent: Arc<ProcessControlBlock>, 
    children: Vec<Arc<ProcessControlBlock>>, 
    threads: Vec<Thread_t>, 
}

pub struct Thread_t {
	tid: u16, 
	state: ProcessState, 
	program_counter: u16, 
	general_purpose_registers: [u8; 4],
    instruction_register: u8,
    flags: [u1, 3],
    stack_pointer: u16,
    index_registers: [u16; 2],
    // process_ref: Arc<ProcessControlBlock>, 
}

协程

coroutine

协程是一种比线程更轻量的并发抽象,但它并不是脱离线程独立运行的执行实体。更准确地说,协程通常运行在线程之上

操作系统仍然负责调度线程,而语言运行时、异步运行时或事件循环负责在用户态把多个协程安排到一个或多个线程上执行

在线程模型中,操作系统可以通过时间片在任意时刻抢占当前线程。而在协程模型中,运行时通常不会强制打断正在执行的协程,协程需要在合适的位置主动让出执行权

例如遇到 await、异步 I/O 或显式 yield 操作时,当前协程会暂停执行。运行时调度器随后推进同一线程上的其他协程

这种关系可以理解为:线程提供真正占用 CPU 的执行上下文,协程则是在这个执行上下文中被切换的轻量任务

一个线程可以承载大量协程,因为大多数 I/O 任务并不会一直占用 CPU。当某个协程等待网络、文件或定时器事件时,线程可以继续执行其他已经就绪的协程,而不是长期闲置

因此,协程特别适合 I/O 密集型场景,例如:

  • 同时处理大量网络连接
  • 高并发 Web 服务
  • 异步文件读写
  • 消息队列消费
  • 多路事件监听

协程的优势是创建成本低、切换成本小,能够用较少线程承载大量任务。但它也有一个前提:任务必须在合适的位置主动让出执行权

如果某个协程长时间执行 CPU 密集型计算,或者在协程中直接调用会阻塞线程的同步 I/O,那么被占住的不只是这个协程,而是承载它的线程。同一线程上的其他协程也会因此得不到运行机会

对于需要真正并行消耗 CPU 的任务,仍然需要依赖多线程或多进程,把计算分散到多个 CPU 核心上

抢占式并发

eagermode

抢占式并发是操作系统常用的调度方式。在线程或进程运行过程中,操作系统调度器可以在任意时刻中断当前执行单元,并切换到另一个执行单元继续运行

这种方式的特点是:任务不需要主动让出 CPU,操作系统会根据时间片、优先级、阻塞状态等因素统一调度

抢占式并发的典型载体是线程。开发者可以通过线程库显式创建多个线程,将程序中的不同任务拆分出来,交由操作系统调度

在线程数量和硬件核心数合适的情况下,这些任务不仅可以并发推进,还可以在多核 CPU 上并行执行

最简单的线程示例

两个线程各自打印数字,互不干扰:

use std::thread;

fn main() {
    let handler = thread::spawn(|| {
        for i in 1..10 {
            println!("worker: {i}");
        }
    });

    for i in 100..110 {
        println!("main: {i}");
    }
    
    handler.join().unwrap();
}

实际应用场景

Web 服务器:为每个连接创建一个线程,同时处理多个客户端请求

例如:Apache HTTP Server 的 worker 模式,每个请求由独立线程处理

数据库连接池:预先创建多个数据库连接线程,避免频繁创建销毁

例如:HikariCP 连接池维护固定数量的连接线程,提高查询效率

图像处理:将大图分块,每个线程处理一块,最后合并结果

例如:视频编码器使用多线程并行处理不同帧,加速渲染

共享数据的挑战

当多个线程访问共享资源时,就必须考虑同步和互斥。否则,线程可能在任意位置被切换,导致共享数据处于不一致状态

例如,两个线程同时修改同一个计数器,如果不加锁保护,最终结果可能是错误的。这种情况需要使用互斥锁(Mutex)、原子操作(Atomic)或其他同步机制来保护共享资源

完整的共享数据 + 锁的实现示例见附录 A

协作式并发

cormode

协作式并发与抢占式并发相对应。它不依赖操作系统强制中断当前任务,而是由任务在特定位置主动让出执行权

在协作式并发中,任务之间的切换通常发生在:

  • await
  • 异步 I/O
  • yield
  • 定时器等待
  • 事件等待

这种方式的好处是调度成本较低,任务切换点也更加明确。因此,在大量 I/O 等待场景下,协作式并发通常非常高效

现代异步编程模型,例如 JS 的事件循环、Rust 的 async/await、Python 的 asyncio,都属于协作式并发的重要实践

但是,协作式并发并不适合直接执行长时间的 CPU 密集型任务。如果某个任务一直占用线程而不让出执行权,其他任务就无法获得运行机会,整个异步运行时的响应能力也会下降

核心概念示例

协作式并发的关键是任务主动让出执行权。以下是一个简化的概念示例:

use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;

async fn first_task() {
    for i in 1..5 {
        println!("first task: {i}");
        // 主动让出执行权,让其他任务有机会运行
        tokio::task::yield_now().await;
    }
}

async fn second_task() {
    for i in 100..105 {
        println!("second task: {i}");
        tokio::task::yield_now().await;
    }
}

#[tokio::main]
async fn main() {
    // 两个任务在同一个线程上交替执行
    tokio::join!(first_task(), second_task());
}

输出展示了两个任务交替执行,而不是一个任务完全执行完再执行另一个:

实际应用场景

高并发 Web 服务:Node.js、Go、Rust 的异步运行时可以用少量线程处理数万并发连接

例如:Node.js 的事件循环可以在单线程上处理 10000+ WebSocket 连接

异步文件处理:同时读取多个文件,不阻塞主线程

例如:Tokio 异步运行时可以并发读取数百个日志文件进行分析

消息队列消费:同时监听多个消息队列,高效处理异步事件

例如:Kafka 消费者使用异步 I/O 同时处理多个分区的消息

完整的协作式并发实现(包含自定义调度器)见附录 B

共享资源与消息传递

在并发程序中,多个任务之间往往需要共享数据,或者协作完成同一项工作。常见方式主要有两种:共享内存消息传递

共享内存

共享内存方式是指多个线程直接访问同一份数据。它的优点是访问效率高,缺点是必须处理同步问题

共享资源访问大致可以分为两种情况:

  • 只读共享资源:多个线程同时读取通常是安全的
  • 可变共享资源:只要多个线程同时访问时存在写操作,就必须进行同步控制

对于可变共享资源,通常需要使用互斥锁、读写锁、原子变量等机制,避免竞态条件和数据不一致

典型场景:

  • 多线程访问配置对象(读多写少,适合读写锁)
  • 多线程更新计数器(适合原子操作)
  • 多线程修改共享缓存(需要互斥锁保护)

消息传递

另一种方式是通过消息传递来组织并发协作。核心思想是:

  • 不要通过共享内存来通信
  • 而是通过通信来共享数据

消息传递模式将数据修改权集中到接收方,发送方只负责传递消息。这样可以减少锁的使用,降低并发程序的同步复杂度

典型场景:

  • 生产者-消费者队列(任务分发)
  • Actor 模型(每个 Actor 独立处理消息)
  • 事件驱动架构(事件发布-订阅)

两种方式对比

对比维度共享内存消息传递
访问效率高(直接内存访问)较低(需要复制或传递)
同步复杂度高(需要锁、原子操作)低(通过通道隔离)
数据竞争风险高(容易出错)低(所有权转移)
适用场景读多写少、性能敏感任务解耦、高并发 I/O
典型实现Mutex、RwLock、AtomicChannel、Queue、Actor

选择建议

优先考虑消息传递:

  • 任务之间需要解耦
  • 数据流向清晰(生产者 → 消费者)
  • 高并发场景,避免锁竞争

考虑共享内存:

  • 性能关键路径,避免数据复制
  • 读多写少的配置数据
  • 需要多个线程同时读取大量数据

生产者-消费者模式

生产者-消费者队列是消息传递的经典应用。生产者线程负责发送任务或数据,消费者线程负责接收并处理

这种模式的优势是:

  • 生产者和消费者解耦,互不阻塞
  • 队列作为缓冲区,平衡生产和消费速度
  • 可以轻松扩展为多生产者、多消费者

完整的生产者-消费者队列实现(C++ 和 Rust)见附录 C

总结

  • 如果任务是 CPU 密集型,例如图像处理、矩阵计算、数据压缩、模型推理等,任务本身需要大量占用 CPU,那么更适合使用多线程或多进程,将任务分配到多个 CPU 核心上并行执行

  • 如果任务是 I/O 密集型,例如网络请求、数据库访问、文件读写、消息监听等,任务的大部分时间都在等待外部资源返回结果,那么异步和协程通常更加合适。它们可以用较少的线程承载大量并发任务,避免线程在等待期间被浪费

附录 A:抢占式并发完整示例

多线程访问共享数据时的同步控制示例:

use std::sync::{Arc, Mutex};
use std::thread;

fn critical_section(v: Arc<Mutex<i32>>, change_v: i32) {
    // 第一段加锁
    let mut lock = v.lock().unwrap();
    *lock = change_v;
    println!("{}", *lock);

    // 显式释放锁
    drop(lock);

    // 在此期间,其他线程可以获得锁并修改 v

    // 第二段再次加锁
    let mut lock = v.lock().unwrap();
    *lock += 1;
    println!("{}", *lock);
}

fn main() {

    for round in 0..10 {
        println!("\n=== round {} ===", round);

        let v = Arc::new(Mutex::new(1));
        let v1 = Arc::clone(&v);
        let v2 = Arc::clone(&v);

        let t1 = thread::spawn(move || {
            critical_section(v1, 10);
        });

        let t2 = thread::spawn(move || {
            critical_section(v2, 100);
        });

        t1.join().unwrap();
        t2.join().unwrap();
    }
}

附录 B:协作式并发完整示例

自定义协程调度器的完整实现:

use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// 一个最简单的"主动让出一次"的 Future
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref(); // 重新唤醒自己,等下一轮再执行
            Poll::Pending
        }
    }
}

fn yield_once() -> YieldOnce {
    YieldOnce { yielded: false }
}

async fn first_task() {
    for i in 1..10 {
        println!("hi number {} from the first task!", i);
        yield_once().await; // 主动让出执行权
    }
}

async fn second_task() {
    for i in 100..110 {
        println!("hi number {} from the second task!", i);
        yield_once().await; // 主动让出执行权
    }
}

fn main() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    spawner.spawn_local(first_task()).unwrap();
    spawner.spawn_local(second_task()).unwrap();

    pool.run();
}

附录 C:生产者-消费者队列完整实现

多生产者、多消费者的完整实现:

use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    // 标准库 mpsc:多生产者,单消费者
    let (tx, rx) = mpsc::channel::<i32>();

    // 因为 Receiver 不能 clone,所以用 Arc<Mutex<Receiver>>
    // 让多个消费者线程共享同一个接收端
    let rx = Arc::new(Mutex::new(rx));

    // 生产者
    let producer_tx = tx.clone();
    let producer = thread::spawn(move || {
        for i in 0..10 {
            // thread::sleep(Duration::from_millis(900));
            println!("producing {}", i);

            if producer_tx.send(i).is_err() {
                println!("receiver dropped, producer exits");
                break;
            }
        }
    });

    // 两个消费者
    let mut consumers = Vec::new();

    for id in 0..2 {
        let rx_clone = Arc::clone(&rx);

        let consumer = thread::spawn(move || {
            loop {
                // 先拿锁,再 recv 一条消息
                let first_item = {
                    let guard = rx_clone.lock().unwrap();
                    guard.recv()
                };

                match first_item {
                    Ok(v) => {
                        // 模拟消费者处理较慢
                        // thread::sleep(Duration::from_millis(1000));
                        println!("consumer {} consuming {}", id, v);

                        // 尝试继续把当前队列里已经到达的消息一并取走
                        loop {
                            let next_item = {
                                let guard = rx_clone.lock().unwrap();
                                guard.try_recv()
                            };

                            match next_item {
                                Ok(x) => {
                                    println!("consumer {} consuming {}", id, x);
                                }
                                Err(mpsc::TryRecvError::Empty) => {
                                    break;
                                }
                                Err(mpsc::TryRecvError::Disconnected) => {
                                    println!("consumer {} exits", id);
                                    return;
                                }
                            }
                        }
                    }
                    Err(_) => {
                        println!("consumer {} exits", id);
                        break;
                    }
                }
            }
        });

        consumers.push(consumer);
    }

    producer.join().unwrap();
    for c in consumers {
        c.join().unwrap();
    }
}

参考文献

  • Core Dumped 的视频包含一些关于进程与线程调度的动画演示,适合初学者建立直观认识
  • RustOS Blog从操作系统层面介绍并发与并行机制,主要使用 Rust 语言讲解,适合继续理解系统级实现
  • 本文的代码部分参考了 Rust 圣经 以及 Modern C++