キャンセル
Future をドロップすると、その Future を再度ポーリングすることはできません。これはキャンセルと呼ばれ、どの await ポイントでも発生する可能性があります。そのため、Future がキャンセルされた場合でも、システムが正常に動作するようにしておく必要があります。たとえば、デッドロックやデータの消失があってはなりません。
use std::io;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
struct LinesReader {
stream: DuplexStream,
}
impl LinesReader {
fn new(stream: DuplexStream) -> Self {
Self { stream }
}
async fn next(&mut self) -> io::Result<Option<String>> {
let mut bytes = Vec::new();
let mut buf = [0];
while self.stream.read(&mut buf[..]).await? != 0 {
bytes.push(buf[0]);
if buf[0] == b'\n' {
break;
}
}
if bytes.is_empty() {
return Ok(None);
}
let s = String::from_utf8(bytes)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "not UTF-8"))?;
Ok(Some(s))
}
}
async fn slow_copy(source: String, mut dest: DuplexStream) -> io::Result<()> {
for b in source.bytes() {
dest.write_u8(b).await?;
tokio::time::sleep(Duration::from_millis(10)).await
}
Ok(())
}
#[tokio::main]
async fn main() -> io::Result<()> {
let (client, server) = tokio::io::duplex(5);
let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));
let mut lines = LinesReader::new(server);
let mut interval = tokio::time::interval(Duration::from_millis(60));
loop {
tokio::select! {
_ = interval.tick() => println!("tick!"),
line = lines.next() => if let Some(l) = line? {
print!("{}", l)
} else {
break
},
}
}
handle.await.unwrap()?;
Ok(())
}
This slide should take about 18 minutes.
-
コンパイラではキャンセル安全性を確保できません。API ドキュメントを読み、
async fnが保持する状態を考慮する必要があります。 -
panicや?とは異なり、キャンセルは(エラー処理ではなく)通常の制御フローの一部です。 -
この例では、文字列の一部が失われています。
-
tick()分岐が先に終了するたびに、next()とそのbufがドロップされます。 -
bufを構造体の一部にすることで、LinesReaderにキャンセル安全性を持たせることができます。#![allow(unused)] fn main() { struct LinesReader { stream: DuplexStream, bytes: Vec<u8>, buf: [u8; 1], } impl LinesReader { fn new(stream: DuplexStream) -> Self { Self { stream, bytes: Vec::new(), buf: [0] } } async fn next(&mut self) -> io::Result<Option<String>> { // buf と bytes の先頭に self を付加します。 // ... let raw = std::mem::take(&mut self.bytes); let s = String::from_utf8(raw) .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "not UTF-8"))?; // ... } } }
-
-
Interval::tickは、ティックが「配信済み」かどうかを追跡しているため、安全にキャンセルできます。 -
AsyncReadExt::readは、データを返すか、データを読み取らないかのいずれかであるため、安全にキャンセルできます。 -
AsyncBufReadExt::read_lineはこの例と類似しており、安全にキャンセルできません。詳細と代替方法については、ドキュメントをご覧ください。