loom_exex/
main.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use crate::arguments::{AppArgs, Command, LoomArgs};
use alloy::eips::BlockId;
use alloy::providers::{ProviderBuilder, WsConnect};
use alloy::rpc::client::ClientBuilder;
use clap::{CommandFactory, FromArgMatches, Parser};
use loom::core::blockchain::Blockchain;
use loom::core::topology::TopologyConfig;
use loom::evm::db::{AlloyDB, LoomDB};
use loom::node::actor_config::NodeBlockActorConfig;
use loom::node::exex::mempool_worker;
use loom::types::entities::MarketState;
use reth::builder::engine_tree_config::TreeConfig;
use reth::builder::EngineNodeLauncher;
use reth::chainspec::{Chain, EthereumChainSpecParser};
use reth::cli::Cli;
use reth_node_ethereum::node::EthereumAddOns;
use reth_node_ethereum::EthereumNode;
use reth_provider::providers::BlockchainProvider2;
use std::time::Duration;
use tokio::{signal, task};
use tracing::{error, info};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter, Layer};

// Command-line argument definitions; `AppArgs`, `Command` and `LoomArgs` are imported from here.
mod arguments;
// Loom startup helpers; `main` calls `loom_runtime::init` and `loom_runtime::start_loom`.
mod loom_runtime;

fn main() -> eyre::Result<()> {
    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into());
    let fmt_layer = fmt::Layer::default().with_thread_ids(true).with_file(false).with_line_number(true).with_filter(env_filter);
    tracing_subscriber::registry().with(fmt_layer).init();

    // ignore arguments used by reth
    let app_args = AppArgs::from_arg_matches_mut(&mut AppArgs::command().ignore_errors(true).get_matches())?;
    match app_args.command {
        Command::Node(_) => Cli::<EthereumChainSpecParser, LoomArgs>::parse().run(|builder, loom_args: LoomArgs| async move {
            let topology_config = TopologyConfig::load_from_file(loom_args.loom_config.clone())?;

            let bc = Blockchain::new(builder.config().chain.chain.id());
            let bc_clone = bc.clone();

            let engine_tree_config = TreeConfig::default()
                .with_persistence_threshold(loom_args.persistence_threshold)
                .with_memory_block_buffer_target(loom_args.memory_block_buffer_target);
            let handle = builder
                .with_types_and_provider::<EthereumNode, BlockchainProvider2<_>>()
                .with_components(EthereumNode::components())
                .with_add_ons(EthereumAddOns::default())
                .install_exex("loom-exex", |node_ctx| loom_runtime::init(node_ctx, bc_clone, NodeBlockActorConfig::all_enabled()))
                .launch_with_fn(|builder| {
                    let launcher = EngineNodeLauncher::new(builder.task_executor().clone(), builder.config().datadir(), engine_tree_config);
                    builder.launch_with(launcher)
                })
                .await?;

            let mempool = handle.node.pool.clone();
            let ipc_provider = ProviderBuilder::new().on_builtin(handle.node.config.rpc.ipcpath.as_str()).await?;
            let alloy_db = AlloyDB::new(ipc_provider.clone(), BlockId::latest()).unwrap();

            let state_db = LoomDB::new().with_ext_db(alloy_db);
            let bc = bc.with_market_state(MarketState::new(state_db));
            let bc_clone = bc.clone();
            tokio::task::spawn(async move {
                if let Err(e) = loom_runtime::start_loom(ipc_provider, bc_clone, topology_config, loom_args.loom_config.clone(), true).await
                {
                    error!("Error starting loom: {:?}", e);
                }
            });
            tokio::task::spawn(mempool_worker(mempool, bc));

            handle.node_exit_future.await
        }),
        Command::Remote(loom_args) => {
            let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;

            rt.block_on(async {
                info!("Loading config from {}", loom_args.loom_config);
                let topology_config = TopologyConfig::load_from_file(loom_args.loom_config.clone())?;

                let client_config = topology_config.clients.get("remote").unwrap();
                let transport = WsConnect { url: client_config.url(), auth: None, config: None };
                let client = ClientBuilder::default().ws(transport).await?;
                let provider = ProviderBuilder::new().on_client(client).boxed();
                let bc = Blockchain::<LoomDB>::new(Chain::mainnet().id());
                let bc_clone = bc.clone();

                if let Err(e) = loom_runtime::start_loom(provider, bc_clone, topology_config, loom_args.loom_config.clone(), false).await {
                    error!("Error starting loom: {:#?}", e);
                    panic!("{}", e)
                }

                // keep loom running
                tokio::select! {
                    _ = signal::ctrl_c() => {
                    info!("CTRL+C received... exiting");
                }
                _ = async {
                        loop {
                        tokio::time::sleep(Duration::from_secs(60)).await;
                        task::yield_now().await;
                        }
                    } => {}
                }
                Ok::<(), eyre::Error>(())
            })?;
            Ok(())
        }
    }
}