Pin

非同步區塊和函式會傳回實作 Future 特徵的型別。傳回的型別是編譯器轉換的結果,會將本機變數轉換成在 Future 中儲存的資料。

其中一些變數可保留指向其他本機變數的指標。因此,Future 不應移至其他記憶體位置,以免這些指標失效。

為避免在記憶體中移動 Future 型別,此型別只能透過固定指標輪詢。Pin 是參照的包裝函式,會禁止所有將所指向例項移至不同記憶體位置的作業。

use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn;
use tokio::time::{sleep, Duration};

// A work item. In this case, just sleep for the given time and respond
// with a message on the `respond_on` channel.
#[derive(Debug)]
struct Work {
    input: u32,
    respond_on: oneshot::Sender<u32>,
}

// A worker which listens for work on a queue and performs it.
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
    let mut iterations = 0;
    loop {
        tokio::select! {
            Some(work) = work_queue.recv() => {
                sleep(Duration::from_millis(10)).await; // Pretend to work.
                work.respond_on
                    .send(work.input * 1000)
                    .expect("failed to send response");
                iterations += 1;
            }
            // TODO: report number of iterations every 100ms
        }
    }
}

// A requester which requests work and waits for it to complete.
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
    let (tx, rx) = oneshot::channel();
    work_queue
        .send(Work { input, respond_on: tx })
        .await
        .expect("failed to send on work queue");
    rx.await.expect("failed waiting for response")
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);
    spawn(worker(rx));
    for i in 0..100 {
        let resp = do_work(&tx, i).await;
        println!("work result for iteration {i}: {resp}");
    }
}
  • 您可以將此視為演員模型的例子。演員通常會在迴圈中呼叫 select!

  • 這彙整了先前幾堂課的內容,您可以放鬆慢慢做。

    • 直接的將 _ = sleep(Duration::from_millis(100)) => { println!(..) } 新增至 select!。這永遠不會執行。為什麼?

    • 請改為在 loop 之外新增含有該 Future 的 timeout_fut

      #![allow(unused)]
      fn main() {
      let mut timeout_fut = sleep(Duration::from_millis(100));
      loop {
          select! {
              ..,
              _ = timeout_fut => { println!(..); },
          }
      }
      }
    • 這樣還是無法運作。請根據編譯器錯誤,將 &mut 新增至 select! 中的 timeout_fut 來處理移動作業,然後使用 Box::pin

      #![allow(unused)]
      fn main() {
      let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
      loop {
          select! {
              ..,
              _ = &mut timeout_fut => { println!(..); },
          }
      }
      }
    • 這樣可以編譯,但逾時到期後,每次疊代都會是 Poll::Ready (熔斷型 Future 有助解決這個問題)。更新即可在每次到期時重設 timeout_fut

  • Box 會在堆積上分配。在某些情況下,也可以採用 std::pin::pin! (最近才推出穩定版,舊版程式碼通常使用 tokio::pin!),但這不容易用於已重新指派的 Future。

  • 另一個替代方案是完全不使用 pin,改為產生另一項工作,該工作每 100 毫秒就會傳送至 oneshot 管道。

  • 包含指向自己的指標的資料稱為自我參照。一般而言,Rust 借用檢查器會禁止移動自我參照資料,因為參照的留存時間不會超過其指向的資料。不過,非同步區塊和函式的程式碼轉換不會由借用檢查器驗證。

  • Pin 是參照的包裝函式。如要從現有位置移動物件,使用固定指標是行不通的。但若使用未固定的指標,則仍可以移動物件。

  • Future 特徵的 poll 方法是使用 Pin<&mut Self> 參照例項,而非使用 &mut Self。因此,您只能在固定指標上呼叫這個方法。