Rust入门失败之Concurrency

Thread

  • main函数结束, 则程序退出, 即使仍然有线程在运行;
  • spawn返回的handle.join()来等待线程结束, 但是有一点不一样, Rust线程崩溃(panic)不会影响到其它线程, 因此程序可以很容易从某个错误状态下恢复.
  • 当需要在线程间共享immutable数据时, 使用std::sync::Arc智能指针, 并调用它的clone()方法.

Channel

  • std::sync::mpsc::channel()返回一个Sender和Receiver, 分别调用send(item)recv()方法发送和接收数据, 后者会block住线程. Rust会根据上下文推断Channel操作的数据类型.

  • Rust的Channel比Unix的pipe机制更快, 数据移动而非拷贝. 例如发送一个String, 实际成本只发送了三个机器字(machine word): length, capacity and pointer.

  • sendrecv两个调用只会在另一个被dropped的时候失败. 换句话说就是如果Receiver被drop了, 那send会失败, 反之亦然. 因此, 如果存在一个Sender, 那么下面这个循环recv就会在Sender销毁时正常终止结束:

    1
    2
    3
    while let Ok(text) = receiver.recv() {
    do_something_with(text);
    }

    Receiver支持迭代, 因此更优雅的写法是(二者等价):

    1
    2
    3
    for text in receiver {
    do_something_with(text);
    }
  • Channel是std::sync::mpsc模块里的, 全称是multi-producer, single-consumer. 这种操作在编程中很常见. 因此Sender<T>实现里Clone trait. 可以拷贝出多个, 然后在不同的线程里send. 当然, 由于是single-consumer, Receiver<T>不能clone.

  • Rust Channel高性能的原因在于有几种不同的实现:

    • 如果只发送一个object, 那么Rust内部有一个特殊的“one-shot”队列使得发送一个的成本最小化.
    • 但是当你发送第二个object的时候, Rust会切换到另一种队列实现. 这个队列专门为实现大量数据传输做了优化, 尽可能降低传输大开销.
    • 但是如果你Clone了Sender, 那么Rust又会使用另一种线程安全的队列版本. 相比于前两种实现, 这个的性能自然是最慢的, 但是即便最慢, 它的overhead也很低.它的队列采用了一个无锁的队列. 因此, 发送和接收时除了move之外, 仅有一些堆上内存分配的原子操作. 而且只有在Receiver需要sleep时才会用到系统调用, 因此基本上对吞吐量没有任何设限.
  • 由于Rust Channel的高性能, 通常会引起问题的就是发送数据的速度大于Receiver处理的速度. 可以使用sync_channel, 当发送的队列达到指定数量后, send操作会被阻塞.

    1
    2
    3
    use std::sync::mpsc::sync_channel;

    let (sender, receiver) = sync_channel(1000);

Thread Safety: Send and Sync

上面谈到的在线程间传递和共享数据的能力来自于两个trait: std::marker::Sendstd::marker::Sync

  • 实现了Send的类型可以安全地move给其它线程.
  • 实现了Sync的类型可以在线程间进行non-mut reference共享, 同时要求该类型的引用类型&T实现Send trait.

如下图:

send_and_sync

  • 可以看到有一种类型是不能Send的, Rc<String>, 做为一个带引用计数的智能指针, 当两个线程都拥有它时, 同时对引用计数操作就会引起data race.

Mutex<T>

  • 大部分可以看文档, 和其它语言的互斥锁类似.

  • 有一个细节值得注意:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /// All threads have shared access to this big context struct.
    struct FernEmpireApp {
    ...
    waiting_list: Mutex<WaitingList>,
    ...
    }

    impl FernEmpireApp {
    /// Add a player to the waiting list for the next game.
    /// Start a new game immediately if enough players are waiting.
    fn join_waiting_list(&self, player: PlayerId) {
    // Lock the mutex and gain access to the data inside.
    // The scope of `guard` is a critical section.
    let mut guard = self.waiting_list.lock().unwrap();

    // Now do the game logic.
    guard.push(player);
    if guard.len() == GAME_SIZE {
    let players = guard.split_off(0);
    self.start_game(players);
    }
    }
    }

    这个函数对waiting_list进行了修改, 而函数签名是immutable&self. 其实这里需要重新理解Rust中的mut另一层含义.

    • 在Rust中mut意味着排他性访问(exclusive access);
    • Non-mut意味着共享访问(shared access);

    这里的Mutex.lock()可以保证不会有竞争, 因此允许在即使Mutex拿到的是shared(non-mut)情况下做exclusive(mut)操作.

  • 另外一点, Rust的Mutex是不可重入的, 同一个线程连续两次调用lock会引起死锁.

  • Poisoned Mutex(名字有些夸张, 中毒锁), 是在持有锁的线程崩溃后, Rust会将这个锁标记为中毒污染状态. 后续其它线程对它的lock请求都会返回一个PoisonError. 因此用lock().unwrap()会把panic传播到另一个线程.

Condvar

  • Condvar::notify_all()Condvar::wait() 两个操作;

  • 不同的是wait操作有些不寻常

    1
    2
    3
    while !guard.has_data() {
    guard = self.has_data_condvar.wait(guard).unwrap();
    }

    wait(guard), 接收一个MutexGuard的所有权, 并返回一个新的MutexGuard. 如果用过pthread接口的话就不会对这个感到陌生. 类似pthread_cond_wait函数, 在wait前需要先释放这个mutex, 然后在返回前重新获取, 可以理解为Mutex授权给wait操作它.

Atomics

  • 不同于Mutex<T>, Atomics拥有最小化开销, 性能成本极低, 通常就是一条load或store指令. 没有任何系统调用.

All images are copyrighted by original authors Jim Blandy & Jason Orendorff who wrote in the book Programming Rust.