loom_execution_estimator/
hardhat.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use alloy_consensus::TxEnvelope;
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::{Bytes, TxKind, U256};
use alloy_provider::Provider;
use alloy_rpc_types::{TransactionInput, TransactionRequest};
use eyre::{eyre, Result};
use revm::DatabaseRef;
use tokio::sync::broadcast::error::RecvError;
use tracing::{error, info};

use loom_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
use loom_core_actors_macros::{Consumer, Producer};
use loom_node_debug_provider::DebugProviderExt;
use loom_types_entities::SwapEncoder;
use loom_types_events::{MessageTxCompose, TxCompose, TxComposeData, TxState};

async fn estimator_worker<DB: DatabaseRef + Send + Sync + Clone>(
    swap_encoder: impl SwapEncoder,
    compose_channel_rx: Broadcaster<MessageTxCompose<DB>>,
    compose_channel_tx: Broadcaster<MessageTxCompose<DB>>,
) -> WorkerResult {
    subscribe!(compose_channel_rx);

    loop {
        tokio::select! {
                    msg = compose_channel_rx.recv() => {
                        let compose_request_msg : Result<MessageTxCompose<DB>, RecvError> = msg;
                        match compose_request_msg {
                            Ok(compose_request) =>{
                                if let TxCompose::Estimate(estimate_request) = compose_request.inner {
                                    info!("Hardhat estimation");
                                    let token_in = estimate_request.swap.get_first_token().cloned().ok_or(eyre!("NO_TOKEN"))?;

                                    let tx_signer = estimate_request.signer.clone().ok_or(eyre!("NO_SIGNER"))?;

                                    let gas_price = estimate_request.priority_gas_fee + estimate_request.next_block_base_fee;
                                    let gas_cost = U256::from(100_000 * gas_price);

                                    let profit = estimate_request.swap.abs_profit();
                                    if profit.is_zero() {
                                        return Err(eyre!("NO_PROFIT"));
                                    }
                                    let profit_eth = token_in.calc_eth_value(profit).ok_or(eyre!("CALC_ETH_VALUE_FAILED"))?;

                                    let (to, _call_value, call_data, _) = swap_encoder.encode(
                                        estimate_request.swap.clone(),
                                        estimate_request.tips_pct,
                                        Some(estimate_request.next_block_number),
                                        Some(gas_cost),
                                        Some(tx_signer.address()),
                                        Some(estimate_request.eth_balance),
                                    )?;

                                    let tx_request = TransactionRequest {
                                        transaction_type : Some(2),
                                        chain_id : Some(1),
                                        from: Some(tx_signer.address()),
                                        to: Some(TxKind::Call(to)),
                                        gas: Some(estimate_request.gas),
                                        value: Some(U256::from(1000)),
                                        input: TransactionInput::new(call_data),
                                        nonce: Some(estimate_request.nonce ),
                                        max_priority_fee_per_gas: Some(estimate_request.priority_gas_fee as u128),
                                        max_fee_per_gas: Some(estimate_request.next_block_base_fee as u128), // TODO: Why not prio + base fee?
                                        ..TransactionRequest::default()
                                    };

                                    let gas_price = estimate_request.priority_gas_fee + estimate_request.next_block_base_fee;

                                    if U256::from(300_000 * gas_price) > profit_eth {
                                        error!("Profit is too small");
                                        return Err(eyre!("TOO_SMALL_PROFIT"));
                                    }

                                    let enveloped_txs : Vec<TxEnvelope>= estimate_request.stuffing_txs.iter().map(|item| item.clone().into()).collect();
                                    let stuffing_txs_rlp : Vec<Bytes> = enveloped_txs.into_iter().map(|x| Bytes::from(x.encoded_2718()) ).collect();

                                    let mut tx_with_state: Vec<TxState> = stuffing_txs_rlp.into_iter().map(TxState::ReadyForBroadcastStuffing).collect();

                                    tx_with_state.push(TxState::SignatureRequired(tx_request));

                                    let sign_request = MessageTxCompose::sign(
                                        TxComposeData{
                                            tx_bundle : Some(tx_with_state),
                                            ..estimate_request
                                        }
                                    );

                                    if let Err(e) = compose_channel_tx.send(sign_request).await {
                                        error!("{e}");
                                    }
                                }
                            }
                    Err(e)=>{error!("{e}")}
                }
            }
        }
    }
}

/// Actor that spawns `estimator_worker` to answer estimate requests.
///
/// Both channels start as `None` in `new` and must be set before `start` is
/// called; presumably this happens through the derived `Consumer` / `Producer`
/// implementations (TODO confirm against the derive macros).
// `dead_code` is allowed because `client` is stored but not read by the code in this file.
#[allow(dead_code)]
#[derive(Consumer, Producer)]
pub struct HardhatEstimatorActor<P, E, DB: Send + Sync + Clone + 'static> {
    /// Provider handle; kept on the actor but not passed to the worker.
    client: P,
    /// Swap encoder, cloned into the worker on `start`.
    encoder: E,
    /// Channel the worker subscribes to for incoming compose messages.
    #[consumer]
    compose_channel_rx: Option<Broadcaster<MessageTxCompose<DB>>>,
    /// Channel the worker sends the resulting sign requests to.
    #[producer]
    compose_channel_tx: Option<Broadcaster<MessageTxCompose<DB>>>,
}

impl<P, E, DB> HardhatEstimatorActor<P, E, DB>
where
    P: Provider + DebugProviderExt + Clone + Send + Sync + 'static,
    E: SwapEncoder + Send + Sync + Clone + 'static,
    DB: DatabaseRef + Send + Sync + Clone,
{
    /// Creates an actor from a provider `client` and a swap `encoder`.
    ///
    /// Both compose channels are left unset (`None`).
    pub fn new(client: P, encoder: E) -> Self {
        let compose_channel_rx = None;
        let compose_channel_tx = None;

        Self { client, encoder, compose_channel_rx, compose_channel_tx }
    }
}

impl<P, E, DB> Actor for HardhatEstimatorActor<P, E, DB>
where
    P: Provider + DebugProviderExt + Clone + Send + Sync + 'static,
    E: SwapEncoder + Send + Sync + Clone + 'static,
    DB: DatabaseRef + Send + Sync + Clone,
{
    fn start(&self) -> ActorResult {
        let task = tokio::task::spawn(estimator_worker(
            self.encoder.clone(),
            self.compose_channel_rx.clone().unwrap(),
            self.compose_channel_tx.clone().unwrap(),
        ));
        Ok(vec![task])
    }

    fn name(&self) -> &'static str {
        "HardhatEstimatorActor"
    }
}