// loom_node_player/compose.rs

use loom_core_actors::{Broadcaster, SharedState, WorkerResult};
use loom_evm_utils::reth_types::decode_into_transaction;
use loom_types_blockchain::Mempool;
use loom_types_events::{MessageTxCompose, RlpState, TxCompose};
use tokio::select;
use tracing::{error, info};

/// Feeds broadcast compose messages back into the shared mempool.
///
/// Subscribes to `compose_channel` and, for every [`TxCompose::Broadcast`] message received,
/// walks its RLP bundle (an absent bundle is treated as empty). Each `Backrun` / `Stuffing`
/// entry is decoded with `decode_into_transaction` and, on success, added to `mempool`;
/// decode failures and entries of any other kind are logged and skipped. Compose messages
/// that are not `Broadcast` are ignored.
///
/// # Errors
///
/// Returns the receive error once the channel reports that it is closed, instead of
/// polling an already-closed channel forever. A lagged receiver is logged and the worker
/// keeps running.
pub(crate) async fn replayer_compose_worker<DB: Clone + Send + Sync>(
    mempool: SharedState<Mempool>,
    compose_channel: Broadcaster<MessageTxCompose<DB>>,
) -> WorkerResult {
    let mut compose_channel_rx = compose_channel.subscribe().await;

    loop {
        select! {
            msg = compose_channel_rx.recv() => {
                let msg = match msg {
                    Ok(msg) => msg,
                    // The receiver fell behind and some messages were dropped; the channel
                    // itself is still usable, so report it and keep going.
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                        error!("Compose channel lagged, {} messages skipped", skipped);
                        continue;
                    }
                    // Every sender is gone: `recv()` would now fail immediately on each call,
                    // turning this loop into a busy spin. Stop the worker instead.
                    // NOTE(review): relies on `WorkerResult`'s error type converting from the
                    // receive error via `into()` — confirm against `loom_core_actors`.
                    Err(e @ tokio::sync::broadcast::error::RecvError::Closed) => {
                        error!("Compose channel closed, stopping replayer compose worker");
                        return Err(e.into());
                    }
                };

                if let TxCompose::Broadcast(broadcast_msg) = msg.inner {
                    info!("Broadcast compose message received. {:?}", broadcast_msg.tx_bundle);
                    for tx in broadcast_msg.rlp_bundle.unwrap_or_default() {
                        match tx {
                            RlpState::Backrun(rlp_tx) | RlpState::Stuffing(rlp_tx) => {
                                match decode_into_transaction(&rlp_tx) {
                                    Ok(new_tx) => {
                                        // The write guard is a temporary, so the lock is
                                        // released at the end of this statement.
                                        mempool.write().await.add_tx(new_tx);
                                    }
                                    Err(e) => {
                                        error!("decode_into_transaction {}", e);
                                    }
                                }
                            }
                            _ => {
                                error!("Unknown RLP tx type");
                            }
                        }
                    }
                }
            }
        }
    }
}