Add example code for json and SSE

This commit is contained in:
Kai Vogelgesang 2020-08-29 17:45:36 +02:00
parent 3a581b17d4
commit 7c8bfb5c9e
3 changed files with 117 additions and 4 deletions

View File

@ -7,6 +7,12 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-web = "2"
actix = "0.9.0"
actix-web = "2.0.0"
actix-files = "0.2.1"
actix-rt = "1"
actix-rt = "1"
tokio = "0.2.22"
futures = "0.3.1"
serde = { version = "1.0.43", features = ["derive"] }
serde_json = "1.0"

View File

@ -1,5 +1,10 @@
use actix_files as fs;
use actix_web::{get, web, App, HttpServer, Responder};
use actix_web::{get, post, web, App, HttpServer, HttpResponse, Responder};
use actix_web::web::Data;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
mod sse;
#[get("/yeet")]
async fn yeet() -> impl Responder {
@ -7,11 +12,37 @@ async fn yeet() -> impl Responder {
format!("yeet!")
}
#[get("/events")]
async fn events(broadcaster: Data<Mutex<sse::Broadcaster>>) -> impl Responder {
let rx = { broadcaster.lock().unwrap().new_client() };
HttpResponse::Ok()
.header("content-type", "text/event-stream")
.no_chunking()
.streaming(rx)
}
#[derive(Debug, Deserialize, Serialize)]
struct MyObj {
id: u64,
}
#[post("/json_endpoint")]
async fn json_endpoint(item: web::Json<MyObj>) -> impl Responder {
format!("id = {}", item.id)
}
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
let data = sse::Broadcaster::create();
HttpServer::new(move || {
App::new()
.app_data(data.clone())
.service(yeet)
.service(json_endpoint)
.service(events)
.service(fs::Files::new("/", "static/").index_file("index.html"))
})
.bind("127.0.0.1:8080")?

76
src/sse.rs Normal file
View File

@ -0,0 +1,76 @@
use actix_web::Error;
use actix_web::web::{Bytes, Data};
use tokio::time::{interval_at, Instant};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use futures::Stream;
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll};
use std::time::Duration;
// see https://github.com/actix/examples/blob/master/server-sent-events/src/main.rs
pub struct Broadcaster {
clients: Vec<Sender<Bytes>>,
}
pub struct Client(Receiver<Bytes>);
impl Broadcaster {
fn new() -> Self {
Self { clients: Vec::new() }
}
pub fn create() -> Data<Mutex<Self>> {
let this = Data::new(Mutex::new(Self::new()));
let clone = this.clone();
actix_rt::spawn(async move {
let mut task = interval_at(Instant::now(), Duration::from_secs(1));
loop {
task.tick().await;
println!("Task");
clone.lock().unwrap().remove_stale_clients();
}
});
this
}
fn remove_stale_clients(&mut self) {
let mut ok_clients = Vec::new();
for client in self.clients.iter() {
let result = client.clone().try_send(Bytes::from("data: ping\n\n"));
if let Ok(()) = result {
ok_clients.push(client.clone());
}
}
self.clients = ok_clients;
}
pub fn new_client(&mut self) -> Client {
let (tx, rx) = channel(100);
tx.clone().try_send(Bytes::from("data: connected\n\n")).unwrap();
self.clients.push(tx);
Client(rx)
}
}
impl Stream for Client {
type Item = Result<Bytes, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_next(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}