// loom_rpc_handler/web_actor.rs

use crate::router::router;
use axum::Router;
use eyre::ErrReport;
use loom_core_actors::{Actor, ActorResult, WorkerResult};
use loom_core_actors_macros::Consumer;
use loom_core_blockchain::Blockchain;
use loom_rpc_state::AppState;
use loom_storage_db::DbPool;
use revm::{DatabaseCommit, DatabaseRef};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing::info;

pub async fn start_web_server_worker<S, DB>(
    host: String,
    extra_router: Router<S>,
    bc: Blockchain<DB>,
    db_pool: DbPool,
    shutdown_token: CancellationToken,
) -> WorkerResult
where
    DB: DatabaseRef<Error = ErrReport> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
    S: Clone + Send + Sync + 'static,
    Router: From<Router<S>>,
{
    let app_state = AppState { db: db_pool, bc };
    let router = router(app_state);
    let router = router.merge(extra_router);

    // logging
    let router = router.layer(TraceLayer::new_for_http().make_span_with(DefaultMakeSpan::default().include_headers(true)));

    info!("Webserver listening on {}", &host);
    let listener = TcpListener::bind(host).await?;
    axum::serve(listener, router.into_make_service_with_connect_info::<SocketAddr>())
        .with_graceful_shutdown(async move {
            shutdown_token.cancelled().await;
            info!("Shutting down webserver...");
        })
        .await?;

    Ok("Webserver shutdown".to_string())
}

/// Actor that runs the web server as a spawned Tokio task.
///
/// `S` is the state type of the extra router; `DB` is the database type of the
/// attached [`Blockchain`].
#[derive(Consumer)]
pub struct WebServerActor<S, DB: Clone + Send + Sync + 'static> {
    /// Address the TCP listener binds to.
    host: String,
    /// Additional routes merged into the crate's own router.
    extra_router: Router<S>,
    /// Cancelling this token triggers the graceful shutdown of the server.
    shutdown_token: CancellationToken,
    /// Database pool placed into the application state.
    db_pool: DbPool,
    /// Blockchain placed into the application state; `None` until `on_bc` is called.
    bc: Option<Blockchain<DB>>,
}

impl<S, DB> WebServerActor<S, DB>
where
    DB: DatabaseRef<Error = ErrReport> + Send + Sync + Clone + Default + 'static,
    S: Clone + Send + Sync + 'static,
    Router: From<Router<S>>,
{
    /// Creates an actor for `host` with the given extra routes, database pool and
    /// shutdown token. No blockchain is attached yet; use [`Self::on_bc`] for that.
    pub fn new(host: String, extra_router: Router<S>, db_pool: DbPool, shutdown_token: CancellationToken) -> Self {
        let bc = None;
        Self { bc, db_pool, shutdown_token, extra_router, host }
    }

    /// Consumes the actor and returns it with a clone of `bc` attached,
    /// replacing any previously attached blockchain.
    pub fn on_bc(self, bc: &Blockchain<DB>) -> Self {
        let mut actor = self;
        actor.bc = Some(bc.clone());
        actor
    }
}

impl<S, DB> Actor for WebServerActor<S, DB>
where
    S: Clone + Send + Sync + 'static,
    Router: From<Router<S>>,
    DB: DatabaseRef<Error = ErrReport> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
{
    fn start(&self) -> ActorResult {
        let task = tokio::spawn(start_web_server_worker(
            self.host.clone(),
            self.extra_router.clone(),
            self.bc.clone().unwrap(),
            self.db_pool.clone(),
            self.shutdown_token.clone(),
        ));
        Ok(vec![task])
    }

    fn name(&self) -> &'static str {
        "WebServerActor"
    }
}