1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use crate::primary_worker::{DeviceCommand, DeviceSender};
use crate::Shutdown;
use anyhow::{anyhow, Context, Result};
use goxlr_ipc::Socket;
use goxlr_ipc::{DaemonRequest, DaemonResponse};
use log::{debug, info, warn};
use tokio::net::UnixListener;
use tokio::sync::oneshot;

pub async fn listen_for_connections(
    listener: UnixListener,
    usb_tx: DeviceSender,
    mut shutdown_signal: Shutdown,
) {
    loop {
        tokio::select! {
            Ok((stream, addr)) = listener.accept() => {
                let usb_tx = usb_tx.clone();
                tokio::spawn(async move {
                    let socket = Socket::new(addr, stream);
                    handle_connection(socket, usb_tx).await
                });
            }
            () = shutdown_signal.recv() => {
                info!("Shutting down communications worker");
                return;
            }
        };
    }
}

async fn handle_connection(
    mut socket: Socket<DaemonRequest, DaemonResponse>,
    mut usb_tx: DeviceSender,
) {
    while let Some(msg) = socket.read().await {
        match msg {
            Ok(msg) => match handle_packet(msg, &mut usb_tx).await {
                Ok(response) => {
                    if let Err(e) = socket.send(response).await {
                        warn!("Couldn't reply to {:?}: {}", socket.address(), e);
                        return;
                    }
                }
                Err(e) => {
                    if let Err(e) = socket.send(DaemonResponse::Error(e.to_string())).await {
                        warn!("Couldn't reply to {:?}: {}", socket.address(), e);
                        return;
                    }
                }
            },
            Err(e) => warn!("Invalid message from {:?}: {}", socket.address(), e),
        }
    }
    debug!("Disconnected {:?}", socket.address());
}

async fn handle_packet(
    request: DaemonRequest,
    usb_tx: &mut DeviceSender,
) -> Result<DaemonResponse> {
    match request {
        DaemonRequest::Ping => Ok(DaemonResponse::Ok),
        DaemonRequest::GetStatus => {
            let (tx, rx) = oneshot::channel();
            usb_tx
                .send(DeviceCommand::SendDaemonStatus(tx))
                .await
                .map_err(|e| anyhow!(e.to_string()))
                .context("Could not communicate with the device task")?;
            Ok(DaemonResponse::Status(rx.await.context(
                "Could not execute the command on the device task",
            )?))
        }
        DaemonRequest::Command(serial, command) => {
            let (tx, rx) = oneshot::channel();
            usb_tx
                .send(DeviceCommand::RunDeviceCommand(serial, command, tx))
                .await
                .map_err(|e| anyhow!(e.to_string()))
                .context("Could not communicate with the GoXLR device")?;
            rx.await
                .context("Could not execute the command on the GoXLR device")??;
            Ok(DaemonResponse::Ok)
        }
    }
}