์ทจ์†Œ

future๊ฐ€ ๋ˆ„๋ฝ๋˜๋ฉด ๋‹ค์‹œ ํด๋งํ•  ์ˆ˜ ์—†๋‹ค๋Š” ์˜๋ฏธ์ž…๋‹ˆ๋‹ค. ์ด๋ฅผ _์ทจ์†Œ_๋ผ๊ณ  ํ•˜๋ฉฐ, await ์ง€์ ์—์„œ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. future๊ฐ€ ์ทจ์†Œ๋˜๋”๋ผ๋„ ์‹œ์Šคํ…œ์ด ์˜ฌ๋ฐ”๋ฅด๊ฒŒ ์ž‘๋™ํ•  ์ˆ˜ ์žˆ๋„๋ก ์ฃผ์˜๋ฅผ ๊ธฐ์šธ์—ฌ์•ผ ํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ๊ต์ฐฉ ์ƒํƒœ๊ฐ€ ๋˜๊ฑฐ๋‚˜ ๋ฐ์ดํ„ฐ๊ฐ€ ์†์‹ค๋˜๋ฉด ์•ˆ ๋ฉ๋‹ˆ๋‹ค.

use std::io::{self, ErrorKind};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};

struct LinesReader {
    stream: DuplexStream,
}

impl LinesReader {
    fn new(stream: DuplexStream) -> Self {
        Self { stream }
    }

    async fn next(&mut self) -> io::Result<Option<String>> {
        let mut bytes = Vec::new();
        let mut buf = [0];
        while self.stream.read(&mut buf[..]).await? != 0 {
            bytes.push(buf[0]);
            if buf[0] == b'\n' {
                break;
            }
        }
        if bytes.is_empty() {
            return Ok(None);
        }
        let s = String::from_utf8(bytes)
            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?;
        Ok(Some(s))
    }
}

async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> {
    for b in source.bytes() {
        dest.write_u8(b).await?;
        tokio::time::sleep(Duration::from_millis(10)).await
    }
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (client, server) = tokio::io::duplex(5);
    let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));

    let mut lines = LinesReader::new(server);
    let mut interval = tokio::time::interval(Duration::from_millis(60));
    loop {
        tokio::select! {
            _ = interval.tick() => println!("ํ‹ฑ!"),
            line = lines.next() => if let Some(l) = line? {
                print!("{}", l)
            } else {
                break
            },
        }
    }
    handle.await.unwrap()?;
    Ok(())
}
  • ์ปดํŒŒ์ผ๋Ÿฌ๋Š” ์ทจ์†Œ ์•ˆ์ „์— ๋„์›€์ด ๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. API ๋ฌธ์„œ๋ฅผ ์ฝ๊ณ  async fn์˜ ์ƒํƒœ๋ฅผ ๊ณ ๋ คํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

  • panic ๋ฐ ?์™€ ๋‹ฌ๋ฆฌ ์ทจ์†Œ๋Š” ์˜ค๋ฅ˜ ์ฒ˜๋ฆฌ๊ฐ€ ์•„๋‹Œ ์ผ๋ฐ˜์ ์ธ ์ œ์–ด ํ๋ฆ„์˜ ์ผ๋ถ€์ž…๋‹ˆ๋‹ค.

  • ์ด ์˜ˆ์—์„œ๋Š” ๋ฌธ์ž์—ด์˜ ์ผ๋ถ€๊ฐ€ ์†์‹ค๋ฉ๋‹ˆ๋‹ค.

    • tick() ๋ธŒ๋žœ์น˜๊ฐ€ ๋จผ์ € ์™„๋ฃŒ๋  ๋•Œ๋งˆ๋‹ค next() ๋ฐ buf๊ฐ€ ์‚ญ์ œ๋ฉ๋‹ˆ๋‹ค.

    • ๋‹ค์Œ๊ณผ ๊ฐ™์ด buf๋ฅผ ๊ตฌ์กฐ์ฒด์˜ ์ผ๋ถ€๋กœ ๋งŒ๋“ค์–ด LinesReader๊ฐ€ ์ทจ์†Œ๋˜์ง€ ์•Š๋„๋ก ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

      #![allow(unused)]
      fn main() {
      struct LinesReader {
          stream: DuplexStream,
          bytes: Vec<u8>,
          buf: [u8; 1],
      }
      
      impl LinesReader {
          fn new(stream: DuplexStream) -> Self {
              Self { stream, bytes: Vec::new(), buf: [0] }
          }
          async fn next(&mut self) -> io::Result<Option<String>> {
              // buf ๋ฐ bytes ์ ‘๋‘์‚ฌ๋ฅผ self๋กœ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค.
              // ...
              let raw = std::mem::take(&mut self.bytes);
              let s = String::from_utf8(raw)
              // ...
          }
      }
      }
  • Interval::tick์€ ํ‹ฑ์ด โ€˜deliveredโ€™ ๋๋Š”์ง€ ์ถ”์ ํ•˜๋ฏ€๋กœ ์ทจ์†Œ์— ์•ˆ์ „ํ•ฉ๋‹ˆ๋‹ค.

  • AsyncReadExt::read๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ˜ํ™˜ํ•˜๊ฑฐ๋‚˜ ์ฝ์ง€ ์•Š์œผ๋ฏ€๋กœ ์ทจ์†Œ์— ์•ˆ์ „ํ•ฉ๋‹ˆ๋‹ค.

  • AsyncBufReadExt::read_line์€ ์˜ˆ์™€ ์œ ์‚ฌํ•˜๋ฉฐ ์ทจ์†Œ์— ์•ˆ์ „ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ๊ณผ ๋Œ€์•ˆ์€ ๊ด€๋ จ ๋ฌธ์„œ๋ฅผ ์ฐธ๊ณ ํ•˜์„ธ์š”.