pub mod backoff;
#[cfg(test)]
pub mod mock_server;
pub mod retry;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod pagination;
pub mod get;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod list;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod token;
pub mod header;
#[cfg(any(feature = "aws", feature = "gcp"))]
pub mod s3;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod parts;
use async_trait::async_trait;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{Client, ClientBuilder, NoProxy, Proxy, RequestBuilder};
use serde::{Deserialize, Serialize};
use crate::config::{fmt_duration, ConfigValue};
use crate::path::Path;
use crate::{GetOptions, Result};
/// Wraps a [`reqwest::Error`] in the crate's generic error variant, attributing it to
/// the `"HTTP client"` store.
fn map_client_error(e: reqwest::Error) -> super::Error {
    let source = Box::new(e);
    super::Error::Generic {
        store: "HTTP client",
        source,
    }
}
/// User agent used by [`ClientOptions::client`] when none has been configured:
/// `<crate name>/<crate version>`, read from Cargo's environment at compile time.
static DEFAULT_USER_AGENT: &str = concat!(
    env!("CARGO_PKG_NAME"),
    "/",
    env!("CARGO_PKG_VERSION"),
);
/// Keys identifying the individual settings of [`ClientOptions`].
///
/// Each key has a snake_case string name (see the `AsRef<str>` and `FromStr`
/// implementations) and is consumed by [`ClientOptions::with_config`] /
/// [`ClientOptions::get_config_value`].
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy, Deserialize, Serialize)]
#[non_exhaustive]
pub enum ClientConfigKey {
/// Allow non-HTTPS connections (the client is built with `https_only(!allow_http)`).
AllowHttp,
/// Accept invalid TLS certificates (`danger_accept_invalid_certs(true)` on the builder).
AllowInvalidCertificates,
/// Duration forwarded to the builder's `connect_timeout`.
ConnectTimeout,
/// Content type returned by `get_content_type` when no per-extension mapping matches.
DefaultContentType,
/// Restrict the client to HTTP/1 (`http1_only()` on the builder).
Http1Only,
/// Duration forwarded to the builder's `http2_keep_alive_interval`.
Http2KeepAliveInterval,
/// Duration forwarded to the builder's `http2_keep_alive_timeout`.
Http2KeepAliveTimeout,
/// Enable `http2_keep_alive_while_idle` on the builder.
Http2KeepAliveWhileIdle,
/// Use HTTP/2 only (`http2_prior_knowledge()` on the builder).
Http2Only,
/// Duration forwarded to the builder's `pool_idle_timeout`.
PoolIdleTimeout,
/// Count forwarded to the builder's `pool_max_idle_per_host`.
PoolMaxIdlePerHost,
/// URL of the proxy applied to all requests (`Proxy::all`).
ProxyUrl,
/// PEM certificate added as a root certificate; only read when a proxy URL is set.
ProxyCaCertificate,
/// String handed to `NoProxy::from_string`; only read when a proxy URL is set.
ProxyExcludes,
/// Duration forwarded to the builder's `timeout`.
Timeout,
/// Value of the `User-Agent` header.
UserAgent,
}
impl AsRef<str> for ClientConfigKey {
    /// Returns the snake_case configuration name of this key.
    fn as_ref(&self) -> &str {
        // Arms follow the declaration order of the enum.
        let name: &'static str = match *self {
            Self::AllowHttp => "allow_http",
            Self::AllowInvalidCertificates => "allow_invalid_certificates",
            Self::ConnectTimeout => "connect_timeout",
            Self::DefaultContentType => "default_content_type",
            Self::Http1Only => "http1_only",
            Self::Http2KeepAliveInterval => "http2_keep_alive_interval",
            Self::Http2KeepAliveTimeout => "http2_keep_alive_timeout",
            Self::Http2KeepAliveWhileIdle => "http2_keep_alive_while_idle",
            Self::Http2Only => "http2_only",
            Self::PoolIdleTimeout => "pool_idle_timeout",
            Self::PoolMaxIdlePerHost => "pool_max_idle_per_host",
            Self::ProxyUrl => "proxy_url",
            Self::ProxyCaCertificate => "proxy_ca_certificate",
            Self::ProxyExcludes => "proxy_excludes",
            Self::Timeout => "timeout",
            Self::UserAgent => "user_agent",
        };
        name
    }
}
impl FromStr for ClientConfigKey {
    type Err = super::Error;

