Concurrency Afternoon Exercise

Dining Philosophers โ€” Async

(์—ฐ์Šต๋ฌธ์ œ๋กœ ๋Œ์•„๊ฐ€๊ธฐ

use std::sync::Arc;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::Mutex;
use tokio::time;

struct Fork;

struct Philosopher {
    name: String,
    left_fork: Arc<Mutex<Fork>>,
    right_fork: Arc<Mutex<Fork>>,
    thoughts: Sender<String>,
}

impl Philosopher {
    async fn think(&self) {
        self.thoughts
            .send(format!("์œ ๋ ˆ์นด! {}์— ์ƒˆ๋กœ์šด ์•„์ด๋””์–ด๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.", &self.name))
            .await
            .unwrap();
    }

    async fn eat(&self) {
        // Keep trying until we have both forks
        let (_left_fork, _right_fork) = loop {
            // ํฌํฌ๋ฅผ ๋“œ์„ธ์š”...
            let left_fork = self.left_fork.try_lock();
            let right_fork = self.right_fork.try_lock();
            let Ok(left_fork) = left_fork else {
                // If we didn't get the left fork, drop the right fork if we
                // have it and let other tasks make progress.
                drop(right_fork);
                time::sleep(time::Duration::from_millis(1)).await;
                continue;
            };
            let Ok(right_fork) = right_fork else {
                // If we didn't get the right fork, drop the left fork and let
                // other tasks make progress.
                drop(left_fork);
                time::sleep(time::Duration::from_millis(1)).await;
                continue;
            };
            break (left_fork, right_fork);
        };

        println!("{}๋‹˜์ด ๋จน๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค...", &self.name);
        time::sleep(time::Duration::from_millis(5)).await;

        // ์—ฌ๊ธฐ์— ์ž ๊ธˆ์ด ํ•ด์ œ๋ฉ๋‹ˆ๋‹ค.
    }
}

static PHILOSOPHERS: &[&str] =
    &["Socrates", "ํžˆํŒŒํ‹ฐ์•„", "ํ”Œ๋ผํ†ค", "์•„๋ฆฌ์Šคํ† ํ…”๋ ˆ์Šค", "ํ”ผํƒ€๊ณ ๋ผ์Šค"];

#[tokio::main]
async fn main() {
    // ํฌํฌ ๋งŒ๋“ค๊ธฐ
    let mut forks = vec![];
    (0..PHILOSOPHERS.len()).for_each(|_| forks.push(Arc::new(Mutex::new(Fork))));

    // ์ฒ ํ•™์ž ๋งŒ๋“ค๊ธฐ
    let (philosophers, mut rx) = {
        let mut philosophers = vec![];
        let (tx, rx) = mpsc::channel(10);
        for (i, name) in PHILOSOPHERS.iter().enumerate() {
            let left_fork = Arc::clone(&forks[i]);
            let right_fork = Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]);
            philosophers.push(Philosopher {
                name: name.to_string(),
                left_fork,
                right_fork,
                thoughts: tx.clone(),
            });
        }
        (philosophers, rx)
        // tx๊ฐ€ ์—ฌ๊ธฐ์—์„œ ์‚ญ์ œ๋˜๋ฏ€๋กœ ๋‚˜์ค‘์— ๋ช…์‹œ์ ์œผ๋กœ ์‚ญ์ œํ•  ํ•„์š”๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.
    };

    // ์ƒ๊ฐํ•˜๊ณ  ๋จน๊ฒŒ ํ•ฉ๋‹ˆ๋‹ค.
    for phil in philosophers {
        tokio::spawn(async move {
            for _ in 0..100 {
                phil.think().await;
                phil.eat().await;
            }
        });
    }

    // ์ƒ๊ฐ์„ ์ถœ๋ ฅํ•ฉ๋‹ˆ๋‹ค.
    while let Some(thought) = rx.recv().await {
        println!("์˜๊ฒฌ ๋ณด๋‚ด๊ธฐ: {thought}");
    }
}

