Thread
main
函数结束, 则程序退出, 即使仍然有线程在运行;- 用
spawn
返回的handle.join()来等待线程结束, 但是有一点不一样, Rust线程崩溃(panic)不会影响到其它线程, 因此程序可以很容易从某个错误状态下恢复. - 当需要在线程间共享immutable数据时, 使用
std::sync::Arc
智能指针, 并调用它的clone()
方法.
Channel
std::sync::mpsc::channel()
返回一个Sender和Receiver, 分别调用send(item)
和recv()
方法发送和接收数据, 后者会block住线程. Rust会根据上下文推断Channel操作的数据类型.Rust的Channel比Unix的pipe机制更快, 数据移动而非拷贝. 例如发送一个
String
, 实际成本只发送了三个机器字(machine word): length, capacity and pointer.send
和recv
两个调用只会在另一个被dropped的时候失败. 换句话说就是如果Receiver被drop了, 那send
会失败, 反之亦然. 因此, 如果存在一个Sender, 那么下面这个循环recv就会在Sender销毁时正常终止结束:1
2
3while let Ok(text) = receiver.recv() {
do_something_with(text);
}Receiver支持迭代, 因此更优雅的写法是(二者等价):
1
2
3for text in receiver {
do_something_with(text);
}Channel是std::sync::mpsc模块里的, 全称是multi-producer, single-consumer. 这种操作在编程中很常见. 因此
Sender<T>
实现里Clone trait. 可以拷贝出多个, 然后在不同的线程里send. 当然, 由于是single-consumer,Receiver<T>
不能clone.Rust Channel高性能的原因在于有几种不同的实现:
- 如果只发送一个object, 那么Rust内部有一个特殊的“one-shot”队列使得发送一个的成本最小化.
- 但是当你发送第二个object的时候, Rust会切换到另一种队列实现. 这个队列专门为实现大量数据传输做了优化, 尽可能降低传输大开销.
- 但是如果你Clone了Sender, 那么Rust又会使用另一种线程安全的队列版本. 相比于前两种实现, 这个的性能自然是最慢的, 但是即便最慢, 它的overhead也很低.它的队列采用了一个无锁的队列. 因此, 发送和接收时除了move之外, 仅有一些堆上内存分配的原子操作. 而且只有在Receiver需要sleep时才会用到系统调用, 因此基本上对吞吐量没有任何设限.
由于Rust Channel的高性能, 通常会引起问题的就是发送数据的速度大于Receiver处理的速度. 可以使用
sync_channel
, 当发送的队列达到指定数量后,send
操作会被阻塞.1
2
3use std::sync::mpsc::sync_channel;
let (sender, receiver) = sync_channel(1000);
Thread Safety: Send and Sync
上面谈到的在线程间传递和共享数据的能力来自于两个trait: std::marker::Send
和 std::marker::Sync
- 实现了
Send
的类型可以安全地move给其它线程. - 实现了
Sync
的类型可以在线程间进行non-mut reference共享, 同时要求该类型的引用类型&T实现Send
trait.
如下图:
- 可以看到有一种类型是不能Send的,
Rc<String>
, 做为一个带引用计数的智能指针, 当两个线程都拥有它时, 同时对引用计数操作就会引起data race.
Mutex<T>
大部分可以看文档, 和其它语言的互斥锁类似.
有一个细节值得注意:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/// All threads have shared access to this big context struct.
struct FernEmpireApp {
...
waiting_list: Mutex<WaitingList>,
...
}
impl FernEmpireApp {
/// Add a player to the waiting list for the next game.
/// Start a new game immediately if enough players are waiting.
fn join_waiting_list(&self, player: PlayerId) {
// Lock the mutex and gain access to the data inside.
// The scope of `guard` is a critical section.
let mut guard = self.waiting_list.lock().unwrap();
// Now do the game logic.
guard.push(player);
if guard.len() == GAME_SIZE {
let players = guard.split_off(0);
self.start_game(players);
}
}
}这个函数对waiting_list进行了修改, 而函数签名是immutable 的
&self
. 其实这里需要重新理解Rust中的mut
另一层含义.- 在Rust中
mut
意味着排他性访问(exclusive access); - Non-mut意味着共享访问(shared access);
这里的
Mutex.lock()
可以保证不会有竞争, 因此允许在即使Mutex拿到的是shared(non-mut)情况下做exclusive(mut)操作.- 在Rust中
另外一点, Rust的Mutex是不可重入的, 同一个线程连续两次调用
lock
会引起死锁.Poisoned Mutex(名字有些夸张, 中毒锁), 是在持有锁的线程崩溃后, Rust会将这个锁标记为中毒污染状态. 后续其它线程对它的
lock
请求都会返回一个PoisonError
. 因此用lock().unwrap()
会把panic传播到另一个线程.
Condvar
Condvar::notify_all()
和Condvar::wait()
两个操作;不同的是wait操作有些不寻常
1
2
3while !guard.has_data() {
guard = self.has_data_condvar.wait(guard).unwrap();
}wait(guard)
, 接收一个MutexGuard
的所有权, 并返回一个新的MutexGuard
. 如果用过pthread接口的话就不会对这个感到陌生. 类似pthread_cond_wait
函数, 在wait前需要先释放这个mutex, 然后在返回前重新获取, 可以理解为Mutex授权给wait操作它.
Atomics
- 不同于
Mutex<T>
, Atomics拥有最小化开销, 性能成本极低, 通常就是一条load或store指令. 没有任何系统调用.
All images are copyrighted by original authors Jim Blandy & Jason Orendorff who wrote in the book Programming Rust.