    /// Parses the snake_case configuration name of a key.
    ///
    /// Every name produced by the `AsRef<str>` implementation is accepted, so
    /// `key.as_ref().parse()` round-trips for all variants.
    ///
    /// # Errors
    ///
    /// Returns `Error::UnknownConfigurationKey` (with store `"HTTP"`) for any other string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "allow_http" => Ok(Self::AllowHttp),
            "allow_invalid_certificates" => Ok(Self::AllowInvalidCertificates),
            "connect_timeout" => Ok(Self::ConnectTimeout),
            "default_content_type" => Ok(Self::DefaultContentType),
            "http1_only" => Ok(Self::Http1Only),
            "http2_only" => Ok(Self::Http2Only),
            "http2_keep_alive_interval" => Ok(Self::Http2KeepAliveInterval),
            "http2_keep_alive_timeout" => Ok(Self::Http2KeepAliveTimeout),
            "http2_keep_alive_while_idle" => Ok(Self::Http2KeepAliveWhileIdle),
            "pool_idle_timeout" => Ok(Self::PoolIdleTimeout),
            "pool_max_idle_per_host" => Ok(Self::PoolMaxIdlePerHost),
            "proxy_url" => Ok(Self::ProxyUrl),
            // These two were previously missing, so the proxy CA certificate and the
            // proxy exclusion list could not be configured through string keys.
            "proxy_ca_certificate" => Ok(Self::ProxyCaCertificate),
            "proxy_excludes" => Ok(Self::ProxyExcludes),
            "timeout" => Ok(Self::Timeout),
            "user_agent" => Ok(Self::UserAgent),
            _ => Err(super::Error::UnknownConfigurationKey {
                store: "HTTP",
                key: s.into(),
            }),
        }
    }
}
/// HTTP client configuration, turned into a `reqwest::Client` by `ClientOptions::client`.
///
/// Fields typed `ConfigValue<_>` may hold either an already-parsed value or a string
/// whose parsing is deferred until the value is read with `get()`.
#[derive(Debug, Clone)]
pub struct ClientOptions {
// `User-Agent` header value; `DEFAULT_USER_AGENT` is used when `None`.
user_agent: Option<ConfigValue<HeaderValue>>,
// File extension -> content type, consulted by `get_content_type`.
content_type_map: HashMap<String, String>,
// Fallback content type when the extension is absent or unmapped.
default_content_type: Option<String>,
// Headers installed as the client's default headers.
default_headers: Option<HeaderMap>,
// Proxy URL applied to all requests.
proxy_url: Option<String>,
// PEM certificate added as a root certificate; only read when `proxy_url` is set.
proxy_ca_certificate: Option<String>,
// Exclusion list passed to `NoProxy::from_string`; only read when `proxy_url` is set.
proxy_excludes: Option<String>,
// When false the client is built with `https_only(true)`.
allow_http: ConfigValue<bool>,
// When true the client accepts invalid TLS certificates.
allow_insecure: ConfigValue<bool>,
// Request timeout; `None` means no timeout is set on the builder.
timeout: Option<ConfigValue<Duration>>,
// Connect timeout; `None` means no connect timeout is set on the builder.
connect_timeout: Option<ConfigValue<Duration>>,
// Forwarded to the builder's `pool_idle_timeout` when set.
pool_idle_timeout: Option<ConfigValue<Duration>>,
// Forwarded to the builder's `pool_max_idle_per_host` when set.
pool_max_idle_per_host: Option<ConfigValue<usize>>,
// Forwarded to the builder's `http2_keep_alive_interval` when set.
http2_keep_alive_interval: Option<ConfigValue<Duration>>,
// Forwarded to the builder's `http2_keep_alive_timeout` when set.
http2_keep_alive_timeout: Option<ConfigValue<Duration>>,
// When true, `http2_keep_alive_while_idle(true)` is set on the builder.
http2_keep_alive_while_idle: ConfigValue<bool>,
// When true, `http1_only()` is set on the builder.
http1_only: ConfigValue<bool>,
// When true, `http2_prior_knowledge()` is set on the builder.
http2_only: ConfigValue<bool>,
}
impl Default for ClientOptions {
fn default() -> Self {
Self {
user_agent: None,
content_type_map: Default::default(),
default_content_type: None,
default_headers: None,
proxy_url: None,
proxy_ca_certificate: None,
proxy_excludes: None,
allow_http: Default::default(),
allow_insecure: Default::default(),
timeout: Some(Duration::from_secs(30).into()),
connect_timeout: Some(Duration::from_secs(5).into()),
pool_idle_timeout: None,
pool_max_idle_per_host: None,
http2_keep_alive_interval: None,
http2_keep_alive_timeout: None,
http2_keep_alive_while_idle: Default::default(),
http1_only: true.into(),
http2_only: Default::default(),
}
}
}
impl ClientOptions {
pub fn new() -> Self {
Default::default()
}
pub fn with_config(mut self, key: ClientConfigKey, value: impl Into<String>) -> Self {
match key {
ClientConfigKey::AllowHttp => self.allow_http.parse(value),
ClientConfigKey::AllowInvalidCertificates => self.allow_insecure.parse(value),
ClientConfigKey::ConnectTimeout => {
self.connect_timeout = Some(ConfigValue::Deferred(value.into()))
}
ClientConfigKey::DefaultContentType => self.default_content_type = Some(value.into()),
ClientConfigKey::Http1Only => self.http1_only.parse(value),
ClientConfigKey::Http2Only => self.http2_only.parse(value),
ClientConfigKey::Http2KeepAliveInterval => {
self.http2_keep_alive_interval = Some(ConfigValue::Deferred(value.into()))
}
ClientConfigKey::Http2KeepAliveTimeout => {
self.http2_keep_alive_timeout = Some(ConfigValue::Deferred(value.into()))
}
ClientConfigKey::Http2KeepAliveWhileIdle => {
self.http2_keep_alive_while_idle.parse(value)
}
ClientConfigKey::PoolIdleTimeout => {
self.pool_idle_timeout = Some(ConfigValue::Deferred(value.into()))
}
ClientConfigKey::PoolMaxIdlePerHost => {
self.pool_max_idle_per_host = Some(ConfigValue::Deferred(value.into()))
}
ClientConfigKey::ProxyUrl => self.proxy_url = Some(value.into()),
ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate = Some(value.into()),
ClientConfigKey::ProxyExcludes => self.proxy_excludes = Some(value.into()),
ClientConfigKey::Timeout => self.timeout = Some(ConfigValue::Deferred(value.into())),
ClientConfigKey::UserAgent => {
self.user_agent = Some(ConfigValue::Deferred(value.into()))
}
}
self
}
pub fn get_config_value(&self, key: &ClientConfigKey) -> Option<String> {
match key {
ClientConfigKey::AllowHttp => Some(self.allow_http.to_string()),
ClientConfigKey::AllowInvalidCertificates => Some(self.allow_insecure.to_string()),
ClientConfigKey::ConnectTimeout => self.connect_timeout.as_ref().map(fmt_duration),
ClientConfigKey::DefaultContentType => self.default_content_type.clone(),
ClientConfigKey::Http1Only => Some(self.http1_only.to_string()),
ClientConfigKey::Http2KeepAliveInterval => {
self.http2_keep_alive_interval.as_ref().map(fmt_duration)
}
ClientConfigKey::Http2KeepAliveTimeout => {
self.http2_keep_alive_timeout.as_ref().map(fmt_duration)
}
ClientConfigKey::Http2KeepAliveWhileIdle => {
Some(self.http2_keep_alive_while_idle.to_string())
}
ClientConfigKey::Http2Only => Some(self.http2_only.to_string()),
ClientConfigKey::PoolIdleTimeout => self.pool_idle_timeout.as_ref().map(fmt_duration),
ClientConfigKey::PoolMaxIdlePerHost => {
self.pool_max_idle_per_host.as_ref().map(|v| v.to_string())
}
ClientConfigKey::ProxyUrl => self.proxy_url.clone(),
ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate.clone(),
ClientConfigKey::ProxyExcludes => self.proxy_excludes.clone(),
ClientConfigKey::Timeout => self.timeout.as_ref().map(fmt_duration),
ClientConfigKey::UserAgent => self
.user_agent
.as_ref()
.and_then(|v| v.get().ok())
.and_then(|v| v.to_str().ok().map(|s| s.to_string())),
}
}
pub fn with_user_agent(mut self, agent: HeaderValue) -> Self {
self.user_agent = Some(agent.into());
self
}
pub fn with_default_content_type(mut self, mime: impl Into<String>) -> Self {
self.default_content_type = Some(mime.into());
self
}
pub fn with_content_type_for_suffix(
mut self,
extension: impl Into<String>,
mime: impl Into<String>,
) -> Self {
self.content_type_map.insert(extension.into(), mime.into());
self
}
pub fn with_default_headers(mut self, headers: HeaderMap) -> Self {
self.default_headers = Some(headers);
self
}
pub fn with_allow_http(mut self, allow_http: bool) -> Self {
self.allow_http = allow_http.into();
self
}
pub fn with_allow_invalid_certificates(mut self, allow_insecure: bool) -> Self {
self.allow_insecure = allow_insecure.into();
self
}
pub fn with_http1_only(mut self) -> Self {
self.http2_only = false.into();
self.http1_only = true.into();
self
}
pub fn with_http2_only(mut self) -> Self {
self.http1_only = false.into();
self.http2_only = true.into();
self
}
pub fn with_allow_http2(mut self) -> Self {
self.http1_only = false.into();
self.http2_only = false.into();
self
}
pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
self.proxy_url = Some(proxy_url.into());
self
}
pub fn with_proxy_ca_certificate(mut self, proxy_ca_certificate: impl Into<String>) -> Self {
self.proxy_ca_certificate = Some(proxy_ca_certificate.into());
self
}
pub fn with_proxy_excludes(mut self, proxy_excludes: impl Into<String>) -> Self {
self.proxy_excludes = Some(proxy_excludes.into());
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(ConfigValue::Parsed(timeout));
self
}
pub fn with_timeout_disabled(mut self) -> Self {
self.timeout = None;
self
}
pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = Some(ConfigValue::Parsed(timeout));
self
}
pub fn with_connect_timeout_disabled(mut self) -> Self {
self.timeout = None;
self
}
pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self {
self.pool_idle_timeout = Some(ConfigValue::Parsed(timeout));
self
}
pub fn with_pool_max_idle_per_host(mut self, max: usize) -> Self {
self.pool_max_idle_per_host = Some(max.into());
self
}
pub fn with_http2_keep_alive_interval(mut self, interval: Duration) -> Self {
self.http2_keep_alive_interval = Some(ConfigValue::Parsed(interval));
self
}
pub fn with_http2_keep_alive_timeout(mut self, interval: Duration) -> Self {
self.http2_keep_alive_timeout = Some(ConfigValue::Parsed(interval));
self
}
pub fn with_http2_keep_alive_while_idle(mut self) -> Self {
self.http2_keep_alive_while_idle = true.into();
self
}
pub fn get_content_type(&self, path: &Path) -> Option<&str> {
match path.extension() {
Some(extension) => match self.content_type_map.get(extension) {
Some(ct) => Some(ct.as_str()),
None => self.default_content_type.as_deref(),
},
None => self.default_content_type.as_deref(),
}
}
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub(crate) fn metadata_client(&self) -> Result<Client> {
self.clone()
.with_allow_http(true)
.with_connect_timeout(Duration::from_secs(1))
.client()
}
pub(crate) fn client(&self) -> Result<Client> {
let mut builder = ClientBuilder::new();
match &self.user_agent {
Some(user_agent) => builder = builder.user_agent(user_agent.get()?),
None => builder = builder.user_agent(DEFAULT_USER_AGENT),
}
if let Some(headers) = &self.default_headers {
builder = builder.default_headers(headers.clone())
}
if let Some(proxy) = &self.proxy_url {
let mut proxy = Proxy::all(proxy).map_err(map_client_error)?;
if let Some(certificate) = &self.proxy_ca_certificate {
let certificate = reqwest::tls::Certificate::from_pem(certificate.as_bytes())
.map_err(map_client_error)?;
builder = builder.add_root_certificate(certificate);
}
if let Some(proxy_excludes) = &self.proxy_excludes {
let no_proxy = NoProxy::from_string(proxy_excludes);
proxy = proxy.no_proxy(no_proxy);
}
builder = builder.proxy(proxy);
}
if let Some(timeout) = &self.timeout {
builder = builder.timeout(timeout.get()?)
}
if let Some(timeout) = &self.connect_timeout {
builder = builder.connect_timeout(timeout.get()?)
}
if let Some(timeout) = &self.pool_idle_timeout {
builder = builder.pool_idle_timeout(timeout.get()?)
}
if let Some(max) = &self.pool_max_idle_per_host {
builder = builder.pool_max_idle_per_host(max.get()?)
}
if let Some(interval) = &self.http2_keep_alive_interval {
builder = builder.http2_keep_alive_interval(interval.get()?)
}
if let Some(interval) = &self.http2_keep_alive_timeout {
builder = builder.http2_keep_alive_timeout(interval.get()?)
}
if self.http2_keep_alive_while_idle.get()? {
builder = builder.http2_keep_alive_while_idle(true)
}
if self.http1_only.get()? {
builder = builder.http1_only()
}
if self.http2_only.get()? {
builder = builder.http2_prior_knowledge()
}
if self.allow_insecure.get()? {
builder = builder.danger_accept_invalid_certs(true)
}
builder
.https_only(!self.allow_http.get()?)
.build()
.map_err(map_client_error)
}
}
/// Extension trait for applying a [`GetOptions`] to a request under construction.
pub trait GetOptionsExt {
/// Consumes `self` and returns it with `options` applied.
fn with_get_options(self, options: GetOptions) -> Self;
}
impl GetOptionsExt for RequestBuilder {
    /// Adds one request header per option that is set: `Range`, `If-Match`,
    /// `If-None-Match`, `If-Unmodified-Since` and `If-Modified-Since`, in that order.
    fn with_get_options(self, options: GetOptions) -> Self {
        use hyper::header::*;

        // Format applied to both date-based preconditions.
        const DATE_FORMAT: &str = "%a, %d %b %Y %H:%M:%S GMT";

        let request = match options.range {
            Some(range) => self.header(RANGE, range.to_string()),
            None => self,
        };

        let request = match options.if_match {
            Some(tag) => request.header(IF_MATCH, tag),
            None => request,
        };

        let request = match options.if_none_match {
            Some(tag) => request.header(IF_NONE_MATCH, tag),
            None => request,
        };

        let request = match options.if_unmodified_since {
            Some(date) => {
                let formatted = date.format(DATE_FORMAT).to_string();
                request.header(IF_UNMODIFIED_SINCE, formatted)
            }
            None => request,
        };

        match options.if_modified_since {
            Some(date) => {
                let formatted = date.format(DATE_FORMAT).to_string();
                request.header(IF_MODIFIED_SINCE, formatted)
            }
            None => request,
        }
    }
}
/// An asynchronous source of shared credentials.
#[async_trait]
pub trait CredentialProvider: std::fmt::Debug + Send + Sync {
/// The type of credential handed out by this provider.
type Credential;
/// Returns a reference-counted credential, or an error if none can be provided.
async fn get_credential(&self) -> Result<Arc<Self::Credential>>;
}
/// Holds a single, fixed credential behind an [`Arc`].
#[derive(Debug)]
pub struct StaticCredentialProvider<T> {
    credential: Arc<T>,
}

