Concurrency Afternoon Exercise
Dining Philosophers โ Async
use std::sync::Arc; use tokio::sync::mpsc::{self, Sender}; use tokio::sync::Mutex; use tokio::time; struct Fork; struct Philosopher { name: String, left_fork: Arc<Mutex<Fork>>, right_fork: Arc<Mutex<Fork>>, thoughts: Sender<String>, } impl Philosopher { async fn think(&self) { self.thoughts .send(format!("์ ๋ ์นด! {}์ ์๋ก์ด ์์ด๋์ด๊ฐ ์์ต๋๋ค.", &self.name)) .await .unwrap(); } async fn eat(&self) { // Keep trying until we have both forks let (_left_fork, _right_fork) = loop { // ํฌํฌ๋ฅผ ๋์ธ์... let left_fork = self.left_fork.try_lock(); let right_fork = self.right_fork.try_lock(); let Ok(left_fork) = left_fork else { // If we didn't get the left fork, drop the right fork if we // have it and let other tasks make progress. drop(right_fork); time::sleep(time::Duration::from_millis(1)).await; continue; }; let Ok(right_fork) = right_fork else { // If we didn't get the right fork, drop the left fork and let // other tasks make progress. drop(left_fork); time::sleep(time::Duration::from_millis(1)).await; continue; }; break (left_fork, right_fork); }; println!("{}๋์ด ๋จน๊ณ ์์ต๋๋ค...", &self.name); time::sleep(time::Duration::from_millis(5)).await; // ์ฌ๊ธฐ์ ์ ๊ธ์ด ํด์ ๋ฉ๋๋ค. } } static PHILOSOPHERS: &[&str] = &["Socrates", "ํํํฐ์", "ํ๋ผํค", "์๋ฆฌ์คํ ํ ๋ ์ค", "ํผํ๊ณ ๋ผ์ค"]; #[tokio::main] async fn main() { // ํฌํฌ ๋ง๋ค๊ธฐ let mut forks = vec![]; (0..PHILOSOPHERS.len()).for_each(|_| forks.push(Arc::new(Mutex::new(Fork)))); // ์ฒ ํ์ ๋ง๋ค๊ธฐ let (philosophers, mut rx) = { let mut philosophers = vec![]; let (tx, rx) = mpsc::channel(10); for (i, name) in PHILOSOPHERS.iter().enumerate() { let left_fork = Arc::clone(&forks[i]); let right_fork = Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]); philosophers.push(Philosopher { name: name.to_string(), left_fork, right_fork, thoughts: tx.clone(), }); } (philosophers, rx) // tx๊ฐ ์ฌ๊ธฐ์์ ์ญ์ ๋๋ฏ๋ก ๋์ค์ ๋ช ์์ ์ผ๋ก ์ญ์ ํ ํ์๊ฐ ์์ต๋๋ค. }; // ์๊ฐํ๊ณ ๋จน๊ฒ ํฉ๋๋ค. for phil in philosophers { tokio::spawn(async move { for _ in 0..100 { phil.think().await; phil.eat().await; } }); } // ์๊ฐ์ ์ถ๋ ฅํฉ๋๋ค. while let Some(thought) = rx.recv().await { println!("์๊ฒฌ ๋ณด๋ด๊ธฐ: {thought}"); } }
์ฑํ ์ ํ๋ฆฌ์ผ์ด์
(์ฐ์ต๋ฌธ์ ๋ก ๋์๊ฐ๊ธฐ)
src/bin/server.rs:
use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use std::error::Error; use std::net::SocketAddr; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::broadcast::{channel, Sender}; use tokio_websockets::{Message, ServerBuilder, WebSocketStream}; async fn handle_connection( addr: SocketAddr, mut ws_stream: WebSocketStream<TcpStream>, bcast_tx: Sender<String>, ) -> Result<(), Box<dyn Error + Send + Sync>> { ws_stream .send(Message::text("์ฑํ ์ ์ค์ ๊ฒ์ ํ์ํฉ๋๋ค. ๋ฉ์์ง๋ฅผ ์ ๋ ฅํ์ธ์.".to_string())) .await?; let mut bcast_rx = bcast_tx.subscribe(); // ๋์์ ๋ ์์ ์ ์คํํ๋ ์ฐ์ ๋ฃจํ: (1) `ws_stream`์์ ๋ฉ์์ง๋ฅผ ์์ ํ์ฌ // ๋ธ๋ก๋์บ์คํ ํ๊ณ (2) `bcast_rx`์์ ๋ฉ์์ง๋ฅผ ์์ ํ์ฌ // ํด๋ผ์ด์ธํธ๋ก ์ ์กํฉ๋๋ค. loop { tokio::select! { incoming = ws_stream.next() => { match incoming { Some(Ok(msg)) => { if let Some(text) = msg.as_text() { println!("ํด๋ผ์ด์ธํธ์์: {addr:?} {text:?}"); bcast_tx.send(text.into())?; } } Some(Err(err)) => return Err(err.into()), None => return Ok(()), } } msg = bcast_rx.recv() => { ws_stream.send(Message::text(msg?)).await?; } } } } #[tokio::main] async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { let (bcast_tx, _) = channel(16); let listener = TcpListener::bind("127.0.0.1:2000").await?; println!("ํฌํธ 2000์์ ์์ ๋๊ธฐ"); loop { let (socket, addr) = listener.accept().await?; println!("{addr:?}์ ์ ์ฐ๊ฒฐ"); let bcast_tx = bcast_tx.clone(); tokio::spawn(async move { // ์์ TCP ์คํธ๋ฆผ์ websocket์ ๋ํํฉ๋๋ค. let ws_stream = ServerBuilder::new().accept(socket).await?; handle_connection(addr, ws_stream, bcast_tx).await }); } }
src/bin/client.rs:
use futures_util::stream::StreamExt; use futures_util::SinkExt; use http::Uri; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio_websockets::{ClientBuilder, Message}; #[tokio::main] async fn main() -> Result<(), tokio_websockets::Error> { let (mut ws_stream, _) = ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000")) .connect() .await?; let stdin = tokio::io::stdin(); let mut stdin = BufReader::new(stdin).lines(); // ๋์์ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ณ ๋ฐ๋ ์ฐ์ ๋ฃจํ loop { tokio::select! { incoming = ws_stream.next() => { match incoming { Some(Ok(msg)) => { if let Some(text) = msg.as_text() { println!("์๋ฒ์์: {}", text); } }, Some(Err(err)) => return Err(err.into()), None => return Ok(()), } } res = stdin.next_line() => { match res { Ok(None) => return Ok(()), Ok(Some(line)) => ws_stream.send(Message::text(line.to_string())).await?, Err(err) => return Err(err.into()), } } } } }