loom_metrics/
block_latency_actor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use eyre::eyre;
use influxdb::{Timestamp, WriteQuery};
use loom_core_actors::Consumer;
use loom_core_actors::Producer;
use loom_core_actors::{subscribe, Actor, ActorResult, Broadcaster, WorkerResult};
use loom_core_actors_macros::{Accessor, Consumer, Producer};
use loom_core_blockchain::Blockchain;
use loom_types_events::MessageBlockHeader;
use revm::DatabaseRef;
use tokio::sync::broadcast::error::RecvError;
use tracing::{error, info};

/// Worker loop that measures how late each new block header arrives.
///
/// For every header received on `block_header_update_rx` it writes a
/// `block_latency` point (wall-clock seconds between now and the header's
/// timestamp) to `influx_channel_tx`, and emits a `reorg_detected` point
/// when the same block number is seen twice with a different header hash.
///
/// Returns an error only when the header channel is closed; a lagged
/// receiver is logged and skipped.
async fn block_latency_worker(
    block_header_update_rx: Broadcaster<MessageBlockHeader>,
    influx_channel_tx: Broadcaster<WriteQuery>,
) -> WorkerResult {
    let mut last_block_number = 0;
    let mut last_block_hash = Default::default();

    subscribe!(block_header_update_rx);
    loop {
        let block_header = match block_header_update_rx.recv().await {
            Ok(block) => block,
            Err(e) => match e {
                RecvError::Closed => {
                    error!("Block header channel closed");
                    return Err(eyre!("Block header channel closed"));
                }
                RecvError::Lagged(lag) => {
                    // Missed messages are tolerable for metrics; skip and keep going.
                    info!("Block header channel lagged: {}", lag);
                    continue;
                }
            },
        };

        let header = &block_header.inner.header;
        let block_number = header.number;
        // hash_slow() re-hashes the header each call; compute it once per block.
        let block_hash = header.hash_slow();

        let current_timestamp = chrono::Utc::now();
        // Latency = local receive time minus the timestamp the producer put in the header.
        let block_latency = current_timestamp.timestamp() as f64 - header.timestamp as f64;
        let write_query = WriteQuery::new(Timestamp::from(current_timestamp), "block_latency")
            .add_field("value", block_latency)
            .add_field("block_number", block_number);
        if let Err(e) = influx_channel_tx.send(write_query).await {
            error!("Failed to send block latency to influxdb: {:?}", e);
        }

        // Same block number arriving twice with a different hash indicates a reorg.
        if last_block_number == block_number && last_block_hash != block_hash {
            let write_query = WriteQuery::new(Timestamp::from(current_timestamp), "reorg_detected")
                .add_field("block_number", block_number);
            if let Err(e) = influx_channel_tx.send(write_query).await {
                error!("Failed to send block reorg to influxdb: {:?}", e);
            }
        }

        last_block_number = block_number;
        // Consistent access path (the original mixed `inner.header` and `header`).
        last_block_hash = block_hash;
    }
}

/// Actor that records block-arrival latency (and detected reorgs) to InfluxDB.
///
/// Both channels start as `None`; wire them with [`BlockLatencyRecorderActor::on_bc`]
/// before calling `start()` — the derive macros generate the consumer/producer plumbing.
#[derive(Accessor, Consumer, Producer, Default)]
pub struct BlockLatencyRecorderActor {
    // Incoming stream of new block headers to measure latency against.
    #[consumer]
    block_header_rx: Option<Broadcaster<MessageBlockHeader>>,
    // Outgoing channel carrying InfluxDB write queries.
    #[producer]
    influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
}

impl BlockLatencyRecorderActor {
    /// Creates an actor with no channels attached; both fields are `None`
    /// until [`Self::on_bc`] wires them to a blockchain instance.
    pub fn new() -> Self {
        // Identical to `{ block_header_rx: None, influxdb_write_channel_tx: None }`.
        Self::default()
    }

    /// Attaches the actor to `bc`'s new-block-headers and InfluxDB write channels.
    pub fn on_bc<DB: DatabaseRef + Send + Sync + Clone + 'static>(self, bc: &Blockchain<DB>) -> Self {
        let block_header_rx = Some(bc.new_block_headers_channel());
        let influxdb_write_channel_tx = Some(bc.influxdb_write_channel());
        Self { block_header_rx, influxdb_write_channel_tx }
    }
}

impl Actor for BlockLatencyRecorderActor {
    fn start(&self) -> ActorResult {
        let task = tokio::task::spawn(block_latency_worker(
            self.block_header_rx.clone().unwrap(),
            self.influxdb_write_channel_tx.clone().unwrap(),
        ));
        Ok(vec![task])
    }

    fn name(&self) -> &'static str {
        "BlockLatencyRecorderActor"
    }
}