1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
use std::{future::Future, sync::Arc};
use bytes::Bytes;
use tokio::{
io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf},
sync::Mutex,
};
use tracing::{debug, warn};
use super::{
types::QueryValidPaths,
worker_protocol::{server_handshake_client, ClientSettings, Operation, Trust, STDERR_LAST},
NixDaemonIO,
};
use crate::{
store_path::StorePath,
wire::{
de::{NixRead, NixReader},
ser::{NixSerialize, NixWrite, NixWriter, NixWriterBuilder},
ProtocolVersion,
},
};
use crate::{nix_daemon::types::NixError, worker_protocol::STDERR_ERROR};
/// Handles a single connection with a nix client.
///
/// As part of its [`initialization`] it performs the handshake with the client
/// and determines the [ProtocolVersion] and [ClientSettings] to use for the remainder of the session.
///
/// Once initialized, [`handle_client`] needs to be called to handle the rest of the session,
/// it delegates all operation handling to an instance of [NixDaemonIO].
///
/// The type parameters are the [NixDaemonIO] backend (`IO`) and the underlying
/// read (`R`) and write (`W`) halves wrapped by the [NixReader] and [NixWriter].
///
/// [`initialization`]: NixDaemon::initialize
/// [`handle_client`]: NixDaemon::handle_client
// NOTE(review): presumably needed because `protocol_version` and `client_settings`
// are stored for future use but are not read outside of the tests yet; confirm
// before removing the attribute.
#[allow(dead_code)]
pub struct NixDaemon<IO, R, W> {
/// Backend that the operations received from the client are delegated to.
io: Arc<IO>,
/// Protocol version used for this session; `initialize` takes it from the handshake result.
protocol_version: ProtocolVersion,
/// Settings most recently sent by the client through a `SetOptions` operation.
client_settings: ClientSettings,
/// Source of the client's requests (operation codes and their arguments).
reader: NixReader<R>,
/// Sink for responses sent to the client; locked while a response is being written.
writer: Arc<Mutex<NixWriter<W>>>,
}
impl<IO, R, W> NixDaemon<IO, R, W>
where
IO: NixDaemonIO + Sync + Send,
{
pub fn new(
io: Arc<IO>,
protocol_version: ProtocolVersion,
client_settings: ClientSettings,
reader: NixReader<R>,
writer: NixWriter<W>,
) -> Self {
Self {
io,
protocol_version,
client_settings,
reader,
writer: Arc::new(Mutex::new(writer)),
}
}
}
impl<IO, RW> NixDaemon<IO, ReadHalf<RW>, WriteHalf<RW>>
where
    RW: AsyncReadExt + AsyncWriteExt + Send + Unpin + 'static,
    IO: NixDaemonIO + Sync + Send,
{
    /// Async constructor for NixDaemon.
    ///
    /// Performs the initial handshake with the client and retrieves the client's preferred
    /// settings.
    ///
    /// The resulting daemon can handle the client session by calling [NixDaemon::handle_client].
    ///
    /// # Errors
    ///
    /// Returns an error if the handshake fails, if reading from or writing to
    /// `connection` fails, or if the first operation sent by the client is not
    /// `SetOptions`.
    pub async fn initialize(io: Arc<IO>, mut connection: RW) -> Result<Self, std::io::Error> {
        let protocol_version =
            server_handshake_client(&mut connection, "2.18.2", Trust::Trusted).await?;
        // Close the handshake with STDERR_LAST, the same marker that ends the
        // log stream of a regular operation.
        connection.write_u64_le(STDERR_LAST).await?;

        // From here on all traffic goes through version-aware reader/writer halves.
        let (reader, writer) = split(connection);
        let mut reader = NixReader::builder()
            .set_version(protocol_version)
            .build(reader);
        let mut writer = NixWriterBuilder::default()
            .set_version(protocol_version)
            .build(writer);

        // The first op is always SetOptions
        let operation: Operation = reader.read_value().await?;
        if operation != Operation::SetOptions {
            // `format!` is required here: a plain string literal would send the
            // placeholder text verbatim instead of the offending operation.
            return Err(std::io::Error::other(format!(
                "Expected SetOptions operation, but got {operation:?}"
            )));
        }
        let client_settings: ClientSettings = reader.read_value().await?;
        writer.write_number(STDERR_LAST).await?;
        writer.flush().await?;

        Ok(Self::new(
            io,
            protocol_version,
            client_settings,
            reader,
            writer,
        ))
    }

    /// Main client connection loop, reads client's requests and responds to them accordingly.
    ///
    /// Each iteration reads one operation code, reads that operation's
    /// arguments and passes the resulting [NixDaemonIO] future to
    /// [`handle`](Self::handle), which writes the response.
    ///
    /// # Errors
    ///
    /// The loop has no successful exit: it runs until one of the following
    /// happens and then returns the corresponding error.
    /// * Reading a request or writing a response fails.
    /// * The client sends an operation code that does not map to an [Operation].
    /// * The client sends an [Operation] that is not implemented here.
    pub async fn handle_client(&mut self) -> Result<(), std::io::Error> {
        // Cloned so that the futures created from `io` below do not borrow `self`,
        // which `handle` needs mutably.
        let io = self.io.clone();
        loop {
            let op_code = self.reader.read_number().await?;
            let Ok(operation) = TryInto::<Operation>::try_into(op_code) else {
                return Err(std::io::Error::other(format!(
                    "Unknown operation code received: {op_code}"
                )));
            };
            // Note: please keep operations sorted in ascending order of their numerical op number.
            match operation {
                Operation::IsValidPath => {
                    let path: StorePath<String> = self.reader.read_value().await?;
                    self.handle(io.is_valid_path(&path)).await?
                }
                // Note this operation does not currently delegate to NixDaemonIO,
                // The general idea is that we will pass relevant ClientSettings
                // into individual NixDaemonIO method calls if the need arises.
                // For now we just store the settings in the NixDaemon for future use.
                Operation::SetOptions => {
                    self.client_settings = self.reader.read_value().await?;
                    self.handle(async { Ok(()) }).await?
                }
                Operation::QueryPathInfo => {
                    let path: StorePath<String> = self.reader.read_value().await?;
                    self.handle(io.query_path_info(&path)).await?
                }
                Operation::QueryPathFromHashPart => {
                    let hash: Bytes = self.reader.read_value().await?;
                    self.handle(io.query_path_from_hash_part(&hash)).await?
                }
                Operation::QueryValidPaths => {
                    let query: QueryValidPaths = self.reader.read_value().await?;
                    self.handle(io.query_valid_paths(&query)).await?
                }
                Operation::QueryValidDerivers => {
                    let path: StorePath<String> = self.reader.read_value().await?;
                    self.handle(io.query_valid_derivers(&path)).await?
                }
                // FUTUREWORK: These are just stubs that return an empty list.
                // It's important not to return an error for the local-overlay:// store
                // to work properly. While it will not see certain referrers and realizations
                // it will not fail on various operations like gc and optimize store. At the
                // same time, returning an empty list here shouldn't break any of local-overlay store's
                // invariants.
                Operation::QueryReferrers | Operation::QueryRealisation => {
                    // The argument still has to be consumed to keep the stream in sync.
                    let _: String = self.reader.read_value().await?;
                    self.handle(async move {
                        warn!(
                            ?operation,
                            "This operation is not implemented. Returning empty result..."
                        );
                        Ok(Vec::<StorePath<String>>::new())
                    })
                    .await?
                }
                _ => {
                    return Err(std::io::Error::other(format!(
                        "Operation {operation:?} is not implemented"
                    )));
                }
            }
        }
    }

    /// Handles the operation and sends the response or error to the client.
    ///
    /// As per nix daemon protocol, after sending the request, the client expects zero or more
    /// log lines/activities followed by either
    /// * STDERR_LAST and the response bytes
    /// * STDERR_ERROR and the error
    ///
    /// This is a helper method, awaiting on the passed in future and then
    /// handling log lines/activities as described above.
    ///
    /// # Errors
    ///
    /// An `Err` produced by `future` is reported to the client and is *not*
    /// returned from this method. Only failures while writing or flushing the
    /// response are returned.
    async fn handle<T>(
        &mut self,
        future: impl Future<Output = std::io::Result<T>>,
    ) -> Result<(), std::io::Error>
    where
        T: NixSerialize + Send,
    {
        // Run the operation before taking the writer lock, so the lock is only
        // held while the response is actually being written.
        let result = future.await;
        let mut writer = self.writer.lock().await;
        match result {
            Ok(r) => {
                // the protocol requires that we first indicate that we are done sending logs
                // by sending STDERR_LAST and then the response.
                writer.write_number(STDERR_LAST).await?;
                writer.write_value(&r).await?;
            }
            Err(e) => {
                debug!(err = ?e, "IO error");
                writer.write_number(STDERR_ERROR).await?;
                writer.write_value(&NixError::new(format!("{e:?}"))).await?;
            }
        }
        // Both outcomes end the same way: push the buffered bytes to the client.
        writer.flush().await
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::{io::Result, sync::Arc};
use tokio::io::AsyncWriteExt;
use crate::{
nix_daemon::types::UnkeyedValidPathInfo,
wire::ProtocolVersion,
worker_protocol::{ClientSettings, WORKER_MAGIC_1, WORKER_MAGIC_2},
};
/// Minimal [NixDaemonIO] backend for the tests in this module.
///
/// Only the two query methods below are implemented, both answering "not found".
// NOTE(review): the remaining NixDaemonIO methods presumably have default
// implementations in the trait; that is not visible from this file.
struct MockDaemonIO {}
impl NixDaemonIO for MockDaemonIO {
async fn query_path_info(
&self,
_path: &crate::store_path::StorePath<String>,
) -> Result<Option<UnkeyedValidPathInfo>> {
Ok(None)
}
async fn query_path_from_hash_part(
&self,
_hash: &[u8],
) -> Result<Option<UnkeyedValidPathInfo>> {
Ok(None)
}
}
/// Drives `NixDaemon::initialize` against a scripted connection and checks the
/// protocol version and client settings it ends up with.
///
/// In the script, `.read(..)` entries are the bytes the daemon receives from the
/// client and `.write(..)` entries are the bytes the daemon is expected to send.
#[tokio::test]
async fn test_daemon_initialization() {
let mut builder = tokio_test::io::Builder::new();
let test_conn = builder
.read(&WORKER_MAGIC_1.to_le_bytes())
.write(&WORKER_MAGIC_2.to_le_bytes())
// Our version is 1.37
.write(&[37, 1, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
// The client's version is 1.35
.read(&[35, 1, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
// cpu affinity
.read(&[0; 8])
// reservespace
.read(&[0; 8])
// version (size)
.write(&[0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
// version (data == 2.18.2 + padding)
.write(&[50, 46, 49, 56, 46, 50, 0, 0])
// Trusted (1 == client trusted)
.write(&[1, 0, 0, 0, 0, 0, 0, 0])
// STDERR_LAST
.write(&[115, 116, 108, 97, 0, 0, 0, 0]);
// Serialize the default client settings into a buffer; these bytes are the
// SetOptions payload the scripted client sends below.
let mut bytes = Vec::new();
let mut writer = NixWriter::new(&mut bytes);
writer
.write_value(&ClientSettings::default())
.await
.unwrap();
writer.flush().await.unwrap();
let test_conn = test_conn
// SetOptions op
.read(&[19, 0, 0, 0, 0, 0, 0, 0])
.read(&bytes)
// STDERR_LAST
.write(&[115, 116, 108, 97, 0, 0, 0, 0])
.build();
let daemon = NixDaemon::initialize(Arc::new(MockDaemonIO {}), test_conn)
.await
.unwrap();
// The daemon must hold the settings the client sent and the client's
// protocol version (1.35) rather than our own (1.37).
assert_eq!(daemon.client_settings, ClientSettings::default());
assert_eq!(daemon.protocol_version, ProtocolVersion::from_parts(1, 35));
}
}