Pin

非同期ブロックと関数は、Future トレイトを実装する型を返します。返される型は、ローカル変数を Future の内部に格納されるデータに変換するコンパイラ変換の結果です。

これらの変数の一部は、他のローカル変数へのポインタを保持できます。これらのポインタが無効になるため、Futureを別のメモリ位置に移動しないでください。

メモリ内の Future 型が移動するのを防ぐには、固定されたポインタのみを介してポーリングするようにします。Pin は参照のラッパーで、参照先のインスタンスを別のメモリ位置に移動するオペレーションをすべて禁止します。

use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn;
use tokio::time::{sleep, Duration};

// 作業アイテム。この場合、指定された時間だけスリープし、
// `respond_on` チャンネルでメッセージを返します。
#[derive(Debug)]
struct Work {
    input: u32,
    respond_on: oneshot::Sender<u32>,
}

// キュー上の処理をリッスンして実行するワーカー。
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
    let mut iterations = 0;
    loop {
        tokio::select! {
            Some(work) = work_queue.recv() => {
                sleep(Duration::from_millis(10)).await; // Pretend to work.
                work.respond_on
                    .send(work.input * 1000)
                    .expect("failed to send response");
                iterations += 1;
            }
            // TODO: 100 ミリ秒ごとの反復処理の回数をレポート
        }
    }
}

// 処理をリクエストし、処理が完了するまで待機するリクエスト元。
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
    let (tx, rx) = oneshot::channel();
    work_queue
        .send(Work { input, respond_on: tx })
        .await
        .expect("failed to send on work queue");
    rx.await.expect("failed waiting for response")
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);
    spawn(worker(rx));
    for i in 0..100 {
        let resp = do_work(&tx, i).await;
        println!("work result for iteration {i}: {resp}");
    }
}
This slide should take about 20 minutes.
  • これはアクターのパターンの一例です。アクターは通常、ループ内で select! を呼び出します。

  • これはこれまでのレッスンの一部をまとめたものですので、時間をかけて復習してください。

    • _ = sleep(Duration::from_millis(100)) => { println!(..) }select! に追加しただけでは、実行されません。なぜでしょうか?

    • 代わりに、loop の外側で、その Future を含む timeout_fut を追加します。

      #![allow(unused)]
      fn main() {
      let timeout_fut = sleep(Duration::from_millis(100));
      loop {
          select! {
              ..,
              _ = timeout_fut => { println!(..); },
          }
      }
      }
    • これでもうまくいきません。コンパイルエラーにあるように、select! 内の timeout_fut&mut を追加して移動を回避してから、Box::pin を使用します。

      #![allow(unused)]
      fn main() {
      let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
      loop {
          select! {
              ..,
              _ = &mut timeout_fut => { println!(..); },
          }
      }
      }
    • This compiles, but once the timeout expires it is Poll::Ready on every iteration (a fused future would help with this). Update to reset timeout_fut every time it expires:

      #![allow(unused)]
      fn main() {
      let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
      loop {
          select! {
              _ = &mut timeout_fut => {
                  println!(..);
                  timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
              },
          }
      }
      }
  • Box でヒープに割り当てます。場合によっては std::pin::pin!(最近安定化されたばかりで、古いコードでは多くの場合に tokio::pin! を使用します)も使用できますが、再割り当てされる Future に使用することは困難です。

  • 別の方法としては、pin をまったく使用せずに、100 ミリ秒ごとに oneshot チャネルに送信する別のタスクを生成するという方法もあります。

  • それ自体へのポインタを含むデータは、自己参照と呼ばれます。通常、Rust 借用チェッカーは、参照が参照先のデータより長く存続できないため、自己参照データの移動を防ぎます。ただし、非同期ブロックと関数のコード変換は、借用チェッカーによって検証されません。

  • Pin は参照のラッパーです。固定されたポインタを使用して、オブジェクトをその場所から移動することはできません。ただし、固定されていないポインタを介して移動することは可能です。

  • Future トレイトの poll メソッドは、&mut Self ではなく Pin<&mut Self> を使用してインスタンスを参照します。固定されたポインタでのみ呼び出すことができるのはこのためです。