์ฑ„ํŒ… ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜

(์—ฐ์Šต๋ฌธ์ œ๋กœ ๋Œ์•„๊ฐ€๊ธฐ)

src/bin/server.rs:

use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use std::error::Error;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{channel, Sender};
use tokio_websockets::{Message, ServerBuilder, WebSocketStream};

async fn handle_connection(
    addr: SocketAddr,
    mut ws_stream: WebSocketStream<TcpStream>,
    bcast_tx: Sender<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {

    ws_stream
        .send(Message::text("์ฑ„ํŒ…์— ์˜ค์‹  ๊ฒƒ์„ ํ™˜์˜ํ•ฉ๋‹ˆ๋‹ค. ๋ฉ”์‹œ์ง€๋ฅผ ์ž…๋ ฅํ•˜์„ธ์š”.".to_string()))
        .await?;
    let mut bcast_rx = bcast_tx.subscribe();

    // ๋™์‹œ์— ๋‘ ์ž‘์—…์„ ์‹คํ–‰ํ•˜๋Š” ์—ฐ์† ๋ฃจํ”„: (1) `ws_stream`์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•˜์—ฌ
    // ๋ธŒ๋กœ๋“œ์บ์ŠคํŒ…ํ•˜๊ณ  (2) `bcast_rx`์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•˜์—ฌ
    // ํด๋ผ์ด์–ธํŠธ๋กœ ์ „์†กํ•ฉ๋‹ˆ๋‹ค.
    loop {
        tokio::select! {
            incoming = ws_stream.next() => {
                match incoming {
                    Some(Ok(msg)) => {
                        if let Some(text) = msg.as_text() {
                            println!("ํด๋ผ์ด์–ธํŠธ์—์„œ: {addr:?} {text:?}");
                            bcast_tx.send(text.into())?;
                        }
                    }
                    Some(Err(err)) => return Err(err.into()),
                    None => return Ok(()),
                }
            }
            msg = bcast_rx.recv() => {
                ws_stream.send(Message::text(msg?)).await?;
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    let (bcast_tx, _) = channel(16);

    let listener = TcpListener::bind("127.0.0.1:2000").await?;
    println!("ํฌํŠธ 2000์—์„œ ์ˆ˜์‹  ๋Œ€๊ธฐ");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("{addr:?}์˜ ์ƒˆ ์—ฐ๊ฒฐ");
        let bcast_tx = bcast_tx.clone();
        tokio::spawn(async move {
            // ์›์‹œ TCP ์ŠคํŠธ๋ฆผ์„ websocket์— ๋ž˜ํ•‘ํ•ฉ๋‹ˆ๋‹ค.
            let ws_stream = ServerBuilder::new().accept(socket).await?;

            handle_connection(addr, ws_stream, bcast_tx).await
        });
    }
}

src/bin/client.rs:

use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use http::Uri;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_websockets::{ClientBuilder, Message};

#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
    let (mut ws_stream, _) =
        ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
            .connect()
            .await?;

    let stdin = tokio::io::stdin();
    let mut stdin = BufReader::new(stdin).lines();

    // ๋™์‹œ์— ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๊ณ  ๋ฐ›๋Š” ์—ฐ์† ๋ฃจํ”„
    loop {
        tokio::select! {
            incoming = ws_stream.next() => {
                match incoming {
                    Some(Ok(msg)) => {
                        if let Some(text) = msg.as_text() {
                            println!("์„œ๋ฒ„์—์„œ: {}", text);
                        }
                    },
                    Some(Err(err)) => return Err(err.into()),
                    None => return Ok(()),
                }
            }
            res = stdin.next_line() => {
                match res {
                    Ok(None) => return Ok(()),
                    Ok(Some(line)) => ws_stream.send(Message::text(line.to_string())).await?,
                    Err(err) => return Err(err.into()),
                }
            }

        }
    }
}