loom_core_actors/
actor_manager.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use eyre::Result;
use tokio::task::JoinHandle;
use tracing::{error, info};

use crate::{Actor, WorkerResult};

/// Keeps the join handles of worker tasks spawned by started actors so that
/// they can all be awaited together via [`ActorsManager::wait`].
#[derive(Default)]
pub struct ActorsManager {
    /// Join handles returned by actors started through [`ActorsManager::start`].
    tasks: Vec<JoinHandle<WorkerResult>>,
}

impl ActorsManager {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn start(&mut self, actor: impl Actor + 'static) -> Result<()> {
        match actor.start() {
            Ok(workers) => {
                info!("{} started successfully", actor.name());
                self.tasks.extend(workers);
                Ok(())
            }
            Err(e) => {
                error!("Error starting {} : {}", actor.name(), e);
                Err(e)
            }
        }
    }

    pub fn start_and_wait(&mut self, actor: impl Actor + Send + Sync + 'static) -> Result<()> {
        match actor.start_and_wait() {
            Ok(_) => {
                info!("{} started successfully", actor.name());
                Ok(())
            }
            Err(e) => {
                error!("Error starting {} : {}", actor.name(), e);
                Err(e)
            }
        }
    }

    pub async fn wait(self) {
        let mut f_remaining_futures = self.tasks;
        let mut futures_counter = f_remaining_futures.len();

        while futures_counter > 0 {
            let (result, _index, remaining_futures) = futures::future::select_all(f_remaining_futures).await;
            match result {
                Ok(work_result) => match work_result {
                    Ok(s) => {
                        info!("ActorWorker {_index} finished : {s}")
                    }
                    Err(e) => {
                        error!("ActorWorker {_index} finished with error : {e}")
                    }
                },
                Err(e) => {
                    error!("ActorWorker join error {_index} : {e}")
                }
            }
            f_remaining_futures = remaining_futures;
            futures_counter = f_remaining_futures.len();
        }
    }
}