impl<T> StaticCredentialProvider<T> {
    /// Creates a provider that owns `credential` in a fresh [`Arc`].
    pub fn new(credential: T) -> Self {
        let credential = Arc::new(credential);
        Self { credential }
    }
}
#[async_trait]
impl<T> CredentialProvider for StaticCredentialProvider<T>
where
    T: std::fmt::Debug + Send + Sync,
{
    type Credential = T;

    /// Returns another handle to the stored credential; this never fails.
    async fn get_credential(&self) -> Result<Arc<T>> {
        let stored = &self.credential;
        Ok(Arc::clone(stored))
    }
}
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
mod cloud {
    use super::*;
    use crate::client::token::{TemporaryToken, TokenCache};
    use crate::RetryConfig;

    /// Fetches a temporary token using the supplied HTTP client and retry configuration.
    #[async_trait]
    pub trait TokenProvider: std::fmt::Debug + Send + Sync {
        /// The credential type carried by the fetched token.
        type Credential: std::fmt::Debug + Send + Sync;

        /// Fetches a new token.
        async fn fetch_token(
            &self,
            client: &Client,
            retry: &RetryConfig,
        ) -> Result<TemporaryToken<Arc<Self::Credential>>>;
    }

    /// A [`CredentialProvider`] that serves credentials out of a [`TokenCache`],
    /// calling the wrapped [`TokenProvider`] through the cache's `get_or_insert_with`.
    #[derive(Debug)]
    pub struct TokenCredentialProvider<T: TokenProvider> {
        inner: T,
        client: Client,
        retry: RetryConfig,
        cache: TokenCache<Arc<T::Credential>>,
    }

