loom_exex/
loom_runtime.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use alloy::network::Ethereum;
use alloy::primitives::Address;
use alloy::providers::Provider;
use alloy::transports::Transport;
use axum::Router;
use eyre::{ErrReport, OptionExt};
use loom::core::blockchain::Blockchain;
use loom::core::blockchain_actors::BlockchainActors;
use loom::core::topology::{BroadcasterConfig, EncoderConfig, TopologyConfig};
use loom::defi::pools::PoolsConfig;
use loom::evm::db::DatabaseLoomExt;
use loom::node::actor_config::NodeBlockActorConfig;
use loom::node::debug_provider::DebugProviderExt;
use loom::node::exex::loom_exex;
use loom::storage::db::init_db_pool;
use loom::strategy::backrun::{BackrunConfig, BackrunConfigSection};
use loom::types::entities::config::load_from_file;
use loom::types::entities::{BlockHistoryState, PoolClass};
use reth::revm::{Database, DatabaseCommit, DatabaseRef};
use reth_exex::ExExContext;
use reth_node_api::FullNodeComponents;
use std::env;
use std::future::Future;
use tracing::info;

/// Builds the Loom ExEx future for a reth node.
///
/// This is a thin wrapper: it forwards `ctx`, `bc` and `config` to [`loom_exex`] and hands
/// the resulting future back to the caller, who is responsible for driving it. Nothing is
/// awaited inside this function.
///
/// # Errors
///
/// As written, this function always returns `Ok`; failures surface only when the returned
/// future is polled and resolves to an `Err`.
pub async fn init<Node, DB>(
    ctx: ExExContext<Node>,
    bc: Blockchain<DB>,
    config: NodeBlockActorConfig,
) -> eyre::Result<impl Future<Output = eyre::Result<()>>>
where
    Node: FullNodeComponents,
    DB: Database<Error = ErrReport>
        + DatabaseRef<Error = ErrReport>
        + DatabaseCommit
        + DatabaseLoomExt
        + BlockHistoryState
        + Send
        + Sync
        + Clone
        + Default
        + 'static,
{
    // `config` is owned and not used afterwards, so it is moved rather than cloned.
    Ok(loom_exex(ctx, bc, config))
}

pub async fn start_loom<P, T, DB>(
    provider: P,
    bc: Blockchain<DB>,
    topology_config: TopologyConfig,
    loom_config_filepath: String,
    is_exex: bool,
) -> eyre::Result<()>
where
    T: Transport + Clone,
    P: Provider<T, Ethereum> + DebugProviderExt<T, Ethereum> + Send + Sync + Clone + 'static,
    DB: Database<Error = ErrReport>
        + DatabaseRef<Error = ErrReport>
        + DatabaseCommit
        + DatabaseLoomExt
        + BlockHistoryState
        + Send
        + Sync
        + Clone
        + Default
        + 'static,
{
    let chain_id = provider.get_chain_id().await?;

    info!(chain_id = ?chain_id, "Starting Loom" );

    let (_encoder_name, encoder) = topology_config.encoders.iter().next().ok_or_eyre("NO_ENCODER")?;

    let multicaller_address: Option<Address> = match encoder {
        EncoderConfig::SwapStep(e) => e.address.parse().ok(),
    };
    let multicaller_address = multicaller_address.ok_or_eyre("MULTICALLER_ADDRESS_NOT_SET")?;
    let private_key_encrypted = hex::decode(env::var("DATA")?)?;
    info!(address=?multicaller_address, "Multicaller");

    let webserver_host = topology_config.webserver.unwrap_or_default().host;
    let db_url = topology_config.database.unwrap().url;
    let db_pool = init_db_pool(db_url).await?;

    // Get flashbots relays from config
    let relays = topology_config
        .actors
        .broadcaster
        .as_ref()
        .and_then(|b| b.get("mainnet"))
        .map(|b| match b {
            BroadcasterConfig::Flashbots(f) => f.relays(),
        })
        .unwrap_or_default();

    let pools_config = PoolsConfig::disable_all().enable(PoolClass::UniswapV2).enable(PoolClass::UniswapV3);

    let backrun_config: BackrunConfigSection = load_from_file::<BackrunConfigSection>(loom_config_filepath.into()).await?;
    let backrun_config: BackrunConfig = backrun_config.backrun_strategy;

    let mut bc_actors = BlockchainActors::new(provider.clone(), bc.clone(), relays);
    bc_actors
        .mempool()?
        .with_wait_for_node_sync()? // wait for node to sync before
        .initialize_signers_with_encrypted_key(private_key_encrypted)? // initialize signer with encrypted key
        .with_block_history()? // collect blocks
        .with_price_station()? // calculate price fo tokens
        .with_health_monitor_pools()? // monitor pools health to disable empty
        //.with_health_monitor_state()? // monitor state health
        .with_health_monitor_stuffing_tx()? // collect stuffing tx information
        .with_swap_encoder(Some(multicaller_address))? // convert swaps to opcodes and passes to estimator
        .with_evm_estimator()? // estimate gas, add tips
        .with_signers()? // start signer actor that signs transactions before broadcasting
        .with_flashbots_broadcaster(false, true)? // broadcast signed txes to flashbots
        .with_market_state_preloader()? // preload contracts to market state
        .with_nonce_and_balance_monitor()? // start monitoring balances of
        .with_pool_history_loader(pools_config.clone())? // load pools used in latest 10000 blocks
        //.with_curve_pool_protocol_loader()? // load curve + steth + wsteth
        .with_new_pool_loader(pools_config.clone())? // load new pools
        .with_pool_loader()?
        .with_swap_path_merger()? // load merger for multiple swap paths
        .with_diff_path_merger()? // load merger for different swap paths
        .with_same_path_merger()? // load merger for same swap paths with different stuffing txes
        .with_backrun_block(backrun_config.clone())? // load backrun searcher for incoming block
        .with_backrun_mempool(backrun_config)? // load backrun searcher for mempool txes
        .with_web_server(webserver_host, Router::new(), db_pool)? // start web server
    ;

    if !is_exex {
        bc_actors.with_block_events(NodeBlockActorConfig::all_enabled())?.with_remote_mempool(provider.clone())?;
    }

    if let Some(influxdb_config) = topology_config.influxdb {
        bc_actors
            .with_influxdb_writer(influxdb_config.url, influxdb_config.database, influxdb_config.tags)?
            .with_block_latency_recorder()?;
    }

    bc_actors.wait().await;

    Ok(())
}