Concurrency Afternoon Exercise
Dining Philosophers — Async
use std::sync::Arc;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::Mutex;
use tokio::time;
/// A fork on the table. It carries no data; it is only ever used behind a
/// mutex, where holding the lock stands for holding the fork.
struct Fork;
/// A philosopher who holds shared handles to two forks and reports thoughts
/// over a channel.
struct Philosopher {
/// Name inserted into the messages this philosopher produces.
name: String,
/// Shared, mutex-protected fork used as this philosopher's left fork.
left_fork: Arc<Mutex<Fork>>,
/// Shared, mutex-protected fork used as this philosopher's right fork.
right_fork: Arc<Mutex<Fork>>,
/// Sending half of the channel on which thoughts are published.
thoughts: Sender<String>,
}
impl Philosopher {
    /// Publishes a "new idea" message containing this philosopher's name on
    /// the `thoughts` channel.
    ///
    /// # Panics
    ///
    /// Panics if the send fails (the result is unwrapped); presumably this
    /// happens only when the receiving half has been closed.
    async fn think(&self) {
        self.thoughts
            .send(format!("유레카! {}에 새로운 아이디어가 있습니다.", self.name))
            .await
            .unwrap();
    }

    /// Acquires both forks, prints that the philosopher is eating, and keeps
    /// the forks for 5 ms before releasing them.
    ///
    /// Both locks are taken with `try_lock`; if either attempt fails,
    /// whatever was acquired is released and the attempt is repeated after a
    /// 1 ms sleep, so a philosopher never waits while holding a single fork.
    async fn eat(&self) {
        // Keep trying until we have both forks.
        let (_left_fork, _right_fork) = loop {
            // Pick up forks...
            let left_fork = self.left_fork.try_lock();
            let right_fork = self.right_fork.try_lock();
            let Ok(left_fork) = left_fork else {
                // If we didn't get the left fork, drop the right fork if we
                // have it and let other tasks make progress.
                drop(right_fork);
                time::sleep(time::Duration::from_millis(1)).await;
                continue;
            };
            let Ok(right_fork) = right_fork else {
                // If we didn't get the right fork, drop the left fork and let
                // other tasks make progress.
                drop(left_fork);
                time::sleep(time::Duration::from_millis(1)).await;
                continue;
            };
            break (left_fork, right_fork);
        };
        println!("{}님이 먹고 있습니다...", self.name);
        time::sleep(time::Duration::from_millis(5)).await;
        // The locks are dropped here.
    }
}
/// Names of the philosophers; the simulation creates one philosopher (and one
/// fork) per entry.
static PHILOSOPHERS: &[&str] =
    &["Socrates", "히파티아", "플라톤", "아리스토텔레스", "피타고라스"];
