1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617
use std::{
collections::HashMap,
io::{self, Cursor},
pin::pin,
sync::Arc,
task::Poll,
};
use data_encoding::HEXLOWER;
use fastcdc::v2020::AsyncStreamCDC;
use futures::Future;
use object_store::{path::Path, ObjectStore};
use pin_project_lite::pin_project;
use prost::Message;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_stream::StreamExt;
use tonic::async_trait;
use tracing::{debug, instrument, trace, Level};
use url::Url;
use crate::{
composition::{CompositionContext, ServiceBuilder},
proto::{stat_blob_response::ChunkMeta, StatBlobResponse},
B3Digest, B3HashingReader, Error,
};
use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
/// Uses any object storage supported by the [object_store] crate to provide a
/// tvix-castore [BlobService].
///
/// # Data format
/// Data is organized in "blobs" and "chunks".
/// Blobs don't hold the actual data, but instead contain a list of more
/// granular chunks that assemble to the contents requested.
/// This allows clients to seek, and not download chunks they already have
/// locally, as it's referred to from other files.
/// Check `rpc_blobstore` and more general BlobStore docs on that.
///
/// ## Blobs
/// Stored at `${base_path}/blobs/b3/$digest_key`. They contains the serialized
/// StatBlobResponse for the blob with the digest.
///
/// ## Chunks
/// Chunks are stored at `${base_path}/chunks/b3/$digest_key`. They contain
/// the literal contents of the chunk, but are zstd-compressed.
///
/// ## Digest key sharding
/// The blake3 digest encoded in lower hex, and sharded after the second
/// character.
/// The blob for "Hello World" is stored at
/// `${base_path}/blobs/b3/41/41f8394111eb713a22165c46c90ab8f0fd9399c92028fd6d288944b23ff5bf76`.
///
/// This reduces the number of files in the same directory, which would be a
/// problem at least when using [object_store::local::LocalFileSystem].
///
/// # Future changes
/// There's no guarantees about this being a final format yet.
/// Once object_store gets support for additional metadata / content-types,
/// we can eliminate some requests (small blobs only consisting of a single
/// chunk can be stored as-is, without the blob index file).
/// It also allows signalling any compression of chunks in the content-type.
/// Migration *should* be possible by simply adding the right content-types to
/// all keys stored so far, but no promises ;-)
#[derive(Clone)]
pub struct ObjectStoreBlobService {
object_store: Arc<dyn ObjectStore>,
base_path: Path,
/// Average chunk size for FastCDC, in bytes.
/// min value is half, max value double of that number.
avg_chunk_size: u32,
}
#[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,blob.digest=%digest),ret(Display))]
fn derive_blob_path(base_path: &Path, digest: &B3Digest) -> Path {
base_path
.child("blobs")
.child("b3")
.child(HEXLOWER.encode(&digest.as_slice()[..2]))
.child(HEXLOWER.encode(digest.as_slice()))
}
#[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,chunk.digest=%digest),ret(Display))]
fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path {
base_path
.child("chunks")
.child("b3")
.child(HEXLOWER.encode(&digest.as_slice()[..2]))
.child(HEXLOWER.encode(digest.as_slice()))
}
#[async_trait]
impl BlobService for ObjectStoreBlobService {
#[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest))]
async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
// TODO: clarify if this should work for chunks or not, and explicitly
// document in the proto docs.
let p = derive_blob_path(&self.base_path, digest);
match self.object_store.head(&p).await {
Ok(_) => Ok(true),
Err(object_store::Error::NotFound { .. }) => {
let p = derive_chunk_path(&self.base_path, digest);
match self.object_store.head(&p).await {
Ok(_) => Ok(true),
Err(object_store::Error::NotFound { .. }) => Ok(false),
Err(e) => Err(e)?,
}
}
Err(e) => Err(e)?,
}
}
#[instrument(skip_all, err, fields(blob.digest=%digest))]
async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
// handle reading the empty blob.
if digest.as_slice() == blake3::hash(b"").as_bytes() {
return Ok(Some(Box::new(Cursor::new(b"")) as Box<dyn BlobReader>));
}
match self
.object_store
.get(&derive_chunk_path(&self.base_path, digest))
.await
{
Ok(res) => {
// handle reading blobs that are small enough to fit inside a single chunk:
// fetch the entire chunk into memory, decompress, ensure the b3 digest matches,
// and return a io::Cursor over that data.
// FUTUREWORK: use zstd::bulk to prevent decompression bombs
let chunk_raw_bytes = res.bytes().await?;
let chunk_contents = zstd::stream::decode_all(Cursor::new(chunk_raw_bytes))?;
if *digest != blake3::hash(&chunk_contents).as_bytes().into() {
Err(io::Error::other("chunk contents invalid"))?;
}
Ok(Some(Box::new(Cursor::new(chunk_contents))))
}
Err(object_store::Error::NotFound { .. }) => {
// NOTE: For public-facing things, we would want to stop here.
// Clients should fetch granularly, so they can make use of
// chunks they have locally.
// However, if this is used directly, without any caches, do the
// assembly here.
// This is subject to change, once we have store composition.
// TODO: make this configurable, and/or clarify behaviour for
// the gRPC server surface (explicitly document behaviour in the
// proto docs)
if let Some(chunks) = self.chunks(digest).await? {
let chunked_reader = ChunkedReader::from_chunks(
chunks.into_iter().map(|chunk| {
(
chunk.digest.try_into().expect("invalid b3 digest"),
chunk.size,
)
}),
Arc::new(self.clone()) as Arc<dyn BlobService>,
);
Ok(Some(Box::new(chunked_reader)))
} else {
// This is neither a chunk nor a blob, return None.
Ok(None)
}
}
Err(e) => Err(e.into()),
}
}
#[instrument(skip_all)]
async fn open_write(&self) -> Box<dyn BlobWriter> {
// ObjectStoreBlobWriter implements AsyncWrite, but all the chunking
// needs an AsyncRead, so we create a pipe here.
// In its `AsyncWrite` implementation, `ObjectStoreBlobWriter` delegates
// writes to w. It periodically polls the future that's reading from the
// other side.
let (w, r) = tokio::io::duplex(self.avg_chunk_size as usize * 10);
Box::new(ObjectStoreBlobWriter {
writer: Some(w),
fut: Some(Box::pin(chunk_and_upload(
r,
self.object_store.clone(),
self.base_path.clone(),
self.avg_chunk_size / 2,
self.avg_chunk_size,
self.avg_chunk_size * 2,
))),
fut_output: None,
})
}
#[instrument(skip_all, err, fields(blob.digest=%digest))]
async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
match self
.object_store
.get(&derive_blob_path(&self.base_path, digest))
.await
{
Ok(get_result) => {
// fetch the data at the blob path
let blob_data = get_result.bytes().await?;
// parse into StatBlobResponse
let stat_blob_response: StatBlobResponse = StatBlobResponse::decode(blob_data)?;
debug!(
chunk.count = stat_blob_response.chunks.len(),
blob.size = stat_blob_response
.chunks
.iter()
.map(|x| x.size)
.sum::<u64>(),
"found more granular chunks"
);
Ok(Some(stat_blob_response.chunks))
}
Err(object_store::Error::NotFound { .. }) => {
// If there's only a chunk, we must return the empty vec here, rather than None.
match self
.object_store
.head(&derive_chunk_path(&self.base_path, digest))
.await
{
Ok(_) => {
// present, but no more chunks available
debug!("found a single chunk");
Ok(Some(vec![]))
}
Err(object_store::Error::NotFound { .. }) => {
// Neither blob nor single chunk found
debug!("not found");
Ok(None)
}
// error checking for chunk
Err(e) => Err(e.into()),
}
}
// error checking for blob
Err(err) => Err(err.into()),
}
}
}
fn default_avg_chunk_size() -> u32 {
256 * 1024
}
#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ObjectStoreBlobServiceConfig {
object_store_url: String,
#[serde(default = "default_avg_chunk_size")]
avg_chunk_size: u32,
object_store_options: HashMap<String, String>,
}
impl TryFrom<url::Url> for ObjectStoreBlobServiceConfig {
type Error = Box<dyn std::error::Error + Send + Sync>;
/// Constructs a new [ObjectStoreBlobService] from a [Url] supported by
/// [object_store].
/// Any path suffix becomes the base path of the object store.
/// additional options, the same as in [object_store::parse_url_opts] can
/// be passed.
fn try_from(url: url::Url) -> Result<Self, Self::Error> {
// We need to convert the URL to string, strip the prefix there, and then
// parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do.
let trimmed_url = {
let s = url.to_string();
let mut url = Url::parse(
s.strip_prefix("objectstore+")
.ok_or(Error::StorageError("Missing objectstore uri".into()))?,
)?;
// trim the query pairs, they might contain credentials or local settings we don't want to send as-is.
url.set_query(None);
url
};
Ok(ObjectStoreBlobServiceConfig {
object_store_url: trimmed_url.into(),
object_store_options: url
.query_pairs()
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
avg_chunk_size: 256 * 1024,
})
}
}
#[async_trait]
impl ServiceBuilder for ObjectStoreBlobServiceConfig {
type Output = dyn BlobService;
async fn build<'a>(
&'a self,
_instance_name: &str,
_context: &CompositionContext,
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let (object_store, path) = object_store::parse_url_opts(
&self.object_store_url.parse()?,
&self.object_store_options,
)?;
Ok(Arc::new(ObjectStoreBlobService {
object_store: Arc::new(object_store),
base_path: path,
avg_chunk_size: self.avg_chunk_size,
}))
}
}
/// Reads blob contents from a AsyncRead, chunks and uploads them.
/// On success, returns a [StatBlobResponse] pointing to the individual chunks.
#[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)]
async fn chunk_and_upload<R: AsyncRead + Unpin>(
r: R,
object_store: Arc<dyn ObjectStore>,
base_path: Path,
min_chunk_size: u32,
avg_chunk_size: u32,
max_chunk_size: u32,
) -> io::Result<B3Digest> {
// wrap reader with something calculating the blake3 hash of all data read.
let mut b3_r = B3HashingReader::from(r);
// set up a fastcdc chunker
let mut chunker =
AsyncStreamCDC::new(&mut b3_r, min_chunk_size, avg_chunk_size, max_chunk_size);
/// This really should just belong into the closure at
/// `chunker.as_stream().then(|_| { … })``, but if we try to, rustc spits
/// higher-ranked lifetime errors at us.
async fn fastcdc_chunk_uploader(
resp: Result<fastcdc::v2020::ChunkData, fastcdc::v2020::Error>,
base_path: Path,
object_store: Arc<dyn ObjectStore>,
) -> std::io::Result<ChunkMeta> {
let chunk_data = resp?;
let chunk_digest: B3Digest = blake3::hash(&chunk_data.data).as_bytes().into();
let chunk_path = derive_chunk_path(&base_path, &chunk_digest);
upload_chunk(object_store, chunk_digest, chunk_path, chunk_data.data).await
}
// Use the fastcdc chunker to produce a stream of chunks, and upload these
// that don't exist to the backend.
let chunks = chunker
.as_stream()
.then(|resp| fastcdc_chunk_uploader(resp, base_path.clone(), object_store.clone()))
.collect::<io::Result<Vec<ChunkMeta>>>()
.await?;
let chunks = if chunks.len() < 2 {
// The chunker returned only one chunk, which is the entire blob.
// According to the protocol, we must return an empty list of chunks
// when the blob is not split up further.
vec![]
} else {
chunks
};
let stat_blob_response = StatBlobResponse {
chunks,
bao: "".into(), // still todo
};
// check for Blob, if it doesn't exist, persist.
let blob_digest: B3Digest = b3_r.digest().into();
let blob_path = derive_blob_path(&base_path, &blob_digest);
match object_store.head(&blob_path).await {
// blob already exists, nothing to do
Ok(_) => {
trace!(
blob.digest = %blob_digest,
blob.path = %blob_path,
"blob already exists on backend"
);
}
// chunk does not yet exist, upload first
Err(object_store::Error::NotFound { .. }) => {
debug!(
blob.digest = %blob_digest,
blob.path = %blob_path,
"uploading blob"
);
object_store
.put(&blob_path, stat_blob_response.encode_to_vec().into())
.await?;
}
Err(err) => {
// other error
Err(err)?
}
}
Ok(blob_digest)
}
/// upload chunk if it doesn't exist yet.
#[instrument(skip_all, fields(chunk.digest = %chunk_digest, chunk.size = chunk_data.len(), chunk.path = %chunk_path), err)]
async fn upload_chunk(
object_store: Arc<dyn ObjectStore>,
chunk_digest: B3Digest,
chunk_path: Path,
chunk_data: Vec<u8>,
) -> std::io::Result<ChunkMeta> {
let chunk_size = chunk_data.len();
match object_store.head(&chunk_path).await {
// chunk already exists, nothing to do
Ok(_) => {
debug!("chunk already exists");
}
// chunk does not yet exist, compress and upload.
Err(object_store::Error::NotFound { .. }) => {
let chunk_data_compressed =
zstd::encode_all(Cursor::new(chunk_data), zstd::DEFAULT_COMPRESSION_LEVEL)?;
debug!(chunk.compressed_size=%chunk_data_compressed.len(), "uploading chunk");
object_store
.as_ref()
.put(&chunk_path, chunk_data_compressed.into())
.await?;
}
// other error
Err(err) => Err(err)?,
}
Ok(ChunkMeta {
digest: chunk_digest.into(),
size: chunk_size as u64,
})
}
pin_project! {
/// Takes care of blob uploads.
/// All writes are relayed to self.writer, and we continuously poll the
/// future (which will internally read from the other side of the pipe and
/// upload chunks).
/// Our BlobWriter::close() needs to drop self.writer, so the other side
/// will read EOF and can finalize the blob.
/// The future should then resolve and return the blob digest.
pub struct ObjectStoreBlobWriter<W, Fut>
where
W: AsyncWrite,
Fut: Future,
{
#[pin]
writer: Option<W>,
#[pin]
fut: Option<Fut>,
fut_output: Option<io::Result<B3Digest>>
}
}
impl<W, Fut> tokio::io::AsyncWrite for ObjectStoreBlobWriter<W, Fut>
where
W: AsyncWrite + Send + Unpin,
Fut: Future,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
let this = self.project();
// poll the future.
let fut = this.fut.as_pin_mut().expect("not future");
let fut_p = fut.poll(cx);
// if it's ready, the only way this could have happened is that the
// upload failed, because we're only closing `self.writer` after all
// writes happened.
if fut_p.is_ready() {
return Poll::Ready(Err(io::Error::other("upload failed")));
}
// write to the underlying writer
this.writer
.as_pin_mut()
.expect("writer must be some")
.poll_write(cx, buf)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
let this = self.project();
// poll the future.
let fut = this.fut.as_pin_mut().expect("not future");
let fut_p = fut.poll(cx);
// if it's ready, the only way this could have happened is that the
// upload failed, because we're only closing `self.writer` after all
// writes happened.
if fut_p.is_ready() {
return Poll::Ready(Err(io::Error::other("upload failed")));
}
// Call poll_flush on the writer
this.writer
.as_pin_mut()
.expect("writer must be some")
.poll_flush(cx)
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
// There's nothing to do on shutdown. We might have written some chunks
// that are nowhere else referenced, but cleaning them up here would be racy.
std::task::Poll::Ready(Ok(()))
}
}
#[async_trait]
impl<W, Fut> BlobWriter for ObjectStoreBlobWriter<W, Fut>
where
W: AsyncWrite + Send + Unpin,
Fut: Future<Output = io::Result<B3Digest>> + Send + Unpin,
{
async fn close(&mut self) -> io::Result<B3Digest> {
match self.writer.take() {
Some(mut writer) => {
// shut down the writer, so the other side will read EOF.
writer.shutdown().await?;
// take out the future.
let fut = self.fut.take().expect("fut must be some");
// await it.
let resp = pin!(fut).await;
match resp.as_ref() {
// In the case of an Ok value, we store it in self.fut_output,
// so future calls to close can return that.
Ok(b3_digest) => {
self.fut_output = Some(Ok(b3_digest.clone()));
}
Err(e) => {
// for the error type, we need to cheat a bit, as
// they're not clone-able.
// Simply store a sloppy clone, with the same ErrorKind and message there.
self.fut_output = Some(Err(std::io::Error::new(e.kind(), e.to_string())))
}
}
resp
}
None => {
// called a second time, return self.fut_output.
match self.fut_output.as_ref().unwrap() {
Ok(ref b3_digest) => Ok(b3_digest.clone()),
Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())),
}
}
}
}
}
#[cfg(test)]
mod test {
use super::{chunk_and_upload, default_avg_chunk_size};
use crate::{
blobservice::{BlobService, ObjectStoreBlobService},
fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST},
};
use std::{io::Cursor, sync::Arc};
use url::Url;
/// Tests chunk_and_upload directly, bypassing the BlobWriter at open_write().
#[rstest::rstest]
#[case::a(&BLOB_A, &BLOB_A_DIGEST)]
#[case::b(&BLOB_B, &BLOB_B_DIGEST)]
#[tokio::test]
async fn test_chunk_and_upload(
#[case] blob: &bytes::Bytes,
#[case] blob_digest: &crate::B3Digest,
) {
let (object_store, base_path) =
object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap();
let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store);
let blobsvc = Arc::new(ObjectStoreBlobService {
object_store: object_store.clone(),
avg_chunk_size: default_avg_chunk_size(),
base_path,
});
let inserted_blob_digest = chunk_and_upload(
&mut Cursor::new(blob.to_vec()),
object_store,
object_store::path::Path::from("/"),
1024 / 2,
1024,
1024 * 2,
)
.await
.expect("chunk_and_upload succeeds");
assert_eq!(blob_digest.clone(), inserted_blob_digest);
// Now we should have the blob
assert!(blobsvc.has(blob_digest).await.unwrap());
// Check if it was chunked correctly
let chunks = blobsvc.chunks(blob_digest).await.unwrap().unwrap();
if blob.len() < 1024 / 2 {
// The blob is smaller than the min chunk size, it should have been inserted as a whole
assert!(chunks.is_empty());
} else if blob.len() > 1024 * 2 {
// The blob is larger than the max chunk size, make sure it was split up into at least
// two chunks
assert!(chunks.len() >= 2);
}
}
}