لغو

کنار گذاشتن future به این معنی است که دیگر هرگز نمی‌توان آن را poll کرد. به این حالت cancellation می‌گویند و می‌تواند در هر نقطه await رخ دهد. برای اطمینان از عملکرد صحیح سیستم حتی در صورت لغو futureها، دقت مناسب لازم است. به‌عنوان ‌مثال، نباید داده‌ها را از دست بدهد یا به بن‌بست (deadlock) برسد.

use std::io::{self, ErrorKind};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};

struct LinesReader {
    stream: DuplexStream,
}

impl LinesReader {
    fn new(stream: DuplexStream) -> Self {
        Self { stream }
    }

    async fn next(&mut self) -> io::Result<Option<String>> {
        let mut bytes = Vec::new();
        let mut buf = [0];
        while self.stream.read(&mut buf[..]).await? != 0 {
            bytes.push(buf[0]);
            if buf[0] == b'\n' {
                break;
            }
        }
        if bytes.is_empty() {
            return Ok(None);
        }
        let s = String::from_utf8(bytes)
            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?;
        Ok(Some(s))
    }
}

async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> {
    for b in source.bytes() {
        dest.write_u8(b).await?;
        tokio::time::sleep(Duration::from_millis(10)).await
    }
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (client, server) = tokio::io::duplex(5);
    let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));

    let mut lines = LinesReader::new(server);
    let mut interval = tokio::time::interval(Duration::from_millis(60));
    loop {
        tokio::select! {
            _ = interval.tick() => println!("tick!"),
            line = lines.next() => if let Some(l) = line? {
                print!("{}", l)
            } else {
                break
            },
        }
    }
    handle.await.unwrap()?;
    Ok(())
}
This slide should take about 18 minutes.
  • کامپایلر در مورد cancellation-safety کمکی نمی‌کند. باید مستندات API را بخوانید و در نظر بگیرید که async fn شما چه وضعیتی دارد.

  • برخلاف panic و ?، لغو یا cancellation بخشی از جریان کنترل عادی (و رسیدگی به خطا) است.

  • اسن مثال بخش‌هایی از string را از دست می‌دهد.

    • هر زمان که شاخه tick() اول تمام شود، next() و buf آن حذف می شوند.

    • LinesReader را می‌توان با تبدیل buf به بخشی از ساختار، ایمن کرد:

      #![allow(unused)]
      fn main() {
      struct LinesReader {
          stream: DuplexStream,
          bytes: Vec<u8>,
          buf: [u8; 1],
      }
      
      impl LinesReader {
          fn new(stream: DuplexStream) -> Self {
              Self { stream, bytes: Vec::new(), buf: [0] }
          }
          async fn next(&mut self) -> io::Result<Option<String>> {
              // prefix buf and bytes with self.
              // ...
              let raw = std::mem::take(&mut self.bytes);
              let s = String::from_utf8(raw)
                  .map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?;
              // ...
          }
      }
      }
  • 'Interval::tick' برای cancellation-safe است زیرا ردیابی می‌کند که آیا یک tick تحویل داده شده است.

  • AsyncReadExt::read برای cancellation-safe است زیرا داده‌ها را برمی‌گرداند یا نمی‌خواند.

  • AsyncBufReadExt::read_line مشابه مثال است و شبیه cancellation-safe نیست. برای جزئیات و موارد جایگزین به مستندات آن مراجعه کنید.