لغو
کنار گذاشتن future به این معنی است که دیگر هرگز نمیتوان آن را poll کرد. به این حالت cancellation میگویند و میتواند در هر نقطه await
رخ دهد. برای اطمینان از عملکرد صحیح سیستم حتی در صورت لغو futureها، دقت مناسب لازم است. بهعنوان مثال، نباید دادهها را از دست بدهد یا به بنبست (deadlock) برسد.
use std::io::{self, ErrorKind}; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream}; struct LinesReader { stream: DuplexStream, } impl LinesReader { fn new(stream: DuplexStream) -> Self { Self { stream } } async fn next(&mut self) -> io::Result<Option<String>> { let mut bytes = Vec::new(); let mut buf = [0]; while self.stream.read(&mut buf[..]).await? != 0 { bytes.push(buf[0]); if buf[0] == b'\n' { break; } } if bytes.is_empty() { return Ok(None); } let s = String::from_utf8(bytes) .map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?; Ok(Some(s)) } } async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> { for b in source.bytes() { dest.write_u8(b).await?; tokio::time::sleep(Duration::from_millis(10)).await } Ok(()) } #[tokio::main] async fn main() -> std::io::Result<()> { let (client, server) = tokio::io::duplex(5); let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client)); let mut lines = LinesReader::new(server); let mut interval = tokio::time::interval(Duration::from_millis(60)); loop { tokio::select! { _ = interval.tick() => println!("tick!"), line = lines.next() => if let Some(l) = line? { print!("{}", l) } else { break }, } } handle.await.unwrap()?; Ok(()) }
-
کامپایلر در مورد cancellation-safety کمکی نمیکند. باید مستندات API را بخوانید و در نظر بگیرید که
async fn
شما چه وضعیتی دارد. -
برخلاف
panic
و?
، لغو یا cancellation بخشی از جریان کنترل عادی (و رسیدگی به خطا) است. -
اسن مثال بخشهایی از string را از دست میدهد.
-
هر زمان که شاخه
tick()
اول تمام شود،next()
وbuf
آن حذف می شوند. -
LinesReader
را میتوان با تبدیلbuf
به بخشی از ساختار، ایمن کرد:#![allow(unused)] fn main() { struct LinesReader { stream: DuplexStream, bytes: Vec<u8>, buf: [u8; 1], } impl LinesReader { fn new(stream: DuplexStream) -> Self { Self { stream, bytes: Vec::new(), buf: [0] } } async fn next(&mut self) -> io::Result<Option<String>> { // prefix buf and bytes with self. // ... let raw = std::mem::take(&mut self.bytes); let s = String::from_utf8(raw) .map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?; // ... } } }
-
-
'Interval::tick' برای cancellation-safe است زیرا ردیابی میکند که آیا یک tick تحویل داده شده است.
-
AsyncReadExt::read
برای cancellation-safe است زیرا دادهها را برمیگرداند یا نمیخواند. -
AsyncBufReadExt::read_line
مشابه مثال است و شبیه cancellation-safe نیست. برای جزئیات و موارد جایگزین به مستندات آن مراجعه کنید.