use std::{
cmp,
collections::VecDeque,
path::Path,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use tokio::{
io::{self, AsyncRead as Read, AsyncReadExt},
sync::Mutex,
};
use tokio_stream::*;
use crate::{
entry::{EntryFields, EntryIo},
error::TarError,
other, Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
};
/// A tar archive read asynchronously from an underlying reader `R`.
///
/// Cloning an `Archive` is cheap: every clone shares the same
/// [`ArchiveInner`] (reader, position counter and options) through an `Arc`.
#[derive(Debug)]
pub struct Archive<R: Read + Unpin> {
    inner: Arc<ArchiveInner<R>>,
}

impl<R: Read + Unpin> Clone for Archive<R> {
    /// Returns another handle to the same shared archive state
    /// (only the reference count is bumped; nothing is copied).
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}
/// State shared by every clone of an [`Archive`].
#[derive(Debug)]
pub struct ArchiveInner<R> {
    // Running count of bytes read from `obj` through `Archive`'s `poll_read`;
    // used to compute how far to skip before the next header.
    pos: AtomicU64,
    // Copied into each entry's `unpack_xattrs` setting.
    unpack_xattrs: bool,
    // Copied into each entry's `preserve_permissions` setting.
    preserve_permissions: bool,
    // Copied into each entry's `preserve_mtime` setting.
    preserve_mtime: bool,
    // When true, all-zero header blocks are skipped instead of ending the stream.
    ignore_zeros: bool,
    // The underlying reader; locked (via `try_lock`) for each `poll_read`.
    obj: Mutex<R>,
}
/// Configures and constructs an [`Archive`].
///
/// Derives `Debug` for consistency with [`Archive`] and [`ArchiveInner`]
/// (the impl is available whenever `R: Debug`).
#[derive(Debug)]
pub struct ArchiveBuilder<R: Read + Unpin> {
    // The reader the archive will be read from.
    obj: R,
    // Forwarded to each entry's `unpack_xattrs` setting.
    unpack_xattrs: bool,
    // Forwarded to each entry's `preserve_permissions` setting.
    preserve_permissions: bool,
    // Forwarded to each entry's `preserve_mtime` setting.
    preserve_mtime: bool,
    // Skip all-zero header blocks instead of treating them as end of archive.
    ignore_zeros: bool,
}
impl<R: Read + Unpin> ArchiveBuilder<R> {
pub fn new(obj: R) -> Self {
ArchiveBuilder {
unpack_xattrs: false,
preserve_permissions: false,
preserve_mtime: true,
ignore_zeros: false,
obj,
}
}
pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
self.unpack_xattrs = unpack_xattrs;
self
}
pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
self.preserve_permissions = preserve;
self
}
pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
self.preserve_mtime = preserve;
self
}
pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
self.ignore_zeros = ignore_zeros;
self
}
pub fn build(self) -> Archive<R> {
let Self {
unpack_xattrs,
preserve_permissions,
preserve_mtime,
ignore_zeros,
obj,
} = self;
Archive {
inner: Arc::new(ArchiveInner {
unpack_xattrs,
preserve_permissions,
preserve_mtime,
ignore_zeros,
obj: Mutex::new(obj),
pos: 0.into(),
}),
}
}
}
impl<R: Read + Unpin> Archive<R> {
pub fn new(obj: R) -> Archive<R> {
Archive {
inner: Arc::new(ArchiveInner {
unpack_xattrs: false,
preserve_permissions: false,
preserve_mtime: true,
ignore_zeros: false,
obj: Mutex::new(obj),
pos: 0.into(),
}),
}
}
pub fn into_inner(self) -> Result<R, Self> {
let Self { inner } = self;
match Arc::try_unwrap(inner) {
Ok(inner) => Ok(inner.obj.into_inner()),
Err(inner) => Err(Self { inner }),
}
}
pub fn entries(&mut self) -> io::Result<Entries<R>> {
if self.inner.pos.load(Ordering::SeqCst) != 0 {
return Err(other(
"cannot call entries unless archive is at \
position 0",
));
}
Ok(Entries {
archive: self.clone(),
current: (0, None, 0, None),
gnu_longlink: None,
gnu_longname: None,
pax_extensions: None,
})
}
pub fn entries_raw(&mut self) -> io::Result<RawEntries<R>> {
if self.inner.pos.load(Ordering::SeqCst) != 0 {
return Err(other(
"cannot call entries_raw unless archive is at \
position 0",
));
}
Ok(RawEntries {
archive: self.clone(),
current: (0, None, 0),
})
}
pub async fn unpack<P: AsRef<Path>>(&mut self, dst: P) -> io::Result<()> {
let mut entries = self.entries()?;
let mut pinned = Pin::new(&mut entries);
while let Some(entry) = pinned.next().await {
let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
file.unpack_in(dst.as_ref()).await?;
}
Ok(())
}
}
/// Stream of archive members returned by [`Archive::entries`].
///
/// GNU long-name / long-link and PAX local-extension entries are consumed by
/// the stream and attached to the next regular member.
pub struct Entries<R: Read + Unpin> {
    archive: Archive<R>,
    // Resumable cursor state, kept here so polling can continue after `Pending`:
    // - .0: archive offset of the next header;
    // - .1: header currently being filled;
    // - .2: bytes already read into whichever buffer is being filled (the
    //   header in `poll_next_raw`, or the sparse extension header in
    //   `poll_parse_sparse_header`);
    // - .3: sparse extension header currently being filled.
    current: (u64, Option<Header>, usize, Option<GnuExtSparseHeader>),
    // Payload of a GNU long-name entry, waiting to be attached to the next member.
    gnu_longname: Option<Vec<u8>>,
    // Payload of a GNU long-link entry, waiting to be attached to the next member.
    gnu_longlink: Option<Vec<u8>>,
    // Payload of a PAX local-extensions entry, waiting to be attached to the next member.
    pax_extensions: Option<Vec<u8>>,
}
// Unwraps a `Poll<Option<io::Result<T>>>` inside a `Stream::poll_next` body.
// `Pending`, end-of-stream and errors are returned straight to the caller;
// otherwise the macro evaluates to the inner `T`.
macro_rules! ready_opt_err {
    ($val:expr) => {
        match futures_core::ready!($val) {
            None => return Poll::Ready(None),
            Some(Err(err)) => return Poll::Ready(Some(Err(err))),
            Some(Ok(val)) => val,
        }
    };
}