    impl<T: TokenProvider> TokenCredentialProvider<T> {
        /// Creates a provider around `inner` with a default-constructed cache.
        pub fn new(inner: T, client: Client, retry: RetryConfig) -> Self {
            let cache = TokenCache::default();
            Self {
                inner,
                client,
                retry,
                cache,
            }
        }

        /// Replaces the cache with the result of its `with_min_ttl(min_ttl)`.
        #[cfg(feature = "aws")]
        pub fn with_min_ttl(self, min_ttl: Duration) -> Self {
            Self {
                cache: self.cache.with_min_ttl(min_ttl),
                ..self
            }
        }
    }

    #[async_trait]
    impl<T: TokenProvider> CredentialProvider for TokenCredentialProvider<T> {
        type Credential = T::Credential;

        /// Returns the credential from the cache, handing it a closure that fetches
        /// a token from the inner provider.
        async fn get_credential(&self) -> Result<Arc<Self::Credential>> {
            let fetch = || self.inner.fetch_token(&self.client, &self.retry);
            self.cache.get_or_insert_with(fetch).await
        }
    }
}
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
pub use cloud::*;
#[cfg(test)]
mod tests {
    use super::*;

    /// Options set through their string keys must read back unchanged through
    /// `get_config_value`.
    #[test]
    fn client_test_config_from_map() {
        // (string key, expected enum key, value)
        let cases = [
            ("allow_http", ClientConfigKey::AllowHttp, "true"),
            (
                "allow_invalid_certificates",
                ClientConfigKey::AllowInvalidCertificates,
                "false",
            ),
            (
                "connect_timeout",
                ClientConfigKey::ConnectTimeout,
                "90 seconds",
            ),
            (
                "default_content_type",
                ClientConfigKey::DefaultContentType,
                "object_store:fake_default_content_type",
            ),
            ("http1_only", ClientConfigKey::Http1Only, "true"),
            ("http2_only", ClientConfigKey::Http2Only, "false"),
            (
                "http2_keep_alive_interval",
                ClientConfigKey::Http2KeepAliveInterval,
                "90 seconds",
            ),
            (
                "http2_keep_alive_timeout",
                ClientConfigKey::Http2KeepAliveTimeout,
                "91 seconds",
            ),
            (
                "http2_keep_alive_while_idle",
                ClientConfigKey::Http2KeepAliveWhileIdle,
                "92 seconds",
            ),
            (
                "pool_idle_timeout",
                ClientConfigKey::PoolIdleTimeout,
                "93 seconds",
            ),
            (
                "pool_max_idle_per_host",
                ClientConfigKey::PoolMaxIdlePerHost,
                "94",
            ),
            (
                "proxy_url",
                ClientConfigKey::ProxyUrl,
                "https://fake_proxy_url",
            ),
            ("timeout", ClientConfigKey::Timeout, "95 seconds"),
            (
                "user_agent",
                ClientConfigKey::UserAgent,
                "object_store:fake_user_agent",
            ),
        ];

        // Configure everything via the string form of each key.
        let mut builder = ClientOptions::new();
        for (name, _, value) in cases {
            builder = builder.with_config(name.parse().unwrap(), value);
        }

        // Read everything back via the enum form of each key.
        for (name, key, value) in cases {
            assert_eq!(
                builder.get_config_value(&key).as_deref(),
                Some(value),
                "unexpected value for key {}",
                name
            );
        }
    }
}