use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt, Snafu};
use url::Url;
use crate::client::get::GetClientExt;
use crate::client::header::get_etag;
use crate::http::client::Client;
use crate::path::Path;
use crate::{
ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig,
};
mod client;
#[derive(Debug, Snafu)]
enum Error {
#[snafu(display("Must specify a URL"))]
MissingUrl,
#[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
UnableToParseUrl {
source: url::ParseError,
url: String,
},
#[snafu(display("Unable to extract metadata from headers: {}", source))]
Metadata {
source: crate::client::header::Error,
},
}
/// Wraps any module-local [`Error`] as a generic crate error attributed to the
/// `"HTTP"` store.
impl From<Error> for crate::Error {
    fn from(err: Error) -> Self {
        let source = Box::new(err);
        Self::Generic {
            store: "HTTP",
            source,
        }
    }
}
/// An [`ObjectStore`] implementation that forwards every operation to an HTTP
/// [`Client`].
///
/// Instances are created through [`HttpBuilder`].
#[derive(Debug)]
pub struct HttpStore {
/// Client through which all requests of this store are issued.
client: Client,
}
/// Renders the store as the fixed label `HttpStore`; no instance state is shown.
impl std::fmt::Display for HttpStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("HttpStore")
    }
}
#[async_trait]
impl ObjectStore for HttpStore {
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
if opts.mode != PutMode::Overwrite {
return Err(crate::Error::NotImplemented);
}
let response = self.client.put(location, payload, opts.attributes).await?;
let e_tag = match get_etag(response.headers()) {
Ok(e_tag) => Some(e_tag),
Err(crate::client::header::Error::MissingEtag) => None,
Err(source) => return Err(Error::Metadata { source }.into()),
};
Ok(PutResult {
e_tag,
version: None,
})
}
async fn put_multipart_opts(
&self,
_location: &Path,
_opts: PutMultipartOpts,
) -> Result<Box<dyn MultipartUpload>> {
Err(crate::Error::NotImplemented)
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.client.get_opts(location, options).await
}
async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete(location).await
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
let prefix = prefix.cloned();
futures::stream::once(async move {
let status = self.client.list(prefix.as_ref(), "infinity").await?;
let iter = status
.response
.into_iter()
.filter(|r| !r.is_dir())
.map(|response| {
response.check_ok()?;
response.object_meta(self.client.base_url())
})
.filter_ok(move |r| r.location.as_ref().len() > prefix_len);
Ok::<_, crate::Error>(futures::stream::iter(iter))
})
.try_flatten()
.boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let status = self.client.list(prefix, "1").await?;
let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or(0);
let mut objects: Vec<ObjectMeta> = Vec::with_capacity(status.response.len());
let mut common_prefixes = Vec::with_capacity(status.response.len());
for response in status.response {
response.check_ok()?;
match response.is_dir() {
false => {
let meta = response.object_meta(self.client.base_url())?;
if meta.location.as_ref().len() > prefix_len {
objects.push(meta);
}
}
true => {
let path = response.path(self.client.base_url())?;
if path.as_ref().len() > prefix_len {
common_prefixes.push(path);
}
}
}
}
Ok(ListResult {
common_prefixes,
objects,
})
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.client.copy(from, to, true).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.client.copy(from, to, false).await
}
}
/// Builder that collects the settings needed to construct an [`HttpStore`].
///
/// All fields start at their `Default` value; a URL must be supplied before
/// `build` can succeed.
#[derive(Debug, Default, Clone)]
pub struct HttpBuilder {
/// URL string of the store; `None` until `with_url` is called.
url: Option<String>,
/// Options handed to the HTTP client when the store is built.
client_options: ClientOptions,
/// Retry configuration handed to the HTTP client when the store is built.
retry_config: RetryConfig,
}
impl HttpBuilder {
    /// Creates a builder with every setting at its default value.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the URL of the store.
    pub fn with_url(self, url: impl Into<String>) -> Self {
        Self {
            url: Some(url.into()),
            ..self
        }
    }

    /// Sets the retry configuration passed to the client.
    pub fn with_retry(self, retry_config: RetryConfig) -> Self {
        Self {
            retry_config,
            ..self
        }
    }

    /// Sets a single client configuration entry on the current client options.
    pub fn with_config(self, key: ClientConfigKey, value: impl Into<String>) -> Self {
        let client_options = self.client_options.with_config(key, value);
        Self {
            client_options,
            ..self
        }
    }

    /// Replaces the client options wholesale, discarding any options set
    /// earlier on this builder.
    pub fn with_client_options(self, options: ClientOptions) -> Self {
        Self {
            client_options: options,
            ..self
        }
    }

    /// Builds the [`HttpStore`].
    ///
    /// # Errors
    ///
    /// Fails when no URL was set, when the URL string cannot be parsed, or
    /// when the client cannot be constructed.
    pub fn build(self) -> Result<HttpStore> {
        let Self {
            url,
            client_options,
            retry_config,
        } = self;

        let url = url.context(MissingUrlSnafu)?;
        let parsed = Url::parse(&url).context(UnableToParseUrlSnafu { url })?;
        let client = Client::new(parsed, client_options, retry_config)?;

        Ok(HttpStore { client })
    }
}
#[cfg(test)]
mod tests {
    use crate::integration::*;
    use crate::tests::*;

    use super::*;

    /// Runs the shared object-store integration checks against the server
    /// named by the `HTTP_URL` environment variable.
    #[tokio::test]
    async fn http_test() {
        maybe_skip_integration!();

        let endpoint = std::env::var("HTTP_URL").expect("HTTP_URL must be set");
        let client_options = ClientOptions::new().with_allow_http(true);
        let store = HttpBuilder::new()
            .with_url(endpoint)
            .with_client_options(client_options)
            .build()
            .unwrap();

        put_get_delete_list(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
    }
}