# Concurrency Afternoon Exercise

## Dining Philosophers --- Async

``````use std::sync::Arc;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::Mutex;
use tokio::time;

struct Fork;

struct Philosopher {
name: String,
left_fork: Arc<Mutex<Fork>>,
right_fork: Arc<Mutex<Fork>>,
thoughts: Sender<String>,
}

impl Philosopher {
async fn think(&self) {
self.thoughts
.send(format!("Eureka! {} has a new idea!", &self.name))
.await
.unwrap();
}

async fn eat(&self) {
// Pick up forks...
let _first_lock = self.left_fork.lock().await;
// Add a delay before picking the second fork to allow the execution
// to transfer to another task
time::sleep(time::Duration::from_millis(1)).await;
let _second_lock = self.right_fork.lock().await;

println!("{} is eating...", &self.name);
time::sleep(time::Duration::from_millis(5)).await;

// The locks are dropped here
}
}

static PHILOSOPHERS: &[&str] =
&["Socrates", "Hypatia", "Plato", "Aristotle", "Pythagoras"];

#[tokio::main]
async fn main() {
// Create forks
let mut forks = vec![];
(0..PHILOSOPHERS.len()).for_each(|_| forks.push(Arc::new(Mutex::new(Fork))));

// Create philosophers
let (philosophers, mut rx) = {
let mut philosophers = vec![];
let (tx, rx) = mpsc::channel(10);
for (i, name) in PHILOSOPHERS.iter().enumerate() {
let left_fork = Arc::clone(&forks[i]);
let right_fork = Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]);
// To avoid a deadlock, we have to break the symmetry
// somewhere. This will swap the forks without deinitializing
// either of them.
if i == 0 {
std::mem::swap(&mut left_fork, &mut right_fork);
}
philosophers.push(Philosopher {
name: name.to_string(),
left_fork,
right_fork,
thoughts: tx.clone(),
});
}
(philosophers, rx)
// tx is dropped here, so we don't need to explicitly drop it later
};

// Make them think and eat
for phil in philosophers {
tokio::spawn(async move {
for _ in 0..100 {
phil.think().await;
phil.eat().await;
}
});
}

// Output their thoughts
while let Some(thought) = rx.recv().await {
println!("Here is a thought: {thought}");
}
}``````

src/bin/server.rs:

``````use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use std::error::Error;
use tokio::net::{TcpListener, TcpStream};
use tokio_websockets::{Message, ServerBuilder, WebSocketStream};

async fn handle_connection(
mut ws_stream: WebSocketStream<TcpStream>,
bcast_tx: Sender<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {

ws_stream
.send(Message::text("Welcome to chat! Type a message".to_string()))
.await?;
let mut bcast_rx = bcast_tx.subscribe();

// A continuous loop for concurrently performing two tasks: (1) receiving
// messages from `ws_stream` and broadcasting them, and (2) receiving
// messages on `bcast_rx` and sending them to the client.
loop {
tokio::select! {
incoming = ws_stream.next() => {
match incoming {
Some(Ok(msg)) => {
if let Some(text) = msg.as_text() {
bcast_tx.send(text.into())?;
}
}
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
msg = bcast_rx.recv() => {
ws_stream.send(Message::text(msg?)).await?;
}
}
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let (bcast_tx, _) = channel(16);

let listener = TcpListener::bind("127.0.0.1:2000").await?;
println!("listening on port 2000");

loop {
let bcast_tx = bcast_tx.clone();
tokio::spawn(async move {
// Wrap the raw TCP stream into a websocket.
let ws_stream = ServerBuilder::new().accept(socket).await?;

});
}
}``````

src/bin/client.rs:

``````use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use http::Uri;
use tokio_websockets::{ClientBuilder, Message};

#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
let (mut ws_stream, _) =
ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
.connect()
.await?;

let stdin = tokio::io::stdin();

// Continuous loop for concurrently sending and receiving messages.
loop {
tokio::select! {
incoming = ws_stream.next() => {
match incoming {
Some(Ok(msg)) => {
if let Some(text) = msg.as_text() {
println!("From server: {}", text);
}
},
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
res = stdin.next_line() => {
match res {
Ok(None) => return Ok(()),
Ok(Some(line)) => ws_stream.send(Message::text(line.to_string())).await?,
Err(err) => return Err(err.into()),
}
}

}
}
}``````