// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::task::{Context, Poll};
use crate::{PutPayload, PutPayloadMut, PutResult, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::ready;
use tokio::task::JoinSet;
/// An upload part request
pub type UploadPart = BoxFuture<'static, Result<()>>;
/// A trait allowing writing an object in fixed size chunks
///
/// Consecutive chunks of data can be written by calling [`MultipartUpload::put_part`] and polling
/// the returned futures to completion. Multiple futures returned by [`MultipartUpload::put_part`]
/// may be polled in parallel, allowing for concurrent uploads.
///
/// Once all part uploads have been polled to completion, the upload can be completed by
/// calling [`MultipartUpload::complete`]. This will make the entire uploaded object visible
/// as an atomic operation. It is implementation defined behaviour if [`MultipartUpload::complete`]
/// is called before all [`UploadPart`] have been polled to completion.
#[async_trait]
pub trait MultipartUpload: Send + std::fmt::Debug {
/// Upload the next part
///
/// Most stores require that all parts excluding the last are at least 5 MiB, and some
/// further require that all parts excluding the last be the same size, e.g. [R2].
/// Clients wanting to maximise compatibility should therefore perform writes in
/// fixed size blocks larger than 5 MiB.
///
/// Callers may invoke this method multiple times and then await the
/// returned futures in parallel
///
/// ```no_run
/// # use futures::StreamExt;
/// # use object_store::MultipartUpload;
/// #
/// # async fn test() {
/// #
/// let mut upload: Box<dyn MultipartUpload> = todo!();
/// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
/// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
/// futures::future::try_join(p1, p2).await.unwrap();
/// upload.complete().await.unwrap();
/// # }
/// ```
///
/// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
fn put_part(&mut self, data: PutPayload) -> UploadPart;
/// Complete the multipart upload
///
/// It is implementation defined behaviour if this method is called before polling
/// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally,
/// it is implementation defined behaviour to call [`MultipartUpload::complete`]
/// on an already completed or aborted [`MultipartUpload`].
async fn complete(&mut self) -> Result<PutResult>;
/// Abort the multipart upload
///
/// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`],
/// some object stores will automatically clean up any previously uploaded parts.
/// However, some stores, such as S3 and GCS, cannot perform cleanup on drop.
/// As such [`MultipartUpload::abort`] can be invoked to perform this cleanup.
///
/// It will not be possible to call `abort` in all failure scenarios, for example
/// non-graceful shutdown of the calling application. It is therefore recommended
/// that object stores are configured with lifecycle rules to automatically clean up
/// unused parts older than some threshold. See [crate::aws] and [crate::gcp]
/// for more information.
///
/// It is implementation defined behaviour to call [`MultipartUpload::abort`]
/// on an already completed or aborted [`MultipartUpload`]
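///
/// A minimal error-handling sketch (the `upload` parameter is hypothetical):
///
/// ```no_run
/// # use object_store::MultipartUpload;
/// #
/// # async fn example(mut upload: Box<dyn MultipartUpload>) {
/// let part = upload.put_part(vec![0; 5 * 1024 * 1024].into());
/// if part.await.is_err() {
///     // Best-effort cleanup of any parts uploaded so far
///     upload.abort().await.unwrap();
/// }
/// # }
/// ```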
async fn abort(&mut self) -> Result<()>;
}
#[async_trait]
impl<W: MultipartUpload + ?Sized> MultipartUpload for Box<W> {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
(**self).put_part(data)
}
async fn complete(&mut self) -> Result<PutResult> {
(**self).complete().await
}
async fn abort(&mut self) -> Result<()> {
(**self).abort().await
}
}
/// A synchronous write API for uploading data in parallel in fixed size chunks
///
/// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in parallel
///
/// The design also takes inspiration from [`Sink`] with [`WriteMultipart::wait_for_capacity`]
/// allowing back pressure on producers, prior to buffering the next part. However, unlike
/// [`Sink`] this back pressure is optional, allowing integration with synchronous producers
///
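/// A minimal usage sketch, assuming an in-memory store and a hypothetical path
/// purely for illustration:
///
/// ```no_run
/// # use object_store::memory::InMemory;
/// # use object_store::path::Path;
/// # use object_store::{ObjectStore, WriteMultipart};
/// #
/// # async fn example() {
/// let store = InMemory::new();
/// let upload = store.put_multipart(&Path::from("file.bin")).await.unwrap();
/// let mut write = WriteMultipart::new(upload);
/// for _ in 0..10 {
///     // Optionally apply back pressure before buffering the next chunk
///     write.wait_for_capacity(4).await.unwrap();
///     write.write(&[0u8; 1024]);
/// }
/// write.finish().await.unwrap();
/// # }
/// ```
///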
/// [`Sink`]: futures::sink::Sink
#[derive(Debug)]
pub struct WriteMultipart {
upload: Box<dyn MultipartUpload>,
buffer: PutPayloadMut,
chunk_size: usize,
tasks: JoinSet<Result<()>>,
}
impl WriteMultipart {
/// Create a new [`WriteMultipart`] that will upload using 5 MiB chunks
pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
Self::new_with_chunk_size(upload, 5 * 1024 * 1024)
}
/// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
Self {
upload,
chunk_size,
buffer: PutPayloadMut::new(),
tasks: Default::default(),
}
}
/// Polls for there to be fewer than `max_concurrency` [`UploadPart`] in progress
///
/// See [`Self::wait_for_capacity`] for an async version of this function
pub fn poll_for_capacity(
&mut self,
cx: &mut Context<'_>,
max_concurrency: usize,
) -> Poll<Result<()>> {
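// Drain completed upload tasks until fewer than `max_concurrency` remain in
// flight, surfacing any upload errors encountered along the way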
while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
ready!(self.tasks.poll_join_next(cx)).unwrap()??
}
Poll::Ready(Ok(()))
}
/// Wait until there are fewer than `max_concurrency` [`UploadPart`] in progress
///
/// See [`Self::poll_for_capacity`] for a [`Poll`] version of this function
pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
futures::future::poll_fn(|cx| self.poll_for_capacity(cx, max_concurrency)).await
}
/// Write data to this [`WriteMultipart`]
///
/// Data is buffered using [`PutPayloadMut::extend_from_slice`]. Implementations looking to
/// write data from owned buffers may prefer [`Self::put`] as this avoids copying.
///
/// Note this method is synchronous (not `async`) and will immediately
/// start new uploads as soon as the internal `chunk_size` is hit,
/// regardless of how many outstanding uploads are already in progress.
///
/// Back pressure can optionally be applied to producers by calling
/// [`Self::wait_for_capacity`] prior to calling this method
pub fn write(&mut self, mut buf: &[u8]) {
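// Repeatedly top up the in-progress buffer; each time it reaches `chunk_size`
// it is flushed as a new part upload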
while !buf.is_empty() {
let remaining = self.chunk_size - self.buffer.content_length();
let to_read = buf.len().min(remaining);
self.buffer.extend_from_slice(&buf[..to_read]);
if to_read == remaining {
let buffer = std::mem::take(&mut self.buffer);
self.put_part(buffer.into())
}
buf = &buf[to_read..]
}
}
/// Put a chunk of data into this [`WriteMultipart`] without copying
///
/// Data is buffered using [`PutPayloadMut::push`]. Implementations looking to
/// perform writes from non-owned buffers should prefer [`Self::write`] as this
/// will allow multiple calls to share the same underlying allocation.
///
/// See [`Self::write`] for information on backpressure
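///
/// A brief sketch with a hypothetical owned buffer:
///
/// ```no_run
/// # use bytes::Bytes;
/// # use object_store::WriteMultipart;
/// #
/// # fn example(write: &mut WriteMultipart) {
/// // An owned `Bytes` buffer is handed over without copying its contents
/// let owned = Bytes::from(vec![0u8; 1024]);
/// write.put(owned);
/// # }
/// ```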
pub fn put(&mut self, mut bytes: Bytes) {
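// Fill the buffer to `chunk_size` by splitting slices off the front of `bytes`,
// flushing a part each time it fills; any remainder is left buffered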
while !bytes.is_empty() {
let remaining = self.chunk_size - self.buffer.content_length();
if bytes.len() < remaining {
self.buffer.push(bytes);
return;
}
self.buffer.push(bytes.split_to(remaining));
let buffer = std::mem::take(&mut self.buffer);
self.put_part(buffer.into())
}
}
pub(crate) fn put_part(&mut self, part: PutPayload) {
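// Spawn the part upload onto the JoinSet so it proceeds concurrently with any
// other in-flight parts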
self.tasks.spawn(self.upload.put_part(part));
}
/// Abort this upload, attempting to clean up any successfully uploaded parts
pub async fn abort(mut self) -> Result<()> {
self.tasks.shutdown().await;
self.upload.abort().await
}
/// Flush final chunk, and await completion of all in-flight requests
pub async fn finish(mut self) -> Result<PutResult> {
if !self.buffer.is_empty() {
let part = std::mem::take(&mut self.buffer);
self.put_part(part.into())
}
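// Wait for every outstanding part upload to finish before completing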
self.wait_for_capacity(0).await?;
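// If completing the upload fails, make a best-effort attempt to abort it so
// that any parts uploaded so far can be cleaned up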
match self.upload.complete().await {
Err(e) => {
self.tasks.shutdown().await;
self.upload.abort().await?;
Err(e)
}
Ok(result) => Ok(result),
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use futures::FutureExt;
use parking_lot::Mutex;
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
use crate::memory::InMemory;
use crate::path::Path;
use crate::throttle::{ThrottleConfig, ThrottledStore};
use crate::ObjectStore;
use super::*;
#[tokio::test]
async fn test_concurrency() {
let config = ThrottleConfig {
wait_put_per_call: Duration::from_millis(1),
..Default::default()
};
let path = Path::from("foo");
let store = ThrottledStore::new(InMemory::new(), config);
let upload = store.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new_with_chunk_size(upload, 10);
for _ in 0..20 {
write.write(&[0; 5]);
}
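// Twenty 5-byte writes fill ten 10-byte chunks, spawning ten throttled upload
// tasks, so waiting for the in-flight count to drop below ten cannot complete
// immediately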
assert!(write.wait_for_capacity(10).now_or_never().is_none());
write.wait_for_capacity(10).await.unwrap()
}
#[derive(Debug, Default)]
struct InstrumentedUpload {
chunks: Arc<Mutex<Vec<PutPayload>>>,
}
#[async_trait]
impl MultipartUpload for InstrumentedUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
self.chunks.lock().push(data);
futures::future::ready(Ok(())).boxed()
}
async fn complete(&mut self) -> Result<PutResult> {
Ok(PutResult {
e_tag: None,
version: None,
})
}
async fn abort(&mut self) -> Result<()> {
unimplemented!()
}
}
#[tokio::test]
async fn test_write_multipart() {
let mut rng = StdRng::seed_from_u64(42);
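// `method` is the probability of exercising `put` (owned Bytes) vs `write` (borrowed slice)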
for method in [0.0, 0.5, 1.0] {
for _ in 0..10 {
for chunk_size in [1, 17, 23] {
let upload = Box::<InstrumentedUpload>::default();
let chunks = Arc::clone(&upload.chunks);
let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size);
let mut expected = Vec::with_capacity(1024);
for _ in 0..50 {
let chunk_size = rng.gen_range(0..30);
let data: Vec<_> = (0..chunk_size).map(|_| rng.gen()).collect();
expected.extend_from_slice(&data);
match rng.gen_bool(method) {
true => write.put(data.into()),
false => write.write(&data),
}
}
write.finish().await.unwrap();
let chunks = chunks.lock();
let actual: Vec<_> = chunks.iter().flatten().flatten().copied().collect();
assert_eq!(expected, actual);
for chunk in chunks.iter().take(chunks.len() - 1) {
assert_eq!(chunk.content_length(), chunk_size)
}
let last_chunk = chunks.last().unwrap().content_length();
assert!(last_chunk <= chunk_size, "{chunk_size}");
}
}
}
}
}