Pin

Асинхронні блоки та функції повертають типи, що реалізують трейт Future. Тип, що повертається, є результатом трансформації компілятора, який перетворює локальні змінні на дані, що зберігаються у ф'ючерсі.

Деякі з цих змінних можуть містити вказівники на інші локальні змінні. Через це ф'ючерс ніколи не слід переміщувати в іншу комірку пам'яті, оскільки це зробить ці вказівники недійсними.

Щоб запобігти переміщенню ф'ючерсного типу у пам'яті, його можна опитувати лише через закріплений вказівник. Закріплення - це обгортка навколо посилання, яка забороняє всі операції, що можуть перемістити екземпляр, на який воно вказує, в іншу ділянку пам'яті.

use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn;
use tokio::time::{sleep, Duration};

// Робочий елемент. У цьому випадку просто заснути на заданий час і
// відповісти повідомленням на каналі `respond_on`.
#[derive(Debug)]
struct Work {
    input: u32,
    respond_on: oneshot::Sender<u32>,
}

// Робочий, який чекає на роботу у черзі та виконує її.
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
    let mut iterations = 0;
    loop {
        tokio::select! {
            Some(work) = work_queue.recv() => {
                sleep(Duration::from_millis(10)).await; // Вдається, що працює.
                work.respond_on
                    .send(work.input * 1000)
                    .expect("не вдалося надіслати відповідь");
                iterations += 1;
            }
            // TODO: виводити кількість ітерацій кожні 100 мс
        }
    }
}

// Запитувач, який надсилає запит на виконання роботи і чекає на її завершення.
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
    let (tx, rx) = oneshot::channel();
    work_queue
        .send(Work { input, respond_on: tx })
        .await
        .expect("не вдалося відправити в робочу чергу");
    rx.await.expect("не вдалося дочекатися відповіді")
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);
    spawn(worker(rx));
    for i in 0..100 {
        let resp = do_work(&tx, i).await;
        println!("результат роботи для ітерації {i}: {resp}");
    }
}
This slide should take about 20 minutes.
  • Ви можете розпізнати це як приклад шаблону актора. Актори зазвичай викликають select! у циклі.

  • Це є підсумком кількох попередніх уроків, тож не поспішайте з цим.

    • Наївно додайте _ = sleep(Duration::from_millis(100)) => { println!(..) } до select!. Це ніколи не буде виконано. Чому?

    • Замість цього додайте timeout_fut, що містить цей ф'юсчерс за межами loop:

      #![allow(unused)]
      fn main() {
      let timeout_fut = sleep(Duration::from_millis(100));
      loop {
          select! {
              ..,
              _ = timeout_fut => { println!(..); },
          }
      }
      }
    • Це все ще не працює. Слідкуйте за помилками компілятора, додавши &mut до timeout_fut у select!, щоб обійти переміщення, а потім використовуючи Box::pin:

      #![allow(unused)]
      fn main() {
      let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
      loop {
          select! {
              ..,
              _ = &mut timeout_fut => { println!(..); },
          }
      }
      }
    • Це компілюється, але після закінчення часу очікування на кожній ітерації відображається Poll::Ready (злитий ф'ючерс міг би допомогти в цьому). Оновіть, щоб скидати timeout_fut кожного разу, коли він спливає:

      #![allow(unused)]
      fn main() {
      let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
      loop {
          select! {
              _ = &mut timeout_fut => {
                  println!(..);
                  timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
              },
          }
      }
      }
  • Box виділяє у купі. У деяких випадках, std::pin::pin! (лише нещодавно стабілізовано, у старому коді часто використовується tokio::pin!) також є варіантом, але його важко використовувати для фьючерсів, які перепризначено.

  • Інша альтернатива — взагалі не використовувати pin, а створювати інше завдання, яке буде надсилати на канал oneshot кожні 100 мс.

  • Дані, які містять вказівники на себе, називаються самопосилальними. Зазвичай, перевірка запозичень у Rust запобігає переміщенню самопосилань, оскільки посилання не можуть пережити дані, на які вони вказують. Однак, перетворення коду для асинхронних блоків і функцій не перевіряється перевіркою запозичень.

  • Pin - це обгортка навколо вказівника. Об'єкт не можна перемістити з його місця за допомогою закріпленого вказівника. Однак, його можна переміщати за допомогою незакріпленого вказівника.

  • Метод poll трейту Future використовує Pin<&mut Self> замість &mut Self для посилання на екземпляр. Тому його можна викликати лише на закріпленому покажчику.