/// Runs the dining-philosophers simulation.
///
/// Creates one fork and one philosopher per entry in `PHILOSOPHERS`, spawns a
/// task per philosopher that thinks and eats 100 times, and prints every
/// thought received on the channel until all senders have been dropped.
#[tokio::main]
async fn main() {
    // Create forks: one per philosopher.
    let forks: Vec<_> =
        (0..PHILOSOPHERS.len()).map(|_| Arc::new(Mutex::new(Fork))).collect();

    // Create philosophers. Philosopher `i` uses fork `i` on the left and fork
    // `i + 1` (wrapping around) on the right.
    let (philosophers, mut rx) = {
        let (tx, rx) = mpsc::channel(10);
        let philosophers: Vec<_> = PHILOSOPHERS
            .iter()
            .enumerate()
            .map(|(i, name)| Philosopher {
                name: name.to_string(),
                left_fork: Arc::clone(&forks[i]),
                right_fork: Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]),
                thoughts: tx.clone(),
            })
            .collect();
        (philosophers, rx)
        // tx is dropped here, so we don't need to explicitly drop it later.
    };

    // Make them think and eat.
    for phil in philosophers {
        tokio::spawn(async move {
            for _ in 0..100 {
                phil.think().await;
                phil.eat().await;
            }
        });
    }

    // Output their thoughts. The loop ends once every philosopher task has
    // finished and dropped its sender.
    while let Some(thought) = rx.recv().await {
        println!("의견 보내기: {thought}");
    }
}
채팅 애플리케이션
(연습문제로 돌아가기)
src/bin/server.rs:
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use std::error::Error;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{channel, Sender};
use tokio_websockets::{Message, ServerBuilder, WebSocketStream};
async fn handle_connection(
addr: SocketAddr,
mut ws_stream: WebSocketStream<TcpStream>,
bcast_tx: Sender<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
ws_stream
.send(Message::text("μ±ν
μ μ€μ κ²μ νμν©λλ€. λ©μμ§λ₯Ό μ
λ ₯νμΈμ.".to_string()))
.await?;
let mut bcast_rx = bcast_tx.subscribe();
// λμμ λ μμ
μ μ€ννλ μ°μ 루ν: (1) `ws_stream`μμ λ©μμ§λ₯Ό μμ νμ¬
// λΈλ‘λμΊμ€ν
νκ³ (2) `bcast_rx`μμ λ©μμ§λ₯Ό μμ νμ¬
// ν΄λΌμ΄μΈνΈλ‘ μ μ‘ν©λλ€.
loop {
tokio::select! {
incoming = ws_stream.next() => {
match incoming {
Some(Ok(msg)) => {
if let Some(text) = msg.as_text() {
println!("ν΄λΌμ΄μΈνΈμμ: {addr:?} {text:?}");
bcast_tx.send(text.into())?;
}
}
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
msg = bcast_rx.recv() => {
ws_stream.send(Message::text(msg?)).await?;
}
}
}
}
/// Chat server entry point.
///
/// Binds a TCP listener on `127.0.0.1:2000`, and for every accepted
/// connection spawns a task that performs the websocket handshake and then
/// runs `handle_connection` with a clone of the shared broadcast sender.
///
/// # Errors
///
/// Returns an error if binding the listener or accepting a connection fails.
/// Errors inside a spawned connection task end only that task.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    let (bcast_tx, _) = channel(16);

    let listener = TcpListener::bind("127.0.0.1:2000").await?;
    println!("포트 2000에서 수신 대기");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("{addr:?}의 새 연결");
        let bcast_tx = bcast_tx.clone();
        tokio::spawn(async move {
            // Wrap the raw TCP stream into a websocket.
            let ws_stream = ServerBuilder::new().accept(socket).await?;

            handle_connection(addr, ws_stream, bcast_tx).await
        });
    }
}
src/bin/client.rs:
use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use http::Uri;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_websockets::{ClientBuilder, Message};
/// Chat client entry point.
///
/// Connects to the websocket server at `ws://127.0.0.1:2000`, then
/// concurrently prints text messages received from the server and sends each
/// line read from standard input as a text message.
///
/// # Errors
///
/// Returns an error if connecting fails, the websocket stream yields an
/// error, reading standard input fails, or sending a message fails. Returns
/// `Ok(())` when either the websocket stream or standard input ends.
#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
    let (mut ws_stream, _) =
        ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
            .connect()
            .await?;

    let stdin = tokio::io::stdin();
    let mut stdin = BufReader::new(stdin).lines();

    // Continuous loop for concurrently sending and receiving messages.
    loop {
        tokio::select! {
            incoming = ws_stream.next() => {
                match incoming {
                    Some(Ok(msg)) => {
                        // Only messages that carry text are printed.
                        if let Some(text) = msg.as_text() {
                            println!("서버에서: {text}");
                        }
                    },
                    Some(Err(err)) => return Err(err.into()),
                    None => return Ok(()),
                }
            }
            res = stdin.next_line() => {
                match res {
                    Ok(None) => return Ok(()),
                    // `line` is already an owned `String`, so it is moved into the message.
                    Ok(Some(line)) => ws_stream.send(Message::text(line)).await?,
                    Err(err) => return Err(err.into()),
                }
            }
        }
    }
}