廣播聊天應用程式
在本練習中,我們要運用所學的新知識,實作廣播即時通訊應用程式。我們有一個即時通訊伺服器,供用戶端連線和發布訊息。用戶端會從標準輸入內容讀取使用者訊息,然後將訊息傳送至伺服器。即時通訊伺服器會將收到的每則訊息播送至所有用戶端。
為此,我們會使用伺服器上的廣播通道 和 tokio_websockets
,在用戶端和伺服器之間通訊。
請建立新的 Cargo 專案,並新增下列依附元件:
Cargo.toml:
[package]
name = "chat-async"
version = "0.1.0"
edition = "2021"
[dependencies]
futures-util = { version = "0.3.30", features = ["sink"] }
http = "1.0.0"
tokio = { version = "1.36.0", features = ["full"] }
tokio-websockets = { version = "0.7.0", features = ["client", "fastrand", "server", "sha1_smol"] }
所需 API
您會需要 tokio
和 tokio_websockets
中的以下函式。請花幾分鐘熟悉此 API。
- 由
WebSocketStream
實作的 StreamExt::next():用於非同步讀取 WebSocket 串流中的訊息。 - 由
WebSocketStream
實作的 SinkExt::send():用於在 WebSocket 串流中非同步傳送訊息。 - Lines::next_line():用於非同步讀取標準輸入內容中的使用者訊息。
- Sender::subscribe():用於訂閱廣播管道。
兩個二進位檔
在 Cargo 專案中,通常只能有一個二進位檔,以及一個 src/main.rs
檔案。這項專案需要兩個二進位檔,一個用於用戶端,另一個用於伺服器。您可以設為兩個不同的 Cargo 專案,但我們會放入包含兩個二進位檔的單一 Cargo 專案。為順利運作,用戶端和伺服器程式碼應位於 src/bin
下方 (請參閱說明文件)。
請將以下伺服器和用戶端程式碼分別複製到 src/bin/server.rs
和 src/bin/client.rs
中。請按照下方說明完成這些檔案。
src/bin/server.rs:
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use std::error::Error;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{channel, Sender};
use tokio_websockets::{Message, ServerBuilder, WebSocketStream};
async fn handle_connection(
addr: SocketAddr,
mut ws_stream: WebSocketStream<TcpStream>,
bcast_tx: Sender<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
// TODO: For a hint, see the description of the task below.
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let (bcast_tx, _) = channel(16);
let listener = TcpListener::bind("127.0.0.1:2000").await?;
println!("listening on port 2000");
loop {
let (socket, addr) = listener.accept().await?;
println!("New connection from {addr:?}");
let bcast_tx = bcast_tx.clone();
tokio::spawn(async move {
// Wrap the raw TCP stream into a websocket.
let ws_stream = ServerBuilder::new().accept(socket).await?;
handle_connection(addr, ws_stream, bcast_tx).await
});
}
}
src/bin/client.rs:
use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use http::Uri;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_websockets::{ClientBuilder, Message};
#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
let (mut ws_stream, _) =
ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
.connect()
.await?;
let stdin = tokio::io::stdin();
let mut stdin = BufReader::new(stdin).lines();
// TODO: For a hint, see the description of the task below.
}
執行二進位檔
使用以下指令執行伺服器:
cargo run --bin server
並使用以下指令執行用戶端:
cargo run --bin client
工作
- 在
src/bin/server.rs
中實作handle_connection
函式。- 提示:使用
tokio::select!
即可在連續迴圈中並行執行兩項工作。一項工作會收到來自用戶端的訊息,然後播送訊息。另一項工作則是將伺服器收到的訊息傳送至用戶端。
- 提示:使用
- 完成
src/bin/client.rs
中的主函式。- 提示:和先前一樣,在連續迴圈中使用
tokio::select!
並行執行兩項工作:(1) 從標準輸入內容讀取使用者訊息,然後將訊息傳送至伺服器;(2) 接收來自伺服器的訊息,並向使用者顯示訊息。
- 提示:和先前一樣,在連續迴圈中使用
- 選用步驟:完成後,將程式碼變更為播送訊息給所有用戶端,但不包括訊息發送端。