// Unwraps a `Poll<io::Result<T>>` inside a `Stream::poll_next` body.
// `Pending` is returned as-is and an error is returned as `Some(Err(..))`;
// otherwise the macro evaluates to the inner `T`.
macro_rules! ready_err {
    ($val:expr) => {
        match futures_core::ready!($val) {
            Err(err) => return Poll::Ready(Some(Err(err))),
            Ok(val) => val,
        }
    };
}
impl<R: Read + Unpin> Stream for Entries<R> {
    type Item = io::Result<Entry<Archive<R>>>;

    /// Yields the next regular member.
    ///
    /// It loops over raw headers, stashing the payloads of GNU long-name,
    /// GNU long-link and PAX local-extension entries. Those payloads are then
    /// attached to the following member, and GNU sparse members are expanded.
    ///
    /// NOTE(review): every `ready_err!` below returns `Poll::Pending` from this
    /// function and drops the local `ef` / `fields`. `poll_next_raw` has already
    /// advanced `self.current.0` past that entry, so the next poll moves on to
    /// the following header. Unless `poll_read_all` / `poll_parse_sparse_header`
    /// keep their progress elsewhere, a long name, PAX payload or sparse member
    /// whose read hits `Pending` appears to be lost. TODO confirm.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let archive = self.archive.clone();
            let (next, current_header, current_header_pos, _) = &mut self.current;
            let entry = ready_opt_err!(poll_next_raw(
                archive,
                next,
                current_header,
                current_header_pos,
                cx
            ));
            // Extension entry types are only interpreted for GNU/ustar headers.
            let is_recognized_header =
                entry.header().as_gnu().is_some() || entry.header().as_ustar().is_some();
            if is_recognized_header && entry.header().entry_type().is_gnu_longname() {
                if self.gnu_longname.is_some() {
                    return Poll::Ready(Some(Err(other(
                        "two long name entries describing \
                         the same member",
                    ))));
                }
                // Buffer the long name; it is attached to the next member.
                let mut ef = EntryFields::from(entry);
                let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
                self.gnu_longname = Some(val);
                continue;
            }
            if is_recognized_header && entry.header().entry_type().is_gnu_longlink() {
                // NOTE(review): this reuses the long *name* error text for a
                // duplicate long *link* entry.
                if self.gnu_longlink.is_some() {
                    return Poll::Ready(Some(Err(other(
                        "two long name entries describing \
                         the same member",
                    ))));
                }
                // Buffer the long link target; it is attached to the next member.
                let mut ef = EntryFields::from(entry);
                let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
                self.gnu_longlink = Some(val);
                continue;
            }
            if is_recognized_header && entry.header().entry_type().is_pax_local_extensions() {
                if self.pax_extensions.is_some() {
                    return Poll::Ready(Some(Err(other(
                        "two pax extensions entries describing \
                         the same member",
                    ))));
                }
                // Buffer the PAX records; they are attached to the next member.
                let mut ef = EntryFields::from(entry);
                let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
                self.pax_extensions = Some(val);
                continue;
            }
            // A regular member: move any buffered extension payloads onto it.
            let mut fields = EntryFields::from(entry);
            fields.long_pathname = self.gnu_longname.take();
            fields.long_linkname = self.gnu_longlink.take();
            fields.pax_extensions = self.pax_extensions.take();
            let archive = self.archive.clone();
            // The usize slot doubles as the read offset into the sparse
            // extension header (see `poll_parse_sparse_header`).
            let (next, _, current_pos, current_ext) = &mut self.current;
            ready_err!(poll_parse_sparse_header(
                archive,
                next,
                current_ext,
                current_pos,
                &mut fields,
                cx
            ));
            return Poll::Ready(Some(Ok(fields.into_entry())));
        }
    }
}
/// Stream of raw headers returned by [`Archive::entries_raw`].
///
/// Every header, including extension headers, is yielded as its own entry.
pub struct RawEntries<R: Read + Unpin> {
    archive: Archive<R>,
    // Resumable cursor state: (archive offset of the next header,
    // header currently being filled, bytes of it read so far).
    current: (u64, Option<Header>, usize),
}
impl<R: Read + Unpin> Stream for RawEntries<R> {
    type Item = io::Result<Entry<Archive<R>>>;

