From 7c8bfb5c9eca88f088d43115a55aa0e4782e3384 Mon Sep 17 00:00:00 2001 From: Kai Vogelgesang Date: Sat, 29 Aug 2020 17:45:36 +0200 Subject: [PATCH] Add example code for json and SSE --- Cargo.toml | 10 +++++-- src/main.rs | 35 ++++++++++++++++++++++-- src/sse.rs | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 4 deletions(-) create mode 100644 src/sse.rs diff --git a/Cargo.toml b/Cargo.toml index 20baa29..1d4c6a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,12 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix-web = "2" +actix = "0.9.0" +actix-web = "2.0.0" actix-files = "0.2.1" -actix-rt = "1" \ No newline at end of file +actix-rt = "1" +tokio = "0.2.22" +futures = "0.3.1" + +serde = { version = "1.0.43", features = ["derive"] } +serde_json = "1.0" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 0760edb..a17b1cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,10 @@ use actix_files as fs; -use actix_web::{get, web, App, HttpServer, Responder}; +use actix_web::{get, post, web, App, HttpServer, HttpResponse, Responder}; +use actix_web::web::Data; +use serde::{Deserialize, Serialize}; +use std::sync::Mutex; + +mod sse; #[get("/yeet")] async fn yeet() -> impl Responder { @@ -7,11 +12,37 @@ async fn yeet() -> impl Responder { format!("yeet!") } +#[get("/events")] +async fn events(broadcaster: Data>) -> impl Responder { + let rx = { broadcaster.lock().unwrap().new_client() }; + + HttpResponse::Ok() + .header("content-type", "text/event-stream") + .no_chunking() + .streaming(rx) +} + +#[derive(Debug, Deserialize, Serialize)] +struct MyObj { + id: u64, +} + +#[post("/json_endpoint")] +async fn json_endpoint(item: web::Json) -> impl Responder { + format!("id = {}", item.id) +} + #[actix_rt::main] async fn main() -> std::io::Result<()> { - HttpServer::new(|| { + + let data = sse::Broadcaster::create(); + + HttpServer::new(move || { App::new() + .app_data(data.clone()) .service(yeet) + .service(json_endpoint) + .service(events) .service(fs::Files::new("/", "static/").index_file("index.html")) }) .bind("127.0.0.1:8080")? diff --git a/src/sse.rs b/src/sse.rs new file mode 100644 index 0000000..7fb5b89 --- /dev/null +++ b/src/sse.rs @@ -0,0 +1,76 @@ +use actix_web::Error; +use actix_web::web::{Bytes, Data}; +use tokio::time::{interval_at, Instant}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use futures::Stream; + +use std::pin::Pin; +use std::sync::Mutex; +use std::task::{Context, Poll}; +use std::time::Duration; + +// see https://github.com/actix/examples/blob/master/server-sent-events/src/main.rs + +pub struct Broadcaster { + clients: Vec>, +} + +pub struct Client(Receiver); + +impl Broadcaster { + fn new() -> Self { + Self { clients: Vec::new() } + } + + pub fn create() -> Data> { + let this = Data::new(Mutex::new(Self::new())); + + let clone = this.clone(); + actix_rt::spawn(async move { + let mut task = interval_at(Instant::now(), Duration::from_secs(1)); + loop { + task.tick().await; + println!("Task"); + clone.lock().unwrap().remove_stale_clients(); + } + }); + + this + } + + fn remove_stale_clients(&mut self) { + let mut ok_clients = Vec::new(); + for client in self.clients.iter() { + let result = client.clone().try_send(Bytes::from("data: ping\n\n")); + + if let Ok(()) = result { + ok_clients.push(client.clone()); + } + } + self.clients = ok_clients; + } + + pub fn new_client(&mut self) -> Client { + let (tx, rx) = channel(100); + + tx.clone().try_send(Bytes::from("data: connected\n\n")).unwrap(); + self.clients.push(tx); + + Client(rx) + } +} + +impl Stream for Client { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match Pin::new(&mut self.0).poll_next(cx) { + Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} \ No newline at end of file