loom_core_actors/
actor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use crate::channels::Broadcaster;
use crate::shared_state::SharedState;
use eyre::{eyre, Result};
use tokio::task::JoinHandle;
use tracing::info;

pub type WorkerResult = Result<String>;

pub type ActorResult = Result<Vec<JoinHandle<WorkerResult>>>;

pub trait Actor {
    fn wait(&self, handles: ActorResult) -> Result<()> {
        let handles = handles?;
        let actor_name = self.name();
        futures::executor::block_on(async {
            for handle in handles {
                match handle.await {
                    Ok(result) => match result {
                        Ok(msg) => info!("One-shot actor '{}' completed with message: {}", actor_name, msg),
                        Err(e) => return Err(eyre!("Actor '{}' failed with error: {}", actor_name, e)),
                    },
                    Err(e) => return Err(eyre!("Actor task execution failed for '{}' with error: {}", actor_name, e)),
                }
            }
            Ok(())
        })
    }

    fn start_and_wait(&self) -> Result<()> {
        let handles = self.start();
        self.wait(handles)
    }

    fn start(&self) -> ActorResult;

    fn name(&self) -> &'static str;
}

pub trait Producer<T>
where
    T: Sync + Send + Clone,
{
    fn produce(&mut self, _broadcaster: Broadcaster<T>) -> &mut Self {
        panic!("Not implemented");
    }
}

pub trait Consumer<T>
where
    T: Sync + Send + Clone,
{
    fn consume(&mut self, _receiver: Broadcaster<T>) -> &mut Self {
        panic!("Not implemented");
    }
}

pub trait Accessor<T> {
    fn access(&mut self, _data: SharedState<T>) -> &mut Self {
        panic!("Not implemented");
    }
}

#[cfg(test)]
mod test {
    use crate::actor::{Consumer, Producer, SharedState};
    use crate::channels::Broadcaster;

    //use crate::macros::*;

    #[allow(dead_code)]
    #[derive(Clone)]
    struct DataStruct0 {
        data: Option<SharedState<i32>>,
    }

    #[allow(dead_code)]
    #[derive(Clone)]
    struct DataStruct1 {
        data: String,
    }

    #[allow(dead_code)]
    #[derive(Clone)]
    struct DataStruct2 {
        pub data: u32,
    }

    #[allow(dead_code)]
    #[derive(Clone)]
    struct DataStruct3 {
        data: u128,
    }

    #[allow(dead_code)]
    struct TestActor {
        state: Option<SharedState<DataStruct0>>,
        broadcaster0: Option<Broadcaster<DataStruct0>>,
        broadcaster1: Option<Broadcaster<DataStruct1>>,
        consumer2: Option<Broadcaster<DataStruct2>>,
    }

    impl TestActor {
        pub fn new() -> Self {
            Self { state: None, broadcaster0: None, broadcaster1: None, consumer2: None }
        }

        pub async fn start(&self) {}
    }

    impl Consumer<DataStruct2> for TestActor {
        fn consume(&mut self, consumer: Broadcaster<DataStruct2>) -> &mut Self {
            self.consumer2 = Some(consumer);
            self
        }
    }

    impl Producer<DataStruct0> for TestActor {
        fn produce(&mut self, broadcaster: Broadcaster<DataStruct0>) -> &mut Self {
            self.broadcaster0 = Some(broadcaster);
            self
        }
    }

    impl Producer<DataStruct1> for TestActor {
        fn produce(&mut self, broadcaster: Broadcaster<DataStruct1>) -> &mut Self {
            self.broadcaster1 = Some(broadcaster);
            self
        }
    }

    #[tokio::test]
    async fn test_actor() {
        let channel0: Broadcaster<DataStruct0> = Broadcaster::new(10);
        let channel1: Broadcaster<DataStruct1> = Broadcaster::new(10);
        let channel2: Broadcaster<DataStruct2> = Broadcaster::new(10);

        let mut test_actor: TestActor = TestActor::new();
        test_actor.produce(channel0).produce(channel1).consume(channel2).start().await;
    }
}