// loom_broadcast_accounts/signers/signers_actor.rs
use alloy_eips::eip2718::Encodable2718;
use eyre::{eyre, Result};
use revm::DatabaseRef;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tracing::{error, info};
use loom_core_actors::{Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
use loom_core_actors_macros::{Accessor, Consumer, Producer};
use loom_core_blockchain::Blockchain;
use loom_types_events::{MessageTxCompose, RlpState, TxCompose, TxComposeData, TxState};
async fn sign_task<DB: Send + Sync + Clone>(
sign_request: TxComposeData<DB>,
compose_channel_tx: Broadcaster<MessageTxCompose<DB>>,
) -> Result<()> {
let signer = match sign_request.signer.clone() {
Some(signer) => signer,
None => {
error!("No signer found in sign_request");
return Err(eyre!("NO_SIGNER_FOUND"));
}
};
let rlp_bundle: Vec<RlpState> = sign_request
.tx_bundle
.clone()
.unwrap()
.iter()
.map(|tx_request| match &tx_request {
TxState::Stuffing(t) => RlpState::Stuffing(t.inner.encoded_2718().into()),
TxState::SignatureRequired(t) => {
let (tx_hash, signed_tx_bytes) = signer.sign_sync(t.clone()).unwrap();
info!("Tx signed {tx_hash:?}");
RlpState::Backrun(signed_tx_bytes)
}
TxState::ReadyForBroadcast(t) => RlpState::Backrun(t.clone()),
TxState::ReadyForBroadcastStuffing(t) => RlpState::Stuffing(t.clone()),
})
.collect();
if rlp_bundle.iter().any(|item| item.is_none()) {
error!("Bundle is not ready. Cannot sign");
return Err(eyre!("CANNOT_SIGN_BUNDLE"));
}
let broadcast_request = TxComposeData { rlp_bundle: Some(rlp_bundle), ..sign_request };
match compose_channel_tx.send(MessageTxCompose::broadcast(broadcast_request)).await {
Err(e) => {
error!("{e}");
Err(eyre!("BROADCAST_ERROR"))
}
_ => Ok(()),
}
}
async fn request_listener_worker<DB: Send + Sync + Clone>(
compose_channel_rx: Broadcaster<MessageTxCompose<DB>>,
compose_channel_tx: Broadcaster<MessageTxCompose<DB>>,
) -> WorkerResult {
let mut compose_channel_rx: Receiver<MessageTxCompose<DB>> = compose_channel_rx.subscribe().await;
loop {
tokio::select! {
msg = compose_channel_rx.recv() => {
let compose_request_msg : Result<MessageTxCompose<DB>, RecvError> = msg;
match compose_request_msg {
Ok(compose_request) =>{
if let TxCompose::Sign( sign_request)= compose_request.inner {
tokio::task::spawn(
sign_task(
sign_request,
compose_channel_tx.clone(),
)
);
}
}
Err(e)=>{error!("{}",e)}
}
}
}
}
}
/// Actor that signs transaction bundles requested over the compose channel.
///
/// Both channels are optional until configured; `on_bc` fills them from a
/// `Blockchain`, and `start` unwraps them, so they must be set before the actor
/// is started.
#[derive(Accessor, Consumer, Producer, Default)]
pub struct TxSignersActor<DB: Send + Sync + Clone + 'static> {
/// Channel the worker subscribes to for incoming compose messages.
#[consumer]
compose_channel_rx: Option<Broadcaster<MessageTxCompose<DB>>>,
/// Channel the signing tasks send their broadcast requests to.
#[producer]
compose_channel_tx: Option<Broadcaster<MessageTxCompose<DB>>>,
}
impl<DB> TxSignersActor<DB>
where
    DB: DatabaseRef + Clone + Send + Sync + Default,
{
    /// Creates an actor with no channels configured.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns an actor whose consumer and producer channels are both taken from
    /// the compose channel of `bc`.
    pub fn on_bc(self, bc: &Blockchain<DB>) -> Self {
        let compose_channel_rx = Some(bc.compose_channel());
        let compose_channel_tx = Some(bc.compose_channel());
        Self { compose_channel_rx, compose_channel_tx }
    }
}
impl<DB> Actor for TxSignersActor<DB>
where
    DB: DatabaseRef + Clone + Send + Sync,
{
    /// Spawns the request listener worker and returns its task handle.
    ///
    /// # Panics
    ///
    /// Panics if either compose channel has not been configured.
    fn start(&self) -> ActorResult {
        let incoming = self.compose_channel_rx.clone().unwrap();
        let outgoing = self.compose_channel_tx.clone().unwrap();

        let worker = tokio::task::spawn(request_listener_worker(incoming, outgoing));
        Ok(vec![worker])
    }

    /// Static name of this actor.
    fn name(&self) -> &'static str {
        "SignersActor"
    }
}