use super::{PathInfo, PathInfoService};
use crate::nar::ingest_nar_and_hash;
use futures::{stream::BoxStream, TryStreamExt};
use nix_compat::{
narinfo::{self, NarInfo, Signature},
nixbase32,
nixhash::NixHash,
store_path::StorePath,
};
use reqwest::StatusCode;
use std::sync::Arc;
use tokio::io::{self, AsyncRead};
use tonic::async_trait;
use tracing::{debug, instrument, warn};
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
use url::Url;
/// A [PathInfoService] backed by an HTTP endpoint serving `.narinfo` files
/// and NARs.
///
/// Lookups fetch `<nixbase32 digest>.narinfo` relative to `base_url`, then
/// download the NAR it points to and ingest it into the configured blob and
/// directory services.
pub struct NixHTTPPathInfoService<BS, DS> {
/// URL that the `.narinfo` file name and the NAR URL are joined onto.
base_url: url::Url,
/// HTTP client used for both the narinfo and the NAR request.
http_client: reqwest_middleware::ClientWithMiddleware,
/// Handed to the NAR ingestion (cloned per lookup).
blob_service: BS,
/// Handed to the NAR ingestion (cloned per lookup).
directory_service: DS,
/// When `Some`, a fetched narinfo must carry at least one signature that
/// verifies against one of these keys. When `None`, signatures are not
/// checked.
public_keys: Option<Vec<narinfo::VerifyingKey>>,
}
impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
    /// Creates a service that resolves narinfo and NAR URLs relative to
    /// `base_url` and ingests fetched NARs into the given blob and directory
    /// services.
    ///
    /// The service starts without any public keys, so no signature checks
    /// are performed until [Self::set_public_keys] is called.
    pub fn new(base_url: url::Url, blob_service: BS, directory_service: DS) -> Self {
        // Plain reqwest client wrapped with the tracing propagation middleware.
        let http_client = reqwest_middleware::ClientBuilder::new(reqwest::Client::new())
            .with(tvix_tracing::propagate::reqwest::tracing_middleware())
            .build();

        Self {
            base_url,
            http_client,
            blob_service,
            directory_service,
            public_keys: None,
        }
    }

    /// Replaces the set of public keys that narinfo signatures are verified
    /// against, which enables signature checking on subsequent lookups.
    pub fn set_public_keys(&mut self, public_keys: Vec<narinfo::VerifyingKey>) {
        self.public_keys = Some(public_keys);
    }
}
#[async_trait]
impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS>
where
BS: BlobService + Send + Sync + Clone + 'static,
DS: DirectoryService + Send + Sync + Clone + 'static,
{
#[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))]
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
let narinfo_url = self
.base_url
.join(&format!("{}.narinfo", nixbase32::encode(&digest)))
.map_err(|e| {
warn!(e = %e, "unable to join URL");
io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
})?;
debug!(narinfo_url= %narinfo_url, "constructed NARInfo url");
let resp = self
.http_client
.get(narinfo_url)
.send()
.await
.map_err(|e| {
warn!(e=%e,"unable to send NARInfo request");
io::Error::new(
io::ErrorKind::InvalidInput,
"unable to send NARInfo request",
)
})?;
if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
return Ok(None);
}
let narinfo_str = resp.text().await.map_err(|e| {
warn!(e=%e,"unable to decode response as string");
io::Error::new(
io::ErrorKind::InvalidData,
"unable to decode response as string",
)
})?;
let narinfo = NarInfo::parse(&narinfo_str).map_err(|e| {
warn!(e=%e,"unable to parse response as NarInfo");
io::Error::new(
io::ErrorKind::InvalidData,
"unable to parse response as NarInfo",
)
})?;
if let Some(public_keys) = &self.public_keys {
let fingerprint = narinfo.fingerprint();
if !public_keys.iter().any(|pubkey| {
narinfo
.signatures
.iter()
.any(|sig| pubkey.verify(&fingerprint, sig))
}) {
warn!("no valid signature found");
Err(io::Error::new(
io::ErrorKind::InvalidData,
"no valid signature found",
))?;
}
}
let nar_url = self.base_url.join(narinfo.url).map_err(|e| {
warn!(e = %e, "unable to join URL");
io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
})?;
debug!(nar_url= %nar_url, "constructed NAR url");
let resp = self
.http_client
.get(nar_url.clone())
.send()
.await
.map_err(|e| {
warn!(e=%e,"unable to send NAR request");
io::Error::new(io::ErrorKind::InvalidInput, "unable to send NAR request")
})?;
if !resp.status().is_success() {
return Err(Error::StorageError(format!(
"unable to retrieve NAR at {}, status {}",
nar_url,
resp.status()
)));
}
let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
let e = e.without_url();
warn!(e=%e, "failed to get response body");
io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
}));
let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
None => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
Some("bzip2") => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
as Box<dyn AsyncRead + Send + Unpin>,
Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
as Box<dyn AsyncRead + Send + Unpin>,
Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
as Box<dyn AsyncRead + Send + Unpin>,
Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
as Box<dyn AsyncRead + Send + Unpin>,
Some(comp_str) => {
return Err(Error::StorageError(format!(
"unsupported compression: {comp_str}"
)));
}
};
let (root_node, nar_hash, nar_size) = ingest_nar_and_hash(
self.blob_service.clone(),
self.directory_service.clone(),
&mut r,
)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
if narinfo.nar_size != nar_size {
warn!(
narinfo.nar_size = narinfo.nar_size,
http.nar_size = nar_size,
"NarSize mismatch"
);
Err(io::Error::new(
io::ErrorKind::InvalidData,
"NarSize mismatch".to_string(),
))?;
}
if narinfo.nar_hash != nar_hash {
warn!(
narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
http.nar_hash = %NixHash::Sha256(nar_hash),
"NarHash mismatch"
);
Err(io::Error::new(
io::ErrorKind::InvalidData,
"NarHash mismatch".to_string(),
))?;
}
Ok(Some(PathInfo {
store_path: narinfo.store_path.to_owned(),
node: root_node,
references: narinfo.references.iter().map(StorePath::to_owned).collect(),
nar_size: narinfo.nar_size,
nar_sha256: narinfo.nar_hash,
deriver: narinfo.deriver.as_ref().map(StorePath::to_owned),
signatures: narinfo
.signatures
.into_iter()
.map(|s| Signature::<String>::new(s.name().to_string(), s.bytes().to_owned()))
.collect(),
ca: narinfo.ca,
}))
}
#[instrument(skip_all, fields(path_info=?_path_info))]
async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
Err(Error::InvalidRequest(
"put not supported for this backend".to_string(),
))
}
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
Box::pin(futures::stream::once(async {
Err(Error::InvalidRequest(
"list not supported for this backend".to_string(),
))
}))
}
}
/// Configuration from which a [NixHTTPPathInfoService] is built.
///
/// Can be deserialized with serde or constructed from a [Url] via `TryFrom`.
#[derive(serde::Deserialize)]
pub struct NixHTTPPathInfoServiceConfig {
/// Base URL of the HTTP endpoint; parsed with `Url::parse` when the service is built.
base_url: String,
/// Name under which the blob service is resolved from the composition context.
blob_service: String,
/// Name under which the directory service is resolved from the composition context.
directory_service: String,
/// Optional public keys, each parsed with `narinfo::VerifyingKey::parse`
/// when the service is built. When absent, no keys are set on the service.
#[serde(default)]
public_keys: Option<Vec<String>>,
}
impl TryFrom<Url> for NixHTTPPathInfoServiceConfig {
    type Error = Box<dyn std::error::Error + Send + Sync>;

