loom_defi_market/
logs_parser.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use alloy_primitives::Log as EVMLog;
use alloy_rpc_types::Log;
use alloy_sol_types::SolEventInterface;
use eyre::Result;
use std::collections::HashMap;
use tracing::error;

use loom_core_actors::{run_async, Broadcaster};
use loom_defi_abi::maverick::IMaverickPool::IMaverickPoolEvents;
use loom_defi_abi::uniswap2::IUniswapV2Pair::IUniswapV2PairEvents;
use loom_defi_abi::uniswap3::IUniswapV3Pool::IUniswapV3PoolEvents;
use loom_defi_pools::PoolsConfig;
use loom_types_entities::PoolClass;
use loom_types_events::Task;

fn determine_pool_class(log_entry: Log) -> Option<PoolClass> {
    {
        let log_entry: Option<EVMLog> = EVMLog::new(log_entry.address(), log_entry.topics().to_vec(), log_entry.data().data.clone());
        match log_entry {
            Some(log_entry) => match IUniswapV3PoolEvents::decode_log(&log_entry, false) {
                Ok(event) => match event.data {
                    IUniswapV3PoolEvents::Swap(_)
                    | IUniswapV3PoolEvents::Mint(_)
                    | IUniswapV3PoolEvents::Burn(_)
                    | IUniswapV3PoolEvents::Initialize(_) => Some(PoolClass::UniswapV3),
                    _ => None,
                },
                Err(_) => None,
            }
            .or_else(|| {
                {
                    match IMaverickPoolEvents::decode_log(&log_entry, false) {
                        Ok(event) => match event.data {
                            IMaverickPoolEvents::Swap(_)
                            | IMaverickPoolEvents::AddLiquidity(_)
                            | IMaverickPoolEvents::RemoveLiquidity(_) => Some(PoolClass::UniswapV3),
                            _ => None,
                        },
                        Err(_) => None,
                    }
                }
                .or_else(|| match IUniswapV2PairEvents::decode_log(&log_entry, false) {
                    Ok(event) => match event.data {
                        IUniswapV2PairEvents::Swap(_)
                        | IUniswapV2PairEvents::Mint(_)
                        | IUniswapV2PairEvents::Burn(_)
                        | IUniswapV2PairEvents::Sync(_) => Some(PoolClass::UniswapV2),
                        _ => None,
                    },
                    Err(_) => None,
                })
            }),
            _ => None,
        }
    }
}

pub async fn process_log_entries(log_entries: Vec<Log>, pools_config: &PoolsConfig, tasks_tx: Broadcaster<Task>) -> Result<()> {
    let mut pool_to_fetch = Vec::new();
    let mut processed_pools = HashMap::new();

    for log_entry in log_entries.into_iter() {
        if let Some(pool_class) = determine_pool_class(log_entry.clone()) {
            if !pools_config.is_enabled(pool_class) {
                continue;
            }

            // was this pool already processed?
            if processed_pools.insert(log_entry.address(), true).is_some() {
                continue;
            }

            pool_to_fetch.push((log_entry.address(), pool_class));
        }
    }

    run_async!(tasks_tx.send(Task::FetchAndAddPools(pool_to_fetch)));
    Ok(())
}