loom_metrics/
influxdb_actor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use async_trait::async_trait;
use eyre::eyre;
use influxdb::{Client, ReadQuery, WriteQuery};
use loom_core_actors::{Actor, ActorResult, Broadcaster, Consumer, WorkerResult};
use loom_core_actors_macros::Consumer;
use loom_core_blockchain::Blockchain;
use revm::DatabaseRef;
use std::collections::HashMap;
use tracing::{error, info, warn};

pub async fn start_influxdb_worker(
    url: String,
    database: String,
    tags: HashMap<String, String>,
    event_receiver: Broadcaster<WriteQuery>,
) -> WorkerResult {
    let client = Client::new(url, database.clone());
    let create_db_stmt = format!("CREATE DATABASE {}", database);
    let result = client.query(ReadQuery::new(create_db_stmt)).await;
    match result {
        Ok(_) => info!("Database created with name: {}", database),
        Err(e) => info!("Database creation failed or already exists: {:?}", e),
    }
    let mut event_receiver = event_receiver.subscribe().await;
    loop {
        let event_result = event_receiver.recv().await;
        match event_result {
            Ok(mut event) => {
                for (key, value) in tags.iter() {
                    event = event.add_tag(key, value.clone());
                }
                let write_result = client.query(event).await;
                if write_result.is_err() {
                    error!("Write failed: {:?}", write_result.err().unwrap());
                }
            }
            Err(e) => match e {
                tokio::sync::broadcast::error::RecvError::Closed => {
                    error!("InfluxDB channel closed");
                    return Err(eyre!("INFLUXDB_CHANNEL_CLOSED"));
                }
                tokio::sync::broadcast::error::RecvError::Lagged(lagged) => {
                    warn!("InfluxDB lagged: {:?}", lagged);
                    continue;
                }
            },
        }
    }
}

/// Actor that spawns [`start_influxdb_worker`] to forward write queries from
/// a broadcast channel to InfluxDB.
#[derive(Consumer)]
pub struct InfluxDbWriterActor {
    /// InfluxDB endpoint passed to the worker.
    url: String,
    /// Name of the InfluxDB database passed to the worker.
    database: String,
    /// Tags the worker adds to every write query before sending it.
    tags: HashMap<String, String>,
    /// Channel the worker subscribes to for write queries; `None` after
    /// `new` and populated by `on_bc`. `start` fails while this is unset.
    #[consumer]
    influxdb_write_channel_rx: Option<Broadcaster<WriteQuery>>,
}

impl InfluxDbWriterActor {
    pub fn new(url: String, database: String, tags: HashMap<String, String>) -> Self {
        Self { url, database, tags, influxdb_write_channel_rx: None }
    }

    pub fn on_bc<DB: DatabaseRef + Send + Sync + Clone + 'static>(self, bc: &Blockchain<DB>) -> Self {
        Self { influxdb_write_channel_rx: Some(bc.influxdb_write_channel()), ..self }
    }
}

#[async_trait]
impl Actor for InfluxDbWriterActor {
    /// Spawns the InfluxDB worker task on the tokio runtime.
    ///
    /// # Errors
    ///
    /// Returns `INFLUXDB_WRITE_CHANNEL_NOT_SET` if the write channel has not
    /// been configured, in which case no task is spawned.
    fn start(&self) -> ActorResult {
        // Clone the broadcaster handle exactly once; the owned copy is moved
        // into the spawned worker below.
        let influxdb_write_channel_rx = match self.influxdb_write_channel_rx.clone() {
            Some(rx) => rx,
            None => {
                error!("InfluxDB write channel is not set.");
                return Err(eyre!("INFLUXDB_WRITE_CHANNEL_NOT_SET"));
            }
        };
        let task = tokio::task::spawn(start_influxdb_worker(
            self.url.clone(),
            self.database.clone(),
            self.tags.clone(),
            influxdb_write_channel_rx,
        ));
        Ok(vec![task])
    }

    /// Returns the static name of this actor.
    fn name(&self) -> &'static str {
        "InfluxDbWriterActor"
    }
}