Pin
非同期ブロックと関数は、Future
トレイトを実装する型を返します。返される型は、ローカル変数を Future の内部に格納されるデータに変換するコンパイラ変換の結果です。
これらの変数の一部は、他のローカル変数へのポインタを保持できます。これらのポインタが無効になるため、Futureを別のメモリ位置に移動しないでください。
メモリ内の Future 型が移動するのを防ぐには、固定されたポインタのみを介してポーリングするようにします。Pin
は参照のラッパーで、参照先のインスタンスを別のメモリ位置に移動するオペレーションをすべて禁止します。
use tokio::sync::{mpsc, oneshot}; use tokio::task::spawn; use tokio::time::{sleep, Duration}; // 作業アイテム。この場合、指定された時間だけスリープし、 // `respond_on` チャンネルでメッセージを返します。 #[derive(Debug)] struct Work { input: u32, respond_on: oneshot::Sender<u32>, } // キュー上の処理をリッスンして実行するワーカー。 async fn worker(mut work_queue: mpsc::Receiver<Work>) { let mut iterations = 0; loop { tokio::select! { Some(work) = work_queue.recv() => { sleep(Duration::from_millis(10)).await; // Pretend to work. work.respond_on .send(work.input * 1000) .expect("failed to send response"); iterations += 1; } // TODO: 100 ミリ秒ごとの反復処理の回数をレポート } } } // 処理をリクエストし、処理が完了するまで待機するリクエスト元。 async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 { let (tx, rx) = oneshot::channel(); work_queue .send(Work { input, respond_on: tx }) .await .expect("failed to send on work queue"); rx.await.expect("failed waiting for response") } #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(10); spawn(worker(rx)); for i in 0..100 { let resp = do_work(&tx, i).await; println!("work result for iteration {i}: {resp}"); } }
-
これはアクターのパターンの一例です。アクターは通常、ループ内で
select!
を呼び出します。 -
これはこれまでのレッスンの一部をまとめたものですので、時間をかけて復習してください。
-
_ = sleep(Duration::from_millis(100)) => { println!(..) }
をselect!
に追加しただけでは、実行されません。なぜでしょうか? -
代わりに、
loop
の外側で、その Future を含むtimeout_fut
を追加します。#![allow(unused)] fn main() { let timeout_fut = sleep(Duration::from_millis(100)); loop { select! { .., _ = timeout_fut => { println!(..); }, } } }
-
これでもうまくいきません。コンパイルエラーにあるように、
select!
内のtimeout_fut
に&mut
を追加して移動を回避してから、Box::pin
を使用します。#![allow(unused)] fn main() { let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100))); loop { select! { .., _ = &mut timeout_fut => { println!(..); }, } } }
-
This compiles, but once the timeout expires it is
Poll::Ready
on every iteration (a fused future would help with this). Update to resettimeout_fut
every time it expires:#![allow(unused)] fn main() { let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100))); loop { select! { _ = &mut timeout_fut => { println!(..); timeout_fut = Box::pin(sleep(Duration::from_millis(100))); }, } } }
-
-
Box でヒープに割り当てます。場合によっては
std::pin::pin!
(最近安定化されたばかりで、古いコードでは多くの場合にtokio::pin!
を使用します)も使用できますが、再割り当てされる Future に使用することは困難です。 -
別の方法としては、
pin
をまったく使用せずに、100 ミリ秒ごとにoneshot
チャネルに送信する別のタスクを生成するという方法もあります。 -
それ自体へのポインタを含むデータは、自己参照と呼ばれます。通常、Rust 借用チェッカーは、参照が参照先のデータより長く存続できないため、自己参照データの移動を防ぎます。ただし、非同期ブロックと関数のコード変換は、借用チェッカーによって検証されません。
-
Pin
は参照のラッパーです。固定されたポインタを使用して、オブジェクトをその場所から移動することはできません。ただし、固定されていないポインタを介して移動することは可能です。 -
Future
トレイトのpoll
メソッドは、&mut Self
ではなくPin<&mut Self>
を使用してインスタンスを参照します。固定されたポインタでのみ呼び出すことができるのはこのためです。