use anyhow::Context;
use bstr::BStr;
use oci_spec::runtime::{LinuxIdMapping, LinuxIdMappingBuilder};
use tokio::process::{Child, Command};
use tonic::async_trait;
use tracing::{debug, instrument, warn, Span};
use tvix_castore::{
blobservice::BlobService,
directoryservice::DirectoryService,
fs::fuse::FuseDaemon,
import::fs::ingest_path,
refscan::{ReferencePattern, ReferenceScanner},
};
use uuid::Uuid;
use crate::buildservice::BuildRequest;
use crate::{
oci::{get_host_output_paths, make_bundle, make_spec},
proto::{self, build::OutputNeedles},
};
use std::{ffi::OsStr, path::PathBuf, process::Stdio};
use super::BuildService;
// Sandbox shell handed to `make_spec` for every build. The value is baked in
// at compile time from the `TVIX_BUILD_SANDBOX_SHELL` environment variable.
const SANDBOX_SHELL: &str = env!("TVIX_BUILD_SANDBOX_SHELL");
// Number of permits the `concurrent_builds` semaphore is created with, i.e.
// the upper bound on builds running at the same time. Currently hard-coded.
const MAX_CONCURRENT_BUILDS: usize = 2;

/// A [`BuildService`] implementation that runs each build as an OCI bundle
/// through `runc`.
pub struct OCIBuildService<BS, DS> {
    /// Directory below which a per-build bundle directory (named after a
    /// freshly generated UUID) is created.
    bundle_root: PathBuf,
    /// uid mappings written into the `linux` section of every runtime spec.
    uid_mappings: Vec<LinuxIdMapping>,
    /// gid mappings written into the `linux` section of every runtime spec.
    gid_mappings: Vec<LinuxIdMapping>,
    /// Blob service handle; cloned into the FUSE filesystem serving the
    /// build inputs and used when ingesting build outputs.
    blob_service: BS,
    /// Directory service handle; cloned into the FUSE filesystem serving the
    /// build inputs and used when ingesting build outputs.
    directory_service: DS,
    /// Semaphore limiting the number of concurrently running builds; each
    /// `do_build` call holds one permit for its whole duration.
    concurrent_builds: tokio::sync::Semaphore,
}
impl<BS, DS> OCIBuildService<BS, DS> {
pub fn new(bundle_root: PathBuf, blob_service: BS, directory_service: DS) -> Self {
Self {
bundle_root,
blob_service,
directory_service,
uid_mappings: vec![
LinuxIdMappingBuilder::default()
.host_id(1000_u32)
.container_id(0_u32)
.size(1_u32)
.build()
.unwrap(),
LinuxIdMappingBuilder::default()
.host_id(100000_u32)
.container_id(1000_u32)
.size(1_u32)
.build()
.unwrap(),
],
gid_mappings: vec![
LinuxIdMappingBuilder::default()
.host_id(100_u32)
.container_id(0_u32)
.size(1_u32)
.build()
.unwrap(),
LinuxIdMappingBuilder::default()
.host_id(100000_u32)
.container_id(100_u32)
.size(1_u32)
.build()
.unwrap(),
],
concurrent_builds: tokio::sync::Semaphore::new(MAX_CONCURRENT_BUILDS),
}
}
}
#[async_trait]
impl<BS, DS> BuildService for OCIBuildService<BS, DS>
where
    BS: BlobService + Clone + 'static,
    DS: DirectoryService + Clone + 'static,
{
    /// Runs `request` as an OCI bundle via `runc` and ingests its outputs.
    ///
    /// Steps, in order:
    /// 1. Wait for a permit from the concurrency-limiting semaphore.
    /// 2. Create the runtime spec, add the uid/gid mappings, write the bundle.
    /// 3. Compute the host-side output paths (before starting anything).
    /// 4. Mount the request inputs at `<bundle>/inputs` through a FUSE daemon.
    /// 5. Run `runc` and wait for it to exit.
    /// 6. Ingest every output path, scanning it for the requested needles.
    ///
    /// # Errors
    ///
    /// Returns an [`std::io::Error`] if any of the steps above fails, if the
    /// runtime spec has no `linux` section, if `runc` exits unsuccessfully
    /// (stdout/stderr are logged at `warn` level), or if a host output path
    /// has no matching entry in `request.outputs`.
    // `bundle_name` must be declared here: recording a field the span was not
    // created with is silently ignored.
    #[instrument(skip_all, err, fields(bundle_name = tracing::field::Empty))]
    async fn do_build(&self, request: BuildRequest) -> std::io::Result<proto::Build> {
        // Held until the function returns, so the permit spans the whole build.
        let _permit = self
            .concurrent_builds
            .acquire()
            .await
            .context("build semaphore was closed")
            .map_err(std::io::Error::other)?;

        let bundle_name = Uuid::new_v4();
        let bundle_path = self.bundle_root.join(bundle_name.to_string());

        Span::current().record("bundle_name", bundle_name.to_string());

        let mut runtime_spec = make_spec(&request, true, SANDBOX_SHELL)
            .context("failed to create spec")
            .map_err(std::io::Error::other)?;

        // Edit the spec: the uid/gid mappings live in its linux section.
        let mut linux = runtime_spec
            .linux()
            .clone()
            .ok_or_else(|| std::io::Error::other("runtime spec is missing its linux section"))?;
        linux.set_uid_mappings(Some(self.uid_mappings.clone()));
        linux.set_gid_mappings(Some(self.gid_mappings.clone()));
        runtime_spec.set_linux(Some(linux));

        make_bundle(&request, &runtime_spec, &bundle_path)
            .context("failed to produce bundle")
            .map_err(std::io::Error::other)?;

        // Calculated up front so that a failure here prevents the build from
        // being started at all.
        let host_output_paths = get_host_output_paths(&request, &bundle_path)
            .context("failed to calculate host output paths")
            .map_err(std::io::Error::other)?;

        let patterns = ReferencePattern::new(request.refscan_needles.clone());

        // Constructing the daemon happens on the blocking pool. The binding is
        // kept alive until the end of this function, i.e. while the build runs
        // and while outputs are ingested.
        // NOTE(review): presumably dropping the daemon unmounts the
        // filesystem; confirm against `FuseDaemon`.
        let _fuse_daemon = tokio::task::spawn_blocking({
            let blob_service = self.blob_service.clone();
            let directory_service = self.directory_service.clone();
            let dest = bundle_path.join("inputs");
            let root_nodes = Box::new(request.inputs.clone());
            move || {
                let fs = tvix_castore::fs::TvixStoreFs::new(
                    blob_service,
                    directory_service,
                    root_nodes,
                    true,
                    false,
                );
                FuseDaemon::new(fs, dest, 4, true).context("failed to start fuse daemon")
            }
        })
        .await?
        .context("mounting")
        .map_err(std::io::Error::other)?;

        debug!(bundle.path=?bundle_path, bundle.name=%bundle_name, "about to spawn bundle");
        let child = spawn_bundle(bundle_path, &bundle_name.to_string())?;

        let child_output = child
            .wait_with_output()
            .await
            .context("failed to run process")
            .map_err(std::io::Error::other)?;

        if !child_output.status.success() {
            let stdout = BStr::new(&child_output.stdout);
            let stderr = BStr::new(&child_output.stderr);
            warn!(stdout=%stdout, stderr=%stderr, exit_code=%child_output.status, "build failed");
            return Err(std::io::Error::other("nonzero exit code"));
        }

        // Ingest all outputs concurrently; the first failure aborts the rest.
        let (outputs, outputs_needles) = futures::future::try_join_all(
            host_output_paths
                .into_iter()
                .enumerate()
                .map(|(i, host_path)| {
                    // Looked up without indexing so that a mismatch between
                    // the two lists surfaces as an error instead of a panic.
                    let output_path = request.outputs.get(i);
                    let patterns = patterns.clone();
                    async move {
                        let output_path = output_path.ok_or_else(|| {
                            std::io::Error::new(
                                std::io::ErrorKind::InvalidData,
                                format!("no declared output for host output path #{i}"),
                            )
                        })?;
                        debug!(host.path=?host_path, output.path=?output_path, "ingesting path");

                        let scanner = ReferenceScanner::new(patterns);
                        let output_node = ingest_path(
                            self.blob_service.clone(),
                            &self.directory_service,
                            host_path,
                            Some(&scanner),
                        )
                        .await
                        .map_err(|e| {
                            std::io::Error::new(
                                std::io::ErrorKind::InvalidData,
                                format!("Unable to ingest output: {}", e),
                            )
                        })?;

                        // Indices of the needles the scanner reported a match for.
                        let needles = OutputNeedles {
                            needles: scanner
                                .matches()
                                .into_iter()
                                .enumerate()
                                .filter_map(|(idx, matched)| matched.then_some(idx as u64))
                                .collect(),
                        };

                        // NOTE(review): an output path without a (UTF-8) file
                        // name ends up with an empty node name here.
                        let name = output_path
                            .file_name()
                            .and_then(|s| s.to_str())
                            .map(|s| s.to_string())
                            .unwrap_or_default();

                        Ok::<_, std::io::Error>((
                            tvix_castore::proto::Node::from_name_and_node(
                                name.into(),
                                output_node,
                            ),
                            needles,
                        ))
                    }
                }),
        )
        .await?
        .into_iter()
        .unzip();

        Ok(proto::Build {
            build_request: Some(request.into()),
            outputs,
            outputs_needles,
        })
    }
}
/// Starts `runc run --bundle <bundle_path> <bundle_name>` and returns the
/// handle of the spawned process.
///
/// stdin is connected to the null device, while stdout and stderr are piped
/// so the caller can collect them. `kill_on_drop` is not configured on the
/// command.
///
/// # Errors
///
/// Returns the [`std::io::Error`] reported when spawning the process fails.
#[instrument(err)]
fn spawn_bundle(
    bundle_path: impl AsRef<OsStr> + std::fmt::Debug,
    bundle_name: &str,
) -> std::io::Result<Child> {
    let bundle_dir: &OsStr = bundle_path.as_ref();

    Command::new("runc")
        .arg("run")
        .arg("--bundle")
        .arg(bundle_dir)
        .arg(bundle_name)
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .stdin(Stdio::null())
        .spawn()
}