use alloy_provider::Provider;
use eyre::Result;
use tracing::{error, info};
use loom::core::actors::{Accessor, Actor, Consumer, Producer};
use loom::core::router::SwapRouterActor;
use loom::core::topology::{Topology, TopologyConfig};
use loom::defi::health_monitor::{StateHealthMonitorActor, StuffingTxMonitorActor};
use loom::evm::db::LoomDBType;
use loom::metrics::{BlockLatencyRecorderActor, InfluxDbWriterActor};
use loom::strategy::backrun::{BackrunConfig, BackrunConfigSection, StateChangeArbActor};
use loom::strategy::merger::{ArbSwapPathMergerActor, DiffPathMergerActor, SamePathMergerActor};
use loom::types::entities::config::load_from_file;
use loom::types::events::MarketEvents;
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(
env_logger::Env::default().default_filter_or("debug,tokio_tungstenite=off,tungstenite=off,alloy_rpc_client=off"),
)
.format_timestamp_micros()
.init();
let topology_config = TopologyConfig::load_from_file("config.toml".to_string())?;
let influxdb_config = topology_config.influxdb.clone();
let (topology, mut worker_task_vec) = Topology::<LoomDBType>::from(topology_config).await?;
let client = topology.get_client(Some("local".to_string()).as_ref())?;
let blockchain = topology.get_blockchain(Some("mainnet".to_string()).as_ref())?;
let tx_signers = topology.get_signers(Some("env_signer".to_string()).as_ref())?;
let backrun_config: BackrunConfigSection = load_from_file("./config.toml".to_string().into()).await?;
let backrun_config: BackrunConfig = backrun_config.backrun_strategy;
let block_nr = client.get_block_number().await?;
info!("Block : {}", block_nr);
info!("Creating shared state");
info!("Starting state change arb actor");
let mut state_change_arb_actor = StateChangeArbActor::new(client.clone(), true, true, backrun_config);
match state_change_arb_actor
.access(blockchain.mempool())
.access(blockchain.latest_block())
.access(blockchain.market())
.access(blockchain.market_state())
.access(blockchain.block_history())
.consume(blockchain.market_events_channel())
.consume(blockchain.mempool_events_channel())
.produce(blockchain.compose_channel())
.produce(blockchain.pool_health_monitor_channel())
.start()
{
Err(e) => {
error!("{}", e)
}
Ok(r) => {
worker_task_vec.extend(r);
info!("State change arb actor started successfully")
}
}
let multicaller = topology.get_multicaller_encoder(None).unwrap().get_contract_address();
info!("Starting swap path encoder actor with multicaller at : {}", multicaller);
let mut swap_path_encoder_actor = SwapRouterActor::new();
match swap_path_encoder_actor
.access(tx_signers.clone())
.access(blockchain.nonce_and_balance())
.consume(blockchain.compose_channel())
.produce(blockchain.compose_channel())
.start()
{
Ok(r) => {
worker_task_vec.extend(r);
info!("Swap path encoder actor started successfully")
}
Err(e) => {
panic!("ArbSwapPathEncoderActor {}", e)
}
}
info!("Starting swap path merger actor");
let mut swap_path_merger_actor = ArbSwapPathMergerActor::new(multicaller);
match swap_path_merger_actor
.access(blockchain.latest_block())
.consume(blockchain.market_events_channel())
.consume(blockchain.compose_channel())
.produce(blockchain.compose_channel())
.start()
{
Ok(r) => {
worker_task_vec.extend(r);
info!("Swap path merger actor started successfully")
}
Err(e) => {
panic!("{}", e)
}
}
let mut same_path_merger_actor = SamePathMergerActor::new(client.clone());
match same_path_merger_actor
.access(blockchain.market_state())
.access(blockchain.latest_block())
.consume(blockchain.market_events_channel())
.consume(blockchain.compose_channel())
.produce(blockchain.compose_channel())
.start()
{
Ok(r) => {
worker_task_vec.extend(r);
info!("Same path merger actor started successfully")
}
Err(e) => {
panic!("{}", e)
}
}
let mut diff_path_merger_actor = DiffPathMergerActor::new();
match diff_path_merger_actor
.consume(blockchain.market_events_channel())
.consume(blockchain.compose_channel())
.produce(blockchain.compose_channel())
.start()
{
Ok(r) => {
worker_task_vec.extend(r);
info!("Diff path merger actor started successfully")
}
Err(e) => {
panic!("{}", e)
}
}
let mut state_health_monitor_actor = StateHealthMonitorActor::new(client.clone());
match state_health_monitor_actor
.access(blockchain.market_state())
.consume(blockchain.compose_channel())
.consume(blockchain.market_events_channel())
.start()
{
Err(e) => {
panic!("State health monitor actor failed : {}", e)
}
Ok(r) => {
worker_task_vec.extend(r);
info!("State health monitor actor started successfully")
}
}
let mut stuffing_txs_monitor_actor = StuffingTxMonitorActor::new(client.clone());
match stuffing_txs_monitor_actor
.access(blockchain.latest_block())
.consume(blockchain.compose_channel())
.consume(blockchain.market_events_channel())
.start()
{
Err(e) => {
panic!("Stuffing txs monitor actor failed : {}", e)
}
Ok(r) => {
worker_task_vec.extend(r);
info!("Stuffing txs monitor actor started successfully")
}
}
if let Some(influxdb_config) = influxdb_config {
let mut influxdb_writer_actor = InfluxDbWriterActor::new(influxdb_config.url, influxdb_config.database, influxdb_config.tags);
match influxdb_writer_actor.consume(blockchain.influxdb_write_channel()).start() {
Err(e) => {
panic!("InfluxDB writer actor failed : {}", e)
}
Ok(r) => {
worker_task_vec.extend(r);
info!("InfluxDB writer actor started successfully")
}
}
let mut block_latency_recorder_actor = BlockLatencyRecorderActor::new();
match block_latency_recorder_actor
.consume(blockchain.new_block_headers_channel())
.produce(blockchain.influxdb_write_channel())
.start()
{
Err(e) => {
panic!("Block latency recorder actor failed : {}", e)
}
Ok(r) => {
worker_task_vec.extend(r);
info!("Block latency recorder actor started successfully")
}
}
}
tokio::task::spawn(async move {
while !worker_task_vec.is_empty() {
let (result, _index, remaining_futures) = futures::future::select_all(worker_task_vec).await;
match result {
Ok(work_result) => match work_result {
Ok(s) => {
info!("ActorWorker {_index} finished : {s}")
}
Err(e) => {
error!("ActorWorker {_index} error : {e}")
}
},
Err(e) => {
error!("ActorWorker join error {_index} : {e}")
}
}
worker_task_vec = remaining_futures;
}
});
let mut s = blockchain.market_events_channel().subscribe().await;
loop {
let msg = s.recv().await;
if let Ok(msg) = msg {
match msg {
MarketEvents::BlockTxUpdate { block_number, block_hash } => {
info!("New block received {} {}", block_number, block_hash);
}
MarketEvents::BlockStateUpdate { block_hash } => {
info!("New block state received {}", block_hash);
}
_ => {}
}
}
}
}