loom_core_actors/channels/
broadcaster.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
use std::sync::Arc;

use eyre::{eyre, Result};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::SendError;
use tokio::sync::broadcast::Receiver;
use tokio::sync::RwLock;
use tracing::error;

#[derive(Clone)]
pub struct Broadcaster<T>
where
    T: Clone + Send + Sync + 'static,
{
    sender: Arc<RwLock<broadcast::Sender<T>>>,
}

impl<T: Clone + Send + Sync + 'static> Broadcaster<T> {
    pub fn new(capacity: usize) -> Self {
        let (sender, _) = broadcast::channel(capacity);
        Self { sender: Arc::new(RwLock::new(sender)) }
    }

    pub async fn send(&self, value: T) -> Result<usize, SendError<T>> {
        let sender = self.sender.write().await;
        sender.send(value)
    }

    pub fn try_send(&self, value: T) -> Result<usize> {
        //let sender = self.sender.write().await;
        match self.sender.try_write() {
            Ok(guard) => match guard.send(value) {
                Ok(size) => Ok(size),
                Err(_) => Err(eyre!("ERROR_SEND")),
            },
            Err(e) => {
                error!("self.sender.try_write {}", e);
                Err(eyre!("ERROR_WRITE_LOCK"))
            }
        }
    }

    pub async fn subscribe(&self) -> Receiver<T> {
        let sender = self.sender.write().await;
        sender.subscribe()
    }

    pub fn subscribe_sync(&self) -> Result<Receiver<T>> {
        let sender = self.sender.try_write()?;
        Ok(sender.subscribe())
    }
}