1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use std::{io, path::Path, sync::Arc};

use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
use parking_lot::Mutex;
use threadpool::ThreadPool;
use tracing::{error, instrument};

#[cfg(test)]
mod tests;

struct FuseServer<FS>
where
    FS: FileSystem + Sync + Send,
{
    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
    channel: fuse_backend_rs::transport::FuseChannel,
}

#[cfg(target_os = "macos")]
const BADFD: libc::c_int = libc::EBADF;
#[cfg(target_os = "linux")]
const BADFD: libc::c_int = libc::EBADFD;

impl<FS> FuseServer<FS>
where
    FS: FileSystem + Sync + Send,
{
    fn start(&mut self) -> io::Result<()> {
        while let Some((reader, writer)) = self
            .channel
            .get_request()
            .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
        {
            if let Err(e) = self
                .server
                .handle_message(reader, writer.into(), None, None)
            {
                match e {
                    // This indicates the session has been shut down.
                    fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
                        break;
                    }
                    error => {
                        error!(?error, "failed to handle fuse request");
                        continue;
                    }
                }
            }
        }
        Ok(())
    }
}

/// Starts a [Filesystem] with the specified number of threads, and provides
/// functions to unmount, and wait for it to have completed.
#[derive(Clone)]
pub struct FuseDaemon {
    session: Arc<Mutex<FuseSession>>,
    threads: Arc<ThreadPool>,
}

impl FuseDaemon {
    #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)]
    pub fn new<FS, P>(
        fs: FS,
        mountpoint: P,
        num_threads: usize,
        allow_other: bool,
    ) -> Result<Self, io::Error>
    where
        FS: FileSystem + Sync + Send + 'static,
        P: AsRef<Path> + std::fmt::Debug,
    {
        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));

        let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;

        #[cfg(target_os = "linux")]
        session.set_allow_other(allow_other);
        session
            .mount()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;

        // construct a thread pool
        let threads = threadpool::Builder::new()
            .num_threads(num_threads)
            .thread_name("fuse_server".to_string())
            .build();

        for _ in 0..num_threads {
            // for each thread requested, create and start a FuseServer accepting requests.
            let mut server = FuseServer {
                server: server.clone(),
                channel: session
                    .new_channel()
                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
            };

            threads.execute(move || {
                let _ = server.start();
            });
        }

        Ok(FuseDaemon {
            session: Arc::new(Mutex::new(session)),
            threads: Arc::new(threads),
        })
    }

    /// Waits for all threads to finish.
    #[instrument(skip_all)]
    pub fn wait(&self) {
        self.threads.join()
    }

    /// Send the unmount command, and waits for all threads to finish.
    #[instrument(skip_all, err)]
    pub fn unmount(&self) -> Result<(), io::Error> {
        // Send the unmount command.
        self.session
            .lock()
            .umount()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;

        self.wait();
        Ok(())
    }
}

impl Drop for FuseDaemon {
    fn drop(&mut self) {
        if let Err(error) = self.unmount() {
            error!(?error, "failed to unmont fuse filesystem")
        }
    }
}