loom_strategy_backrun/
arb_actor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
use std::marker::PhantomData;

use alloy_network::Network;
use alloy_provider::Provider;
use alloy_transport::Transport;
use eyre::ErrReport;
use revm::{Database, DatabaseCommit, DatabaseRef};
use tokio::task::JoinHandle;
use tracing::info;

use loom_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
use loom_core_actors_macros::{Accessor, Consumer, Producer};
use loom_node_debug_provider::DebugProviderExt;
use loom_types_blockchain::Mempool;
use loom_types_entities::{BlockHistory, LatestBlock, Market, MarketState};
use loom_types_events::{MarketEvents, MempoolEvents, MessageHealthEvent, MessageTxCompose};

use super::{PendingTxStateChangeProcessorActor, StateChangeArbSearcherActor};
use crate::block_state_change_processor::BlockStateChangeProcessorActor;
use crate::BackrunConfig;

/// Composite actor that starts the state-change arbitrage pipeline.
///
/// When started it launches a `StateChangeArbSearcherActor` and, depending on
/// the `use_mempool` / `use_blocks` flags and on which event channels have been
/// supplied, a `PendingTxStateChangeProcessorActor` and/or a
/// `BlockStateChangeProcessorActor`. Both processors are wired to feed the
/// searcher through a channel created inside `start`.
///
/// All `Option` fields start as `None` in `new`; they are presumably populated
/// through the derived `Accessor` / `Consumer` / `Producer` implementations
/// before `start` is called (confirm against the derive macros).
#[derive(Accessor, Consumer, Producer)]
pub struct StateChangeArbActor<P, T, N, DB: Clone + Send + Sync + 'static> {
    // Configuration cloned into the searcher actor on start.
    backrun_config: BackrunConfig,
    // Provider handle; cloned into the pending-tx processor on start.
    client: P,
    // Enables the block state-change processor (also needs `market_events_tx`).
    use_blocks: bool,
    // Enables the pending-tx state-change processor (also needs `mempool_events_tx`).
    use_mempool: bool,
    // Shared market handle; handed to the searcher and to both processors.
    #[accessor]
    market: Option<SharedState<Market>>,
    // Shared mempool handle; handed to the pending-tx processor.
    #[accessor]
    mempool: Option<SharedState<Mempool>>,
    // Shared latest-block handle; handed to the pending-tx processor.
    #[accessor]
    latest_block: Option<SharedState<LatestBlock>>,
    // Shared market state; handed to the pending-tx processor.
    #[accessor]
    market_state: Option<SharedState<MarketState<DB>>>,
    // Shared block history; handed to the block processor.
    #[accessor]
    block_history: Option<SharedState<BlockHistory<DB>>>,
    // Mempool event channel consumed by the pending-tx processor.
    #[consumer]
    mempool_events_tx: Option<Broadcaster<MempoolEvents>>,
    // Market event channel consumed by both processors.
    #[consumer]
    market_events_tx: Option<Broadcaster<MarketEvents>>,
    // Output channel for compose messages; handed to the searcher as a producer target.
    #[producer]
    compose_channel_tx: Option<Broadcaster<MessageTxCompose<DB>>>,
    // Output channel for health events; handed to the searcher as a producer target.
    #[producer]
    pool_health_monitor_tx: Option<Broadcaster<MessageHealthEvent>>,
    // Zero-sized markers tying the transport (`T`) and network (`N`) type
    // parameters to the struct, since no field stores them directly.
    _t: PhantomData<T>,
    _n: PhantomData<N>,
}

