loom_node_json_rpc/
wait_for_node_sync_actor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use alloy_network::Ethereum;
use alloy_provider::Provider;
use alloy_rpc_types::SyncStatus;
use alloy_transport::Transport;
use eyre::eyre;
use loom_core_actors::{Actor, ActorResult, WorkerResult};
use loom_node_debug_provider::DebugProviderExt;
use std::marker::PhantomData;
use std::time::Duration;
use tokio::time::timeout;
use tracing::{error, info};

/// Pause between two consecutive sync-status polls.
const SYNC_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Upper bound on how long a single sync-status request may take before it is treated as timed out.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

/// Polls the node's syncing status until it reports that it is no longer syncing.
///
/// This works only for http/ipc/ws providers.
///
/// The status is requested once per `SYNC_CHECK_INTERVAL`, and every request is bounded by
/// `CLIENT_TIMEOUT`. While the node is still syncing, the reported progress is logged on the
/// first poll and then on every sixth poll after that.
///
/// Polling also stops when the request fails or times out: the problem is logged and the
/// function still returns `Ok`, so a failed status check does not surface as an error here.
async fn wait_for_node_sync_one_shot_worker<P, T>(client: P) -> WorkerResult
where
    T: Transport + Clone,
    P: Provider<T, Ethereum> + DebugProviderExt<T, Ethereum> + Send + Sync + Clone + 'static,
{
    info!("Waiting for node to sync...");

    // Cycles through 0..=5; progress is logged only when it is zero.
    let mut polls_since_log = 0;
    loop {
        // Every arm except "still syncing" ends the loop.
        let progress = match timeout(CLIENT_TIMEOUT, client.syncing()).await {
            Ok(Ok(SyncStatus::None)) => break,
            Ok(Ok(SyncStatus::Info(sync_progress))) => sync_progress,
            Ok(Err(e)) => {
                error!("Error retrieving syncing status: {:?}", e);
                break;
            }
            Err(elapsed) => {
                error!("Timeout during get syncing status. Elapsed time: {:?}", elapsed);
                break;
            }
        };

        if polls_since_log == 0 {
            info!("Sync progress: {:?}", progress);
        }

        tokio::time::sleep(SYNC_CHECK_INTERVAL).await;
        polls_since_log = if polls_since_log >= 5 { 0 } else { polls_since_log + 1 };
    }

    Ok("Node is sync".to_string())
}

/// One-shot actor that runs the node sync check and is meant to be started with
/// `start_and_wait` (plain `start` is rejected).
pub struct WaitForNodeSyncOneShotBlockingActor<P, T> {
    /// Provider handle; a clone of it is handed to the sync worker when the actor is run.
    client: P,
    /// Zero-sized marker that lets the struct carry the `T` type parameter without storing a `T`.
    _t: PhantomData<T>,
}

impl<P, T> WaitForNodeSyncOneShotBlockingActor<P, T>
where
    T: Transport + Clone,
    P: Provider<T, Ethereum> + DebugProviderExt<T, Ethereum> + Send + Sync + Clone + 'static,
{
    /// Builds the actor around `client`, the provider that will be queried for sync status.
    pub fn new(client: P) -> Self {
        Self { client, _t: PhantomData }
    }
}

impl<P, T> Actor for WaitForNodeSyncOneShotBlockingActor<P, T>
where
    T: Transport + Clone,
    P: Provider<T, Ethereum> + DebugProviderExt<T, Ethereum> + Send + Sync + Clone + 'static,
{
    /// Runs the sync worker on a dedicated runtime and blocks until it has finished.
    ///
    /// # Errors
    ///
    /// Returns an error if the dedicated runtime cannot be created or if `wait` reports a
    /// failure. In both cases the runtime (when it exists) is shut down in the background
    /// before the error is returned.
    fn start_and_wait(&self) -> eyre::Result<()> {
        let rt = tokio::runtime::Runtime::new()?; // we need a different runtime to wait for the result
        let client_cloned = self.client.clone();
        let handle = rt.spawn(async { wait_for_node_sync_one_shot_worker(client_cloned).await });

        // Keep the outcome instead of returning early with `?`: an early return would drop
        // `rt` implicitly, and tokio documents that dropping a runtime from inside an async
        // context panics. Releasing it through `shutdown_background` on every path avoids that.
        let wait_outcome = self.wait(Ok(vec![handle]));
        rt.shutdown_background();
        wait_outcome?;

        Ok(())
    }

    /// Always fails: this actor only makes sense when the caller waits for it, so it must be
    /// run through `start_and_wait`.
    fn start(&self) -> ActorResult {
        Err(eyre!("NEED_TO_BE_WAITED"))
    }

    /// Static name identifying this actor.
    fn name(&self) -> &'static str {
        "WaitForNodeSyncOneShotBlockingActor"
    }
}