loom_defi_market/
new_pool_actor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use eyre::Result;
use revm::DatabaseCommit;
use revm::DatabaseRef;
use tokio::sync::broadcast::error::RecvError;
use tracing::{debug, error};

use loom_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
use loom_core_actors_macros::{Consumer, Producer};
use loom_core_blockchain::Blockchain;
use loom_defi_pools::PoolsConfig;
use loom_types_events::{MessageBlockLogs, Task};

use crate::logs_parser::process_log_entries;

pub async fn new_pool_worker(
    log_update_rx: Broadcaster<MessageBlockLogs>,
    pools_config: PoolsConfig,
    tasks_tx: Broadcaster<Task>,
) -> WorkerResult {
    subscribe!(log_update_rx);

    loop {
        tokio::select! {
            msg = log_update_rx.recv() => {
                debug!("Log update");

                let log_update : Result<MessageBlockLogs, RecvError>  = msg;
                match log_update {
                    Ok(log_update_msg)=>{
                        process_log_entries(
                                log_update_msg.inner.logs,
                                &pools_config,
                                tasks_tx.clone(),
                        ).await?
                    }
                    Err(e)=>{
                        error!("block_update error {}", e)
                    }
                }

            }
        }
    }
}

#[derive(Consumer, Producer)]
pub struct NewPoolLoaderActor {
    #[consumer]
    log_update_rx: Option<Broadcaster<MessageBlockLogs>>,
    pools_config: PoolsConfig,
    #[producer]
    tasks_tx: Option<Broadcaster<Task>>,
}

impl NewPoolLoaderActor {
    pub fn new(pools_config: PoolsConfig) -> Self {
        NewPoolLoaderActor { log_update_rx: None, pools_config, tasks_tx: None }
    }

    pub fn on_bc<DB: DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static>(self, bc: &Blockchain<DB>) -> Self {
        Self { log_update_rx: Some(bc.new_block_logs_channel()), tasks_tx: Some(bc.tasks_channel()), ..self }
    }
}

impl Actor for NewPoolLoaderActor {
    fn start(&self) -> ActorResult {
        let task = tokio::task::spawn(new_pool_worker(
            self.log_update_rx.clone().unwrap(),
            self.pools_config.clone(),
            self.tasks_tx.clone().unwrap(),
        ));
        Ok(vec![task])
    }

    fn name(&self) -> &'static str {
        "NewPoolLoaderActor"
    }
}