use bytes::Bytes;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct PutPayload(Arc<[Bytes]>);
impl Default for PutPayload {
fn default() -> Self {
Self(Arc::new([]))
}
}
impl PutPayload {
pub fn new() -> Self {
Self::default()
}
pub fn from_static(s: &'static [u8]) -> Self {
s.into()
}
pub fn from_bytes(s: Bytes) -> Self {
s.into()
}
#[cfg(feature = "cloud")]
pub(crate) fn body(&self) -> reqwest::Body {
reqwest::Body::wrap_stream(futures::stream::iter(
self.clone().into_iter().map(Ok::<_, crate::Error>),
))
}
pub fn content_length(&self) -> usize {
self.0.iter().map(|b| b.len()).sum()
}
pub fn iter(&self) -> PutPayloadIter<'_> {
PutPayloadIter(self.0.iter())
}
}
impl AsRef<[Bytes]> for PutPayload {
fn as_ref(&self) -> &[Bytes] {
self.0.as_ref()
}
}
impl<'a> IntoIterator for &'a PutPayload {
type Item = &'a Bytes;
type IntoIter = PutPayloadIter<'a>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl IntoIterator for PutPayload {
type Item = Bytes;
type IntoIter = PutPayloadIntoIter;
fn into_iter(self) -> Self::IntoIter {
PutPayloadIntoIter {
payload: self,
idx: 0,
}
}
}
#[derive(Debug)]
pub struct PutPayloadIter<'a>(std::slice::Iter<'a, Bytes>);
impl<'a> Iterator for PutPayloadIter<'a> {
type Item = &'a Bytes;
fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}
#[derive(Debug)]
pub struct PutPayloadIntoIter {
payload: PutPayload,
idx: usize,
}
impl Iterator for PutPayloadIntoIter {
type Item = Bytes;
fn next(&mut self) -> Option<Self::Item> {
let p = self.payload.0.get(self.idx)?.clone();
self.idx += 1;
Some(p)
}
fn size_hint(&self) -> (usize, Option<usize>) {
let l = self.payload.0.len() - self.idx;
(l, Some(l))
}
}
impl From<Bytes> for PutPayload {
fn from(value: Bytes) -> Self {
Self(Arc::new([value]))
}
}
impl From<Vec<u8>> for PutPayload {
fn from(value: Vec<u8>) -> Self {
Self(Arc::new([value.into()]))
}
}
impl From<&'static str> for PutPayload {
fn from(value: &'static str) -> Self {
Bytes::from(value).into()
}
}
impl From<&'static [u8]> for PutPayload {
fn from(value: &'static [u8]) -> Self {
Bytes::from(value).into()
}
}
impl From<String> for PutPayload {
fn from(value: String) -> Self {
Bytes::from(value).into()
}
}
impl FromIterator<u8> for PutPayload {
fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
Bytes::from_iter(iter).into()
}
}
impl FromIterator<Bytes> for PutPayload {
fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
Self(iter.into_iter().collect())
}
}
impl From<PutPayload> for Bytes {
fn from(value: PutPayload) -> Self {
match value.0.len() {
0 => Self::new(),
1 => value.0[0].clone(),
_ => {
let mut buf = Vec::with_capacity(value.content_length());
value.iter().for_each(|x| buf.extend_from_slice(x));
buf.into()
}
}
}
}
#[derive(Debug)]
pub struct PutPayloadMut {
len: usize,
completed: Vec<Bytes>,
in_progress: Vec<u8>,
block_size: usize,
}
impl Default for PutPayloadMut {
fn default() -> Self {
Self {
len: 0,
completed: vec![],
in_progress: vec![],
block_size: 8 * 1024,
}
}
}
impl PutPayloadMut {
pub fn new() -> Self {
Self::default()
}
pub fn with_block_size(self, block_size: usize) -> Self {
Self { block_size, ..self }
}
pub fn extend_from_slice(&mut self, slice: &[u8]) {
let remaining = self.in_progress.capacity() - self.in_progress.len();
let to_copy = remaining.min(slice.len());
self.in_progress.extend_from_slice(&slice[..to_copy]);
if self.in_progress.capacity() == self.in_progress.len() {
let new_cap = self.block_size.max(slice.len() - to_copy);
let completed = std::mem::replace(&mut self.in_progress, Vec::with_capacity(new_cap));
if !completed.is_empty() {
self.completed.push(completed.into())
}
self.in_progress.extend_from_slice(&slice[to_copy..])
}
self.len += slice.len();
}
pub fn push(&mut self, bytes: Bytes) {
if !self.in_progress.is_empty() {
let completed = std::mem::take(&mut self.in_progress);
self.completed.push(completed.into())
}
self.len += bytes.len();
self.completed.push(bytes);
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len == 0
}
#[inline]
pub fn content_length(&self) -> usize {
self.len
}
pub fn freeze(mut self) -> PutPayload {
if !self.in_progress.is_empty() {
let completed = std::mem::take(&mut self.in_progress).into();
self.completed.push(completed);
}
PutPayload(self.completed.into())
}
}
impl From<PutPayloadMut> for PutPayload {
fn from(value: PutPayloadMut) -> Self {
value.freeze()
}
}
#[cfg(test)]
mod test {
use crate::PutPayloadMut;
#[test]
fn test_put_payload() {
let mut chunk = PutPayloadMut::new().with_block_size(23);
chunk.extend_from_slice(&[1; 16]);
chunk.extend_from_slice(&[2; 32]);
chunk.extend_from_slice(&[2; 5]);
chunk.extend_from_slice(&[2; 21]);
chunk.extend_from_slice(&[2; 40]);
chunk.extend_from_slice(&[0; 0]);
chunk.push("foobar".into());
let payload = chunk.freeze();
assert_eq!(payload.content_length(), 120);
let chunks = payload.as_ref();
assert_eq!(chunks.len(), 6);
assert_eq!(chunks[0].len(), 23);
assert_eq!(chunks[1].len(), 25); assert_eq!(chunks[2].len(), 23);
assert_eq!(chunks[3].len(), 23);
assert_eq!(chunks[4].len(), 20);
assert_eq!(chunks[5].len(), 6);
}
#[test]
fn test_content_length() {
let mut chunk = PutPayloadMut::new();
chunk.push(vec![0; 23].into());
assert_eq!(chunk.content_length(), 23);
chunk.extend_from_slice(&[0; 4]);
assert_eq!(chunk.content_length(), 27);
chunk.push(vec![0; 121].into());
assert_eq!(chunk.content_length(), 148);
let payload = chunk.freeze();
assert_eq!(payload.content_length(), 148);
}
}