ブロードキャスト・チャットアプリ
この演習では、新たに身に付けた知識を活かしてブロードキャスト チャット アプリを実装します。クライアントが接続してメッセージを公開するチャット サーバーがあります。クライアントは標準入力からユーザー メッセージを読み取り、サーバーに送信します。チャット サーバーは受信した各メッセージをすべてのクライアントにブロードキャストします。
このために、サーバー上の ブロードキャスト チャンネル を使用し、クライアントとサーバー間の通信には tokio_websockets
を使用します。
新しい Cargo プロジェクトを作成し、次の依存関係を追加します。
Cargo.toml:
[package]
name = "chat-async"
version = "0.1.0"
edition = "2021"
[dependencies]
futures-util = { version = "0.3.31", features = ["sink"] }
http = "1.1.0"
tokio = { version = "1.41.0", features = ["full"] }
tokio-websockets = { version = "0.10.1", features = ["client", "fastrand", "server", "sha1_smol"] }
必要な API
tokio
と tokio_websockets
の以下の関数が必要になります。少し時間をかけて API に対する理解を深めてください。
WebSocketStream
によって実装された StreamExt::next(): Websocket Stream からのメッセージを非同期で読み取ります。WebSocketStream
によって実装された SinkExt::send(): Websocket Stream 上でメッセージを非同期で送信します。- Lines::next_line(): 標準入力からのユーザー メッセージを非同期で読み取ります。
- Sender::subscribe(): ブロードキャスト チャンネルをサブスクライブします。
2 つのバイナリ
通常、Cargo プロジェクトに含めることができるのは 1 つのバイナリと 1 つの src/main.rs
ファイルのみです。このプロジェクトには 2 つのバイナリが必要です。1 つはクライアント用、もう 1 つはサーバー用です。2 つの独立した Cargo プロジェクトを作成することもできますが、ここでは 1 つの Cargo プロジェクトに 2 つのバイナリを入れます。そのためには、クライアントとサーバーのコードを src/bin
に配置する必要があります(ドキュメント をご覧ください)。
次のサーバーとクライアントのコードを、それぞれsrc/bin/server.rs
と src/bin/client.rs
にコピーします。ここでのタスクは、以下で説明するように、これらのファイルを完成させることです。
src/bin/server.rs:
use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use std::error::Error; use std::net::SocketAddr; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::broadcast::{channel, Sender}; use tokio_websockets::{Message, ServerBuilder, WebSocketStream}; async fn handle_connection( addr: SocketAddr, mut ws_stream: WebSocketStream<TcpStream>, bcast_tx: Sender<String>, ) -> Result<(), Box<dyn Error + Send + Sync>> { // TODO: ヒントについては、以下のタスクの説明をご覧ください。 } #[tokio::main] async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { let (bcast_tx, _) = channel(16); let listener = TcpListener::bind("127.0.0.1:2000").await?; println!("listening on port 2000"); loop { let (socket, addr) = listener.accept().await?; println!("New connection from {addr:?}"); let bcast_tx = bcast_tx.clone(); tokio::spawn(async move { // 未加工の TCP ストリームを WebSocket にラップします。 let ws_stream = ServerBuilder::new().accept(socket).await?; handle_connection(addr, ws_stream, bcast_tx).await }); } }
src/bin/client.rs:
use futures_util::stream::StreamExt; use futures_util::SinkExt; use http::Uri; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio_websockets::{ClientBuilder, Message}; #[tokio::main] async fn main() -> Result<(), tokio_websockets::Error> { let (mut ws_stream, _) = ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000")) .connect() .await?; let stdin = tokio::io::stdin(); let mut stdin = BufReader::new(stdin).lines(); // TODO: ヒントについては、以下のタスクの説明をご覧ください。 }
バイナリの実行
次のコマンドでサーバーを実行します。
cargo run --bin server
次のコマンドでクライアントを実行します。
cargo run --bin client
タスク
src/bin/server.rs
にhandle_connection
関数を実装します。- ヒント: 2 つのタスクを連続ループで同時に実行するには、
tokio::select!
を使用します。1 つのタスクは、クライアントからメッセージを受信してブロードキャストします。もう 1 つのタスクは、サーバーで受信したメッセージをクライアントに送信します。
- ヒント: 2 つのタスクを連続ループで同時に実行するには、
src/bin/client.rs
のメイン関数を完成させます。- ヒント: 前の例と同様に、
tokio::select!
を連続ループで使用し、(1)標準入力からユーザー メッセージを読み取ってサーバーに送信するタスクと、(2)サーバーからメッセージを受信してユーザーに表示するタスクを同時に実行します。
- ヒント: 前の例と同様に、
- 省略可: 完了したら、メッセージの送信者以外のすべてのクライアントにメッセージをブロードキャストするようにコードを変更します。