loom_broadcast_accounts/signers/
signers_actor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use alloy_eips::eip2718::Encodable2718;
use eyre::{eyre, Result};
use revm::DatabaseRef;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tracing::{error, info};

use loom_core_actors::{Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
use loom_core_actors_macros::{Accessor, Consumer, Producer};
use loom_core_blockchain::Blockchain;
use loom_types_events::{MessageTxCompose, RlpState, TxCompose, TxComposeData, TxState};

async fn sign_task<DB: Send + Sync + Clone>(
    sign_request: TxComposeData<DB>,
    compose_channel_tx: Broadcaster<MessageTxCompose<DB>>,
) -> Result<()> {
    let signer = match sign_request.signer.clone() {
        Some(signer) => signer,
        None => {
            error!("No signer found in sign_request");
            return Err(eyre!("NO_SIGNER_FOUND"));
        }
    };

    let rlp_bundle: Vec<RlpState> = sign_request
        .tx_bundle
        .clone()
        .unwrap()
        .iter()
        .map(|tx_request| match &tx_request {
            TxState::Stuffing(t) => RlpState::Stuffing(t.inner.encoded_2718().into()),
            TxState::SignatureRequired(t) => {
                let (tx_hash, signed_tx_bytes) = signer.sign_sync(t.clone()).unwrap();
                info!("Tx signed {tx_hash:?}");
                RlpState::Backrun(signed_tx_bytes)
            }
            TxState::ReadyForBroadcast(t) => RlpState::Backrun(t.clone()),
            TxState::ReadyForBroadcastStuffing(t) => RlpState::Stuffing(t.clone()),
        })
        .collect();

    if rlp_bundle.iter().any(|item| item.is_none()) {
        error!("Bundle is not ready. Cannot sign");
        return Err(eyre!("CANNOT_SIGN_BUNDLE"));
    }

    let broadcast_request = TxComposeData { rlp_bundle: Some(rlp_bundle), ..sign_request };

    match compose_channel_tx.send(MessageTxCompose::broadcast(broadcast_request)).await {
        Err(e) => {
            error!("{e}");
            Err(eyre!("BROADCAST_ERROR"))
        }
        _ => Ok(()),
    }
}

async fn request_listener_worker<DB: Send + Sync + Clone>(
    compose_channel_rx: Broadcaster<MessageTxCompose<DB>>,
    compose_channel_tx: Broadcaster<MessageTxCompose<DB>>,
) -> WorkerResult {
    let mut compose_channel_rx: Receiver<MessageTxCompose<DB>> = compose_channel_rx.subscribe().await;

    loop {
        tokio::select! {
            msg = compose_channel_rx.recv() => {
                let compose_request_msg : Result<MessageTxCompose<DB>, RecvError> = msg;
                match compose_request_msg {
                    Ok(compose_request) =>{

                        if let TxCompose::Sign( sign_request)= compose_request.inner {
                            tokio::task::spawn(
                                sign_task(
                                    sign_request,
                                    compose_channel_tx.clone(),
                                )
                            );
                        }
                    }
                    Err(e)=>{error!("{}",e)}
                }
            }
        }
    }
}

/// Actor that signs transaction bundles requested over the compose channel.
///
/// Both channel fields start as `None` (via `Default`) and are filled in by `on_bc`.
/// `start` unwraps both of them, so they must be set before the actor is started.
#[derive(Accessor, Consumer, Producer, Default)]
pub struct TxSignersActor<DB: Send + Sync + Clone + 'static> {
    /// Compose channel the worker subscribes to in order to receive sign requests.
    #[consumer]
    compose_channel_rx: Option<Broadcaster<MessageTxCompose<DB>>>,
    /// Compose channel the signed bundles are sent to as broadcast messages.
    #[producer]
    compose_channel_tx: Option<Broadcaster<MessageTxCompose<DB>>>,
}

impl<DB> TxSignersActor<DB>
where
    DB: DatabaseRef + Clone + Send + Sync + Default,
{
    /// Creates an actor with no channels attached yet.
    pub fn new() -> Self {
        Self::default()
    }

    /// Consumes the actor and returns one whose consumer and producer channels are both
    /// taken from the compose channel of `bc`.
    pub fn on_bc(self, bc: &Blockchain<DB>) -> Self {
        let consumer_channel = Some(bc.compose_channel());
        let producer_channel = Some(bc.compose_channel());

        Self { compose_channel_rx: consumer_channel, compose_channel_tx: producer_channel }
    }
}

impl<DB> Actor for TxSignersActor<DB>
where
    DB: DatabaseRef + Clone + Send + Sync,
{
    /// Spawns the request listener worker and returns its join handle.
    ///
    /// # Panics
    ///
    /// Panics if either compose channel has not been set.
    fn start(&self) -> ActorResult {
        let consumer_channel = self.compose_channel_rx.clone().unwrap();
        let producer_channel = self.compose_channel_tx.clone().unwrap();

        let worker_handle = tokio::task::spawn(request_listener_worker(consumer_channel, producer_channel));

        Ok(vec![worker_handle])
    }

    /// Static name of this actor.
    fn name(&self) -> &'static str {
        "SignersActor"
    }
}