    /// Yields the next header exactly as stored, without interpreting
    /// long-name, long-link, PAX or sparse metadata.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = &mut *self;
        let handle = this.archive.clone();
        let (next_offset, partial_header, partial_len) = &mut this.current;
        poll_next_raw(handle, next_offset, partial_header, partial_len, cx)
    }
}
/// Reads the next header block from `archive` and wraps it as an entry whose
/// data reader covers exactly the entry's `size` bytes.
///
/// All progress lives in caller-owned slots, so the function can be polled
/// again after `Poll::Pending`:
/// - `next`: archive offset of the next header. It is advanced past this
///   header and past the entry's data rounded up to 512 bytes.
/// - `current_header` / `current_header_pos`: the header being filled and how
///   many of its bytes have been read. `current_header` is taken (left `None`)
///   once an entry is returned.
///
/// Returns `Poll::Ready(None)` in two cases: a clean EOF at a header boundary,
/// or an all-zero header block when `ignore_zeros` is not set. When it is set,
/// zero blocks are skipped instead.
///
/// # Errors
/// - The archive was read past `*next`, so the skip distance would be negative.
/// - A short read inside a header, a header checksum mismatch, or an
///   unparsable checksum/size field.
/// - An entry size so large that rounding it up or advancing `*next`
///   overflows `u64`.
fn poll_next_raw<R: Read + Unpin>(
    mut archive: Archive<R>,
    next: &mut u64,
    current_header: &mut Option<Header>,
    current_header_pos: &mut usize,
    cx: &mut Context<'_>,
) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
    let mut header_pos = *next;
    loop {
        if current_header.is_none() {
            // Discard what is left of the previous entry (data + padding) so
            // the reader sits at the start of the next header. Checked because
            // an archive read past `*next` would otherwise panic (debug) or
            // wrap into an enormous skip (release).
            let pos = archive.inner.pos.load(Ordering::SeqCst);
            let delta = match next.checked_sub(pos) {
                Some(delta) => delta,
                None => {
                    return Poll::Ready(Some(Err(other(
                        "archive position is past the start of the next header",
                    ))))
                }
            };
            match futures_core::ready!(poll_skip(&mut archive, cx, delta)) {
                Ok(_) => {}
                Err(err) => return Poll::Ready(Some(Err(err))),
            }
            *current_header = Some(Header::new_old());
            *current_header_pos = 0;
        }
        let header = current_header.as_mut().unwrap();
        match futures_core::ready!(poll_try_read_all(
            &mut archive,
            cx,
            header.as_mut_bytes(),
            current_header_pos,
        )) {
            Ok(true) => {}
            Ok(false) => return Poll::Ready(None),
            Err(err) => return Poll::Ready(Some(Err(err))),
        }
        if !header.as_bytes().iter().all(|i| *i == 0) {
            *next += 512;
            break;
        }
        // All-zero block: end of archive unless zero blocks are to be skipped.
        if !archive.inner.ignore_zeros {
            return Poll::Ready(None);
        }
        *next += 512;
        header_pos = *next;
    }
    let header = current_header.as_mut().unwrap();
    // Header checksum: unsigned byte sum with the 8-byte checksum field
    // (offsets 148..156) counted as ASCII spaces.
    let sum = header.as_bytes()[..148]
        .iter()
        .chain(&header.as_bytes()[156..])
        .fold(0, |a, b| a + (*b as u32))
        + 8 * 32;
    let cksum = header.cksum()?;
    if sum != cksum {
        return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
    }
    let file_pos = *next;
    let size = header.entry_size()?;
    // `size` comes straight from the header, so round it up to the 512-byte
    // block size and compute the next header offset with overflow checks.
    let padded_size = size
        .checked_add(511)
        .ok_or_else(|| other("size overflow"))?
        & !(512 - 1);
    let next_header = file_pos
        .checked_add(padded_size)
        .ok_or_else(|| other("size overflow"))?;
    let mut data = VecDeque::with_capacity(1);
    data.push_back(EntryIo::Data(archive.clone().take(size)));
    let header = current_header.take().unwrap();
    let ret = EntryFields {
        size,
        header_pos,
        file_pos,
        data,
        header,
        long_pathname: None,
        long_linkname: None,
        pax_extensions: None,
        unpack_xattrs: archive.inner.unpack_xattrs,
        preserve_permissions: archive.inner.preserve_permissions,
        preserve_mtime: archive.inner.preserve_mtime,
        read_state: None,
    };
    *next = next_header;
    Poll::Ready(Some(Ok(ret.into_entry())))
}
/// Expands a GNU sparse member in place. Returns immediately for any other
/// entry type.
///
/// `entry.data` is rebuilt as a sequence of zero-filled `EntryIo::Pad` readers
/// for holes, interleaved with `EntryIo::Data` readers that take the stored
/// chunks from `archive`. The block list comes from two places:
/// - the GNU header's sparse map;
/// - if the header is marked as extended, the following 512-byte
///   `GnuExtSparseHeader` blocks. Each one consumed advances `*next` by 512.
///
/// On success `entry.size` is set to the expanded (real) size.
///
/// `current_ext` / `current_ext_pos` hold a partially read extension header so
/// a `Pending` from the reader can be resumed.
///
/// NOTE(review): on re-entry after `Pending`, the base header's blocks are
/// rebuilt from scratch: `cur`, `remaining` and `entry.data` are reset. Blocks
/// from extension headers that were fully read before the `Pending` therefore
/// appear not to be re-added. Confirm whether this path is reachable.
///
/// # Errors
/// - The entry type is sparse but the header is not a GNU header.
/// - A non-empty data block follows a block that did not end on a 512-byte
///   boundary.
/// - Blocks are out of order or overlap.
/// - A block overflows `u64`, or the blocks consume more than the stored size.
/// - An extension header cannot be read in full.
/// - The expanded size disagrees with the header's real size, or stored data
///   is left over.
fn poll_parse_sparse_header<R: Read + Unpin>(
    mut archive: Archive<R>,
    next: &mut u64,
    current_ext: &mut Option<GnuExtSparseHeader>,
    current_ext_pos: &mut usize,
    entry: &mut EntryFields<Archive<R>>,
    cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
    if !entry.header.entry_type().is_gnu_sparse() {
        return Poll::Ready(Ok(()));
    }
    let gnu = match entry.header.as_gnu() {
        Some(gnu) => gnu,
        None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
    };
    entry.data.truncate(0);
    // `cur`: expanded offset reached so far; `remaining`: stored bytes not yet
    // assigned to a data block.
    let mut cur = 0;
    let mut remaining = entry.size;
    {
        let data = &mut entry.data;
        let reader = archive.clone();
        let size = entry.size;
        let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
            if block.is_empty() {
                return Ok(());
            }
            let off = block.offset()?;
            let len = block.length()?;
            // Only a block that actually reads stored data needs the previous
            // data to end on a 512-byte boundary. A zero-length block (e.g. a
            // trailing hole marker at the real size) consumes nothing.
            if len != 0 && (size - remaining) % 512 != 0 {
                return Err(other(
                    "previous block in sparse file was not \
                     aligned to 512-byte boundary",
                ));
            } else if off < cur {
                return Err(other(
                    "out of order or overlapping sparse \
                     blocks",
                ));
            } else if cur < off {
                // Hole before this block: emit zeros up to its offset.
                let block = io::repeat(0).take(off - cur);
                data.push_back(EntryIo::Pad(block));
            }
            cur = off
                .checked_add(len)
                .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
            remaining = remaining.checked_sub(len).ok_or_else(|| {
                other(
                    "sparse file consumed more data than the header \
                     listed",
                )
            })?;
            data.push_back(EntryIo::Data(reader.clone().take(len)));
            Ok(())
        };
        for block in gnu.sparse.iter() {
            add_block(block)?
        }
        if gnu.is_extended() {
            let started_header = current_ext.is_some();
            if !started_header {
                // Seed a placeholder flagged as extended so the loop below
                // reads at least one extension header.
                let mut ext = GnuExtSparseHeader::new();
                ext.isextended[0] = 1;
                *current_ext = Some(ext);
                *current_ext_pos = 0;
            }
            let ext = current_ext.as_mut().unwrap();
            while ext.is_extended() {
                match futures_core::ready!(poll_try_read_all(
                    &mut archive,
                    cx,
                    ext.as_mut_bytes(),
                    current_ext_pos,
                )) {
                    Ok(true) => {}
                    Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
                    Err(err) => return Poll::Ready(Err(err)),
                }
                // Extension headers sit between the main header and the data.
                *next += 512;
                for block in ext.sparse.iter() {
                    add_block(block)?;
                }
            }
        }
    }
    if cur != gnu.real_size()? {
        return Poll::Ready(Err(other(
            "mismatch in sparse file chunks and \
             size in header",
        )));
    }
    entry.size = cur;
    if remaining > 0 {
        return Poll::Ready(Err(other(
            "mismatch in sparse file chunks and \
             entry size in header",
        )));
    }
    Poll::Ready(Ok(()))
}
impl<R: Read + Unpin> Read for Archive<R> {
    /// Reads from the shared underlying reader and adds the number of bytes
    /// produced by *this* call to the archive position.
    ///
    /// The position is advanced by the growth of `into`'s filled region, not
    /// by its total length. A caller-supplied `ReadBuf` that already held data
    /// would otherwise be over-counted, desynchronizing header offsets.
    ///
    /// If another clone currently holds the reader lock, the task wakes itself
    /// before returning `Pending` so it is polled again. Returning `Pending`
    /// without arranging a wakeup could stall the task forever.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        into: &mut io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let mut r = match self.inner.obj.try_lock() {
            Ok(guard) => guard,
            Err(_) => {
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        };
        let filled_before = into.filled().len();
        let res = futures_core::ready!(Pin::new(&mut *r).poll_read(cx, into));
        match res {
            Ok(()) => {
                let read = into.filled().len() - filled_before;
                self.inner.pos.fetch_add(read as u64, Ordering::SeqCst);
                Poll::Ready(Ok(()))
            }
            Err(err) => Poll::Ready(Err(err)),
        }
    }
}
/// Fills `buf[*pos..]` from `source`, resuming at `*pos` after `Pending`.
///
/// Returns:
/// - `Ok(true)` once the buffer is full, with `*pos` reset to 0;
/// - `Ok(false)` if the source is at EOF before any byte of `buf` was read;
/// - an error if EOF is hit part-way through the buffer.
fn poll_try_read_all<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    buf: &mut [u8],
    pos: &mut usize,
) -> Poll<io::Result<bool>> {
    let wanted = buf.len();
    while *pos < wanted {
        let mut chunk = io::ReadBuf::new(&mut buf[*pos..]);
        if let Err(err) = futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut chunk)) {
            return Poll::Ready(Err(err));
        }
        let got = chunk.filled().len();
        if got == 0 {
            let outcome = if *pos == 0 {
                Ok(false)
            } else {
                Err(other("failed to read entire block"))
            };
            return Poll::Ready(outcome);
        }
        *pos += got;
    }
    *pos = 0;
    Poll::Ready(Ok(true))
}
/// Reads and discards `amt` bytes from `source`, at most 32 KiB per read.
///
/// Progress is not stored here. On `Pending`, the bytes already discarded are
/// reflected only in the reader's own position.
///
/// # Errors
/// Propagates read errors. Returns an "unexpected EOF during skip" error if
/// the source ends before `amt` bytes were consumed.
fn poll_skip<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    mut amt: u64,
) -> Poll<io::Result<()>> {
    let mut scratch = [0u8; 4096 * 8];
    while amt != 0 {
        let want = cmp::min(amt, scratch.len() as u64) as usize;
        let mut window = io::ReadBuf::new(&mut scratch[..want]);
        if let Err(err) = futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut window)) {
            return Poll::Ready(Err(err));
        }
        match window.filled().len() {
            0 => return Poll::Ready(Err(other("unexpected EOF during skip"))),
            n => amt -= n as u64,
        }
    }
    Poll::Ready(Ok(()))
}