loom_defi_health_monitor/
stuffing_tx_monitor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
use alloy_network::{Ethereum, TransactionResponse};
use alloy_primitives::{Address, TxHash, U256};
use alloy_provider::Provider;
use alloy_transport::Transport;
use eyre::{eyre, Result};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tracing::{error, info};

use loom_core_blockchain::Blockchain;
use loom_evm_utils::NWETH;
use loom_types_entities::{LatestBlock, Swap, Token};

use loom_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, SharedState, WorkerResult};
use loom_core_actors_macros::{Accessor, Consumer};
use loom_types_blockchain::debug_trace_transaction;
use loom_types_events::{MarketEvents, MessageTxCompose, TxCompose};
use revm::DatabaseRef;

#[derive(Clone, Debug)]
struct TxToCheck {
    block: u64,
    token_in: Token,
    profit: U256,
    tips: U256,
    swap: Swap,
}

async fn check_mf_tx<P: Provider<T, Ethereum> + 'static, T: Transport + Clone>(
    client: P,
    tx_hash: TxHash,
    coinbase: Address,
) -> Result<()> {
    let (pre, post) = debug_trace_transaction(client, tx_hash, true).await?;

    let coinbase_pre = pre.get(&coinbase).ok_or(eyre!("COINBASE_NOT_FOUND_IN_PRE"))?;
    let coinbase_post = post.get(&coinbase).ok_or(eyre!("COINBASE_NOT_FOUND_IN_POST"))?;

    let balance_diff = coinbase_post.balance.unwrap_or_default().checked_sub(coinbase_pre.balance.unwrap_or_default()).unwrap_or_default();
    info!("Stuffing tx mined MF tx: {:?} sent to coinbase: {}", tx_hash, NWETH::to_float(balance_diff));

    Ok(())
}

pub async fn stuffing_tx_monitor_worker<
    P: Provider<T, Ethereum> + Clone + 'static,
    T: Transport + Clone,
    DB: DatabaseRef + Send + Sync + Clone + 'static,