    /// Builds a config from a URL whose string form starts with `nix+`
    /// (e.g. `nix+https://…`).
    ///
    /// Recognized query parameters:
    /// - `trusted-public-keys`: whitespace-separated public keys; may be
    ///   given multiple times, all values are collected.
    /// - `blob_service`: name of the blob service, `"default"` if absent.
    /// - `directory_service`: name of the directory service, `"default"` if
    ///   absent.
    ///
    /// The resulting `base_url` is the full URL with the `nix+` prefix
    /// removed; the query parameters are left in place.
    ///
    /// # Errors
    ///
    /// Returns an error (instead of panicking) if the URL does not start
    /// with `nix+`.
    fn try_from(url: Url) -> Result<Self, Self::Error> {
        let full_url = url.to_string();
        let base_url = full_url
            .strip_prefix("nix+")
            .ok_or_else(|| format!("expected URL scheme to start with \"nix+\": {full_url}"))?
            .to_string();

        // Collect keys from every occurrence of the parameter. The result
        // stays `None` only if the parameter never appears.
        let mut public_keys: Option<Vec<String>> = None;
        for (_, v) in url
            .query_pairs()
            .filter(|(k, _)| k == "trusted-public-keys")
        {
            public_keys
                .get_or_insert_with(Vec::new)
                .extend(v.split_ascii_whitespace().map(ToString::to_string));
        }

        // Returns the value of the first query parameter named `key`, if any.
        let query_value = |key: &str| {
            url.query_pairs()
                .find(|(k, _)| k == key)
                .map(|(_, v)| v.into_owned())
        };

        let blob_service = query_value("blob_service").unwrap_or_else(|| "default".to_string());
        let directory_service =
            query_value("directory_service").unwrap_or_else(|| "default".to_string());

        Ok(NixHTTPPathInfoServiceConfig {
            base_url,
            blob_service,
            directory_service,
            public_keys,
        })
    }
}
#[async_trait]
impl ServiceBuilder for NixHTTPPathInfoServiceConfig {
type Output = dyn PathInfoService;
async fn build<'a>(
&'a self,
_instance_name: &str,
context: &CompositionContext,
) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let (blob_service, directory_service) = futures::join!(
context.resolve::<dyn BlobService>(self.blob_service.clone()),
context.resolve::<dyn DirectoryService>(self.directory_service.clone())
);
let mut svc = NixHTTPPathInfoService::new(
Url::parse(&self.base_url)?,
blob_service?,
directory_service?,
);
if let Some(public_keys) = &self.public_keys {
svc.set_public_keys(
public_keys
.iter()
.map(|pubkey_str| {
narinfo::VerifyingKey::parse(pubkey_str)
.map_err(|e| Error::StorageError(format!("invalid public key: {e}")))
})
.collect::<Result<Vec<_>, Error>>()?,
);
}
Ok(Arc::new(svc))
}
}