exex_grpc_loom/
main.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
use loom_node_grpc_exex_proto::proto::{remote_ex_ex_client::RemoteExExClient, SubscribeRequest};
use reth_exex::ExExNotification;
use reth_tracing::tracing::error;
use reth_tracing::{tracing::info, RethTracer, Tracer};
use tokio::select;

#[tokio::main]
async fn main() -> eyre::Result<()> {
    let _ = RethTracer::new().init()?;

    let mut client =
        RemoteExExClient::connect("http://[::1]:10000").await?.max_encoding_message_size(usize::MAX).max_decoding_message_size(usize::MAX);

    let mut stream_exex = client.subscribe_ex_ex(SubscribeRequest {}).await?.into_inner();
    let mut stream_tx = client.subscribe_mempool_tx(SubscribeRequest {}).await?.into_inner();

    loop {
        select! {
            notification = stream_exex.message() => {
                match notification {
                    Ok(notification) => {
                        if let Some(notification) = notification {
                            let notification = ExExNotification::try_from(&notification)?;

                            match notification {
                                ExExNotification::ChainCommitted { new } => {
                                    info!(committed_chain = ?new.range(), "Received commit");
                                }
                                ExExNotification::ChainReorged { old, new } => {
                                    info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
                                }
                                ExExNotification::ChainReverted { old } => {
                                    info!(reverted_chain = ?old.range(), "Received revert");
                                }
                            };
                        }

                    },
                    Err(e)=>{
                        error!(error=?e, "stream_exex.message");
                    }
                }
            },
            notification = stream_tx.message() =>{
                match notification {
                    Ok(notification) => {
                        if let Some(tx) = notification {
                            info!(hash=?tx.hash, "tx received")
                        }
                    }
                    Err(e)=>{
                        error!(error=?e, "stream_tx.message");
                    }
                }
            },
        }
    }
}