>(
    client: P,
    latest_block: SharedState<LatestBlock>,
    tx_compose_channel_rx: Broadcaster<MessageTxCompose<DB>>,
    market_events_rx: Broadcaster<MarketEvents>,
) -> WorkerResult {
    let mut tx_compose_channel_rx: Receiver<MessageTxCompose<DB>> = tx_compose_channel_rx.subscribe().await;
    let mut market_events_rx: Receiver<MarketEvents> = market_events_rx.subscribe().await;

    let mut txs_to_check: HashMap<TxHash, TxToCheck> = HashMap::new();

    loop {
        tokio::select! {
            msg = market_events_rx.recv() => {
                let market_event_msg : Result<MarketEvents, RecvError> = msg;
                match market_event_msg {
                    Ok(market_event)=>{
                        if let MarketEvents::BlockTxUpdate{ block_number,..} = market_event {
                            let coinbase =  latest_block.read().await.coinbase().unwrap_or_default();
                            if let Some(txs) = latest_block.read().await.txs().cloned() {
                                for (idx, tx) in txs.iter().enumerate() {
                                    let tx_hash = tx.tx_hash();
                                    if let Some(tx_to_check) = txs_to_check.get(&tx_hash).cloned(){
                                        info!("Stuffing tx found mined {:?} block: {} -> {} idx: {} profit: {} tips: {} token: {} to: {:?} {}", tx.tx_hash(), tx_to_check.block, block_number, idx, NWETH::to_float(tx_to_check.profit), NWETH::to_float(tx_to_check.tips), tx_to_check.token_in.get_symbol(), tx.to().unwrap_or_default(), tx_to_check.swap );
                                        if idx < txs.len() - 1 {
                                            let mf_tx = &txs[idx+1];
                                            info!("Stuffing tx mined {:?} MF tx: {:?} to: {:?}", tx.tx_hash(), mf_tx.tx_hash(), mf_tx.to().unwrap_or_default() );
                                            tokio::task::spawn(
                                                check_mf_tx(client.clone(), mf_tx.tx_hash(), coinbase)
                                            );
                                        }
                                        txs_to_check.remove::<TxHash>(&tx.tx_hash());
                                    }
                                }
                            }
                            info!("Stuffing txs to check : {} at block {}", txs_to_check.len(), block_number)
                        }
                    }
                    Err(e)=>{
                        error!("market_event_rx error : {e}")
                    }
                }
            },

            msg = tx_compose_channel_rx.recv() => {
                let tx_compose_update : Result<MessageTxCompose<DB>, RecvError>  = msg;
                match tx_compose_update {
                    Ok(tx_compose_msg)=>{
                        if let TxCompose::Broadcast(broadcast_data) = tx_compose_msg.inner {
                            for stuffing_tx_hash in broadcast_data.stuffing_txs_hashes.iter() {

                                let token_in = broadcast_data.swap.get_first_token().map_or(
                                    Arc::new(Token::new(Address::repeat_byte(0x11))), |x| x.clone()
                                );

                                let entry = txs_to_check.entry(*stuffing_tx_hash).or_insert(
                                        TxToCheck{
                                                block : broadcast_data.next_block_number,
                                                token_in : token_in.as_ref().clone(),
                                                profit : U256::ZERO,
                                                tips : U256::ZERO,
                                                swap : broadcast_data.swap.clone(),
                                        }
                                );
                                let profit = broadcast_data.swap.abs_profit();
                                let profit = token_in.calc_eth_value(profit).unwrap_or_default();

                                if entry.profit < profit {
                                    entry.token_in = token_in.as_ref().clone();
                                    entry.profit = profit;
                                    entry.tips = broadcast_data.tips.unwrap_or_default();
                                    entry.swap = broadcast_data.swap.clone()
                                }
                            }
                        }
                    }
                    Err(e)=>{
                        error!("tx_compose_channel_rx : {e}")
                    }
                }

            }
        }
    }
}

#[derive(Accessor, Consumer)]
pub struct StuffingTxMonitorActor<P, T, DB: Send + Sync + Clone + 'static> {
    client: P,
    #[accessor]
    latest_block: Option<SharedState<LatestBlock>>,
    #[consumer]
    tx_compose_channel_rx: Option<Broadcaster<MessageTxCompose<DB>>>,
    #[consumer]
    market_events_rx: Option<Broadcaster<MarketEvents>>,
    _t: PhantomData<T>,
}

impl<
        P: Provider<T, Ethereum> + Send + Sync + Clone + 'static,
        T: Transport + Clone,
        DB: DatabaseRef + Send + Sync + Clone + Default + 'static,
    > StuffingTxMonitorActor<P, T, DB>
{
    pub fn new(client: P) -> Self {
        StuffingTxMonitorActor { client, latest_block: None, tx_compose_channel_rx: None, market_events_rx: None, _t: PhantomData }
    }

    pub fn on_bc(self, bc: &Blockchain<DB>) -> Self {
        Self {
            latest_block: Some(bc.latest_block()),
            tx_compose_channel_rx: Some(bc.compose_channel()),
            market_events_rx: Some(bc.market_events_channel()),
            ..self
        }
    }
}

impl<P, T, DB> Actor for StuffingTxMonitorActor<P, T, DB>
where
    T: Transport + Clone,
    P: Provider<T, Ethereum> + Send + Sync + Clone + 'static,
    DB: DatabaseRef + Send + Sync + Clone + Default + 'static,
{
    fn start(&self) -> ActorResult {
        let task = tokio::task::spawn(stuffing_tx_monitor_worker(
            self.client.clone(),
            self.latest_block.clone().unwrap(),
            self.tx_compose_channel_rx.clone().unwrap(),
            self.market_events_rx.clone().unwrap(),
        ));
        Ok(vec![task])
    }

    fn name(&self) -> &'static str {
        "StuffingTxMonitorActor"
    }
}