use std::{
    io::{Cursor, Write},
    sync::Arc,
};

use tokio::{
    io::AsyncRead,
    sync::Semaphore,
    task::{JoinError, JoinSet},
};
use tokio_util::io::InspectReader;
use tracing::{info_span, Instrument};

use crate::{blobservice::BlobService, B3Digest, Path, PathBuf};

/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
/// background.
///
/// This is a u32 since we acquire permits on a weighted semaphore equal to the size of
/// the blob: [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size
/// of the blob can be represented as a u32 without overflow.
const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;

/// The maximum number of bytes allowed to be buffered in memory to perform async blob uploads.
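///
/// Together with [CONCURRENT_BLOB_UPLOAD_THRESHOLD] this bounds in-flight background
/// uploads: the semaphore below hands out one permit per buffered byte, and each upload
/// task holds its permits until its write to the [BlobService] completes.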
const MAX_BUFFER_SIZE: usize = 128 * 1024 * 1024;

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("unable to read blob contents for {0}: {1}")]
    BlobRead(PathBuf, std::io::Error),

    #[error("unable to check whether blob at {0} already exists: {1}")]
    BlobCheck(PathBuf, std::io::Error),

    // FUTUREWORK: proper error for blob finalize
    #[error("unable to finalize blob {0}: {1}")]
    BlobFinalize(PathBuf, std::io::Error),

    #[error("unexpected size for {path} wanted: {wanted} got: {got}")]
    UnexpectedSize {
        path: PathBuf,
        wanted: u64,
        got: u64,
    },

    #[error("blob upload join error: {0}")]
    JoinError(#[from] JoinError),
}

/// The concurrent blob uploader provides a mechanism for concurrently uploading small blobs.
/// This is useful when ingesting from sources like tarballs and archives, where each blob
/// entry must be read sequentially. Ingesting many small blobs sequentially becomes slow
/// due to the round-trip time with the blob service. The concurrent blob uploader buffers
/// small blobs in memory and uploads them to the blob service in the background.
///
/// Once all blobs have been uploaded, make sure to call [ConcurrentBlobUploader::join] to wait
/// for all background jobs to complete and check for any errors.
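///
/// A minimal usage sketch (the `blob_service`, `path`, `size`, and `reader` bindings
/// are assumed to be provided by the caller):
///
/// ```ignore
/// let mut uploader = ConcurrentBlobUploader::new(blob_service);
///
/// // Small blobs are buffered and uploaded in the background; the digest is
/// // available immediately.
/// let digest = uploader.upload(&path, size, reader).await?;
///
/// // Wait for all background uploads to finish and surface any errors.
/// uploader.join().await?;
/// ```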
pub struct ConcurrentBlobUploader<BS> {
    blob_service: BS,
    upload_tasks: JoinSet<Result<(), Error>>,
    upload_semaphore: Arc<Semaphore>,
}

impl<BS> ConcurrentBlobUploader<BS>
where
    BS: BlobService + Clone + 'static,
{
    /// Creates a new concurrent blob uploader which uploads blobs to the provided
    /// blob service.
    pub fn new(blob_service: BS) -> Self {
        Self {
            blob_service,
            upload_tasks: JoinSet::new(),
            upload_semaphore: Arc::new(Semaphore::new(MAX_BUFFER_SIZE)),
        }
    }

    /// Uploads a blob to the blob service. If the blob is smaller than
    /// [CONCURRENT_BLOB_UPLOAD_THRESHOLD], it is read into an in-memory buffer and
    /// uploaded in the background.
    /// This reads the entirety of the provided reader unless an error occurs, even if
    /// the blob is uploaded in the background.
    pub async fn upload<R>(
        &mut self,
        path: &Path,
        expected_size: u64,
        mut r: R,
    ) -> Result<B3Digest, Error>
    where
        R: AsyncRead + Unpin,
    {
        if expected_size < CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
            let mut buffer = Vec::with_capacity(expected_size as usize);
            let mut hasher = blake3::Hasher::new();
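            // InspectReader taps the read: every chunk pulled from `r` is also fed to
            // the hasher, so the BLAKE3 digest is computed in the same pass as the copy
            // into the in-memory buffer. The unwrap never fires, as writing to a
            // blake3::Hasher is infallible.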
            let mut reader = InspectReader::new(&mut r, |bytes| {
                hasher.write_all(bytes).unwrap();
            });

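            // Reserve `expected_size` permits (bytes) from the shared buffer budget.
            // This applies backpressure: once MAX_BUFFER_SIZE bytes are buffered for
            // background uploads, we wait here until permits are released. The unwrap
            // is safe since acquire only fails when the semaphore is closed, and we
            // never close it.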
            let permit = self
                .upload_semaphore
                .clone()
                // This cast is safe because we ensure expected_size is less than
                // CONCURRENT_BLOB_UPLOAD_THRESHOLD, which is a u32.
                .acquire_many_owned(expected_size as u32)
                .await
                .unwrap();
            let size = tokio::io::copy(&mut reader, &mut buffer)
                .await
                .map_err(|e| Error::BlobRead(path.into(), e))?;
            let digest: B3Digest = hasher.finalize().as_bytes().into();

            if size != expected_size {
                return Err(Error::UnexpectedSize {
                    path: path.into(),
                    wanted: expected_size,
                    got: size,
                });
            }

            self.upload_tasks.spawn({
                let blob_service = self.blob_service.clone();
                let expected_digest = digest.clone();
                let path = path.to_owned();
                let r = Cursor::new(buffer);
                async move {
                    // We know the blob digest already, check it exists before sending it.
                    if blob_service
                        .has(&expected_digest)
                        .await
                        .map_err(|e| Error::BlobCheck(path.clone(), e))?
                    {
                        drop(permit);
                        return Ok(());
                    }

                    let digest = upload_blob(&blob_service, &path, expected_size, r).await?;

                    assert_eq!(digest, expected_digest, "Tvix bug: blob digest mismatch");

                    // Make sure we hold the permit until we finish writing the blob
                    // to the [BlobService].
                    drop(permit);
                    Ok(())
                }
                .instrument(info_span!("upload_task"))
            });

            return Ok(digest);
        }

        upload_blob(&self.blob_service, path, expected_size, r).await
    }

    /// Waits for all background upload jobs to complete, returning any upload errors.
    pub async fn join(mut self) -> Result<(), Error> {
        while let Some(result) = self.upload_tasks.join_next().await {
            result??;
        }
        Ok(())
    }
}

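/// Streams the contents of `r` into the blob service, returning the resulting [B3Digest].
/// This is the unbuffered path, used both for blobs at or above
/// [CONCURRENT_BLOB_UPLOAD_THRESHOLD] and for writing out the buffered contents of small
/// blobs from the background upload tasks. Errors if the number of bytes read does not
/// match `expected_size`.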
async fn upload_blob<BS, R>(
    blob_service: &BS,
    path: &Path,
    expected_size: u64,
    mut r: R,
) -> Result<B3Digest, Error>
where
    BS: BlobService,
    R: AsyncRead + Unpin,
{
    let mut writer = blob_service.open_write().await;

    let size = tokio::io::copy(&mut r, &mut writer)
        .await
        .map_err(|e| Error::BlobRead(path.into(), e))?;

    let digest = writer
        .close()
        .await
        .map_err(|e| Error::BlobFinalize(path.into(), e))?;

    if size != expected_size {
        return Err(Error::UnexpectedSize {
            path: path.into(),
            wanted: expected_size,
            got: size,
        });
    }

    Ok(digest)
}