impl<P, T, N, DB> StateChangeArbActor<P, T, N, DB>
where
    T: Transport + Clone,
    N: Network,
    P: Provider<T, N> + DebugProviderExt<T, N> + Send + Sync + Clone + 'static,
    DB: DatabaseRef + Send + Sync + Clone + 'static,
{
    /// Creates an unwired actor.
    ///
    /// * `client` - provider handle stored for later use by the pending-tx processor.
    /// * `use_blocks` - whether the block state-change processor may be started.
    /// * `use_mempool` - whether the pending-tx state-change processor may be started.
    /// * `backrun_config` - configuration stored for the searcher actor.
    ///
    /// Every shared-state handle and every channel is initialised to `None`.
    pub fn new(client: P, use_blocks: bool, use_mempool: bool, backrun_config: BackrunConfig) -> Self {
        Self {
            // Values supplied by the caller.
            client,
            backrun_config,
            use_mempool,
            use_blocks,
            // Shared state handles, not yet attached.
            market: None,
            market_state: None,
            mempool: None,
            latest_block: None,
            block_history: None,
            // Inbound event channels, not yet attached.
            mempool_events_tx: None,
            market_events_tx: None,
            // Outbound channels, not yet attached.
            compose_channel_tx: None,
            pool_health_monitor_tx: None,
            // Type markers.
            _t: PhantomData,
            _n: PhantomData,
        }
    }
}

impl<P, T, N, DB> Actor for StateChangeArbActor<P, T, N, DB>
where
    T: Transport + Clone,
    N: Network,
    P: Provider<T, N> + DebugProviderExt<T, N> + Send + Sync + Clone + 'static,
    DB: DatabaseRef<Error = ErrReport> + Database<Error = ErrReport> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
{
    /// Starts the searcher and the enabled state-change processors, returning
    /// the join handles of every task they spawned.
    ///
    /// The searcher is always started. The pending-tx processor is started
    /// when `use_mempool` is set and a mempool event channel is present; the
    /// block processor is started when `use_blocks` is set and a market event
    /// channel is present. Both processors publish into an internal channel
    /// (capacity 100) that the searcher consumes.
    ///
    /// # Panics
    ///
    /// Panics if a shared-state handle or channel needed by an actor being
    /// started is still `None`, or if any child actor's `start` returns an error.
    fn start(&self) -> ActorResult {
        // Internal channel linking the processors (producers) to the searcher (consumer).
        let pool_update_channel = Broadcaster::new(100);
        let mut handles: Vec<JoinHandle<WorkerResult>> = Vec::new();

        // The searcher runs unconditionally.
        let mut searcher = StateChangeArbSearcherActor::new(self.backrun_config.clone());
        let searcher_handles = searcher
            .access(self.market.clone().unwrap())
            .consume(pool_update_channel.clone())
            .produce(self.compose_channel_tx.clone().unwrap())
            .produce(self.pool_health_monitor_tx.clone().unwrap())
            .start()
            .unwrap_or_else(|e| panic!("{}", e));
        handles.extend(searcher_handles);
        info!("State change searcher actor started successfully");

        // Optional: react to state changes caused by pending transactions.
        if self.use_mempool && self.mempool_events_tx.is_some() {
            let mut pending_tx_processor = PendingTxStateChangeProcessorActor::new(self.client.clone());
            let pending_tx_handles = pending_tx_processor
                .access(self.mempool.clone().unwrap())
                .access(self.latest_block.clone().unwrap())
                .access(self.market.clone().unwrap())
                .access(self.market_state.clone().unwrap())
                .consume(self.mempool_events_tx.clone().unwrap())
                .consume(self.market_events_tx.clone().unwrap())
                .produce(pool_update_channel.clone())
                .start()
                .unwrap_or_else(|e| panic!("{}", e));
            handles.extend(pending_tx_handles);
            info!("Pending tx state actor started successfully");
        }

        // Optional: react to state changes caused by new blocks.
        if self.use_blocks && self.market_events_tx.is_some() {
            let mut block_processor = BlockStateChangeProcessorActor::new();
            let block_handles = block_processor
                .access(self.market.clone().unwrap())
                .access(self.block_history.clone().unwrap())
                .consume(self.market_events_tx.clone().unwrap())
                .produce(pool_update_channel.clone())
                .start()
                .unwrap_or_else(|e| panic!("{}", e));
            handles.extend(block_handles);
            info!("Block change state actor started successfully");
        }

        Ok(handles)
    }

    /// Returns the static name of this actor.
    fn name(&self) -> &'static str {
        "StateChangeArbActor"
    }
}