use super::client::GoogleCloudStorageClient;
use crate::client::retry::RetryExt;
use crate::client::token::TemporaryToken;
use crate::client::TokenProvider;
use crate::gcp::{GcpSigningCredentialProvider, STORE};
use crate::util::{hex_digest, hex_encode, STRICT_ENCODE_SET};
use crate::{RetryConfig, StaticCredentialProvider};
use async_trait::async_trait;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use base64::Engine;
use chrono::{DateTime, Utc};
use futures::TryFutureExt;
use hyper::HeaderMap;
use itertools::Itertools;
use percent_encoding::utf8_percent_encode;
use reqwest::{Client, Method};
use ring::signature::RsaKeyPair;
use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use std::collections::BTreeMap;
use std::env;
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::info;
use url::Url;
pub const DEFAULT_SCOPE: &str = "https://www.googleapis.com/auth/cloud-platform";
pub const DEFAULT_GCS_BASE_URL: &str = "https://storage.googleapis.com";
const DEFAULT_GCS_PLAYLOAD_STRING: &str = "UNSIGNED-PAYLOAD";
const DEFAULT_GCS_SIGN_BLOB_HOST: &str = "storage.googleapis.com";
const DEFAULT_METADATA_HOST: &str = "metadata.google.internal";
const DEFAULT_METADATA_IP: &str = "169.254.169.254";
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Unable to open service account file from {}: {}", path.display(), source))]
OpenCredentials {
source: std::io::Error,
path: PathBuf,
},
#[snafu(display("Unable to decode service account file: {}", source))]
DecodeCredentials { source: serde_json::Error },
#[snafu(display("No RSA key found in pem file"))]
MissingKey,
#[snafu(display("Invalid RSA key: {}", source), context(false))]
InvalidKey { source: ring::error::KeyRejected },
#[snafu(display("Error signing: {}", source))]
Sign { source: ring::error::Unspecified },
#[snafu(display("Error encoding jwt payload: {}", source))]
Encode { source: serde_json::Error },
#[snafu(display("Unsupported key encoding: {}", encoding))]
UnsupportedKey { encoding: String },
#[snafu(display("Error performing token request: {}", source))]
TokenRequest { source: crate::client::retry::Error },
#[snafu(display("Error getting token response body: {}", source))]
TokenResponseBody { source: reqwest::Error },
}
impl From<Error> for crate::Error {
fn from(value: Error) -> Self {
Self::Generic {
store: STORE,
source: Box::new(value),
}
}
}
#[derive(Debug)]
pub struct GcpSigningCredential {
pub email: String,
pub private_key: Option<ServiceAccountKey>,
}
#[derive(Debug)]
pub struct ServiceAccountKey(RsaKeyPair);
impl ServiceAccountKey {
pub fn from_pem(encoded: &[u8]) -> Result<Self> {
use rustls_pemfile::Item;
use std::io::Cursor;
let mut cursor = Cursor::new(encoded);
let mut reader = BufReader::new(&mut cursor);
match rustls_pemfile::read_one(&mut reader).unwrap() {
Some(Item::Pkcs8Key(key)) => Self::from_pkcs8(key.secret_pkcs8_der()),
Some(Item::Pkcs1Key(key)) => Self::from_der(key.secret_pkcs1_der()),
_ => Err(Error::MissingKey),
}
}
pub fn from_pkcs8(key: &[u8]) -> Result<Self> {
Ok(Self(RsaKeyPair::from_pkcs8(key)?))
}
pub fn from_der(key: &[u8]) -> Result<Self> {
Ok(Self(RsaKeyPair::from_der(key)?))
}
fn sign(&self, string_to_sign: &str) -> Result<String> {
let mut signature = vec![0; self.0.public().modulus_len()];
self.0
.sign(
&ring::signature::RSA_PKCS1_SHA256,
&ring::rand::SystemRandom::new(),
string_to_sign.as_bytes(),
&mut signature,
)
.context(SignSnafu)?;
Ok(hex_encode(&signature))
}
}
#[derive(Debug, Eq, PartialEq)]
pub struct GcpCredential {
pub bearer: String,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Default, serde::Serialize)]
pub struct JwtHeader<'a> {
#[serde(skip_serializing_if = "Option::is_none")]
pub typ: Option<&'a str>,
pub alg: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
pub cty: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
pub jku: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
pub kid: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
pub x5u: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
pub x5t: Option<&'a str>,
}
#[derive(serde::Serialize)]
struct TokenClaims<'a> {
iss: &'a str,
sub: &'a str,
scope: &'a str,
exp: u64,
iat: u64,
}
#[derive(serde::Deserialize, Debug)]
struct TokenResponse {
access_token: String,
expires_in: u64,
}
#[derive(Debug)]
pub struct SelfSignedJwt {
issuer: String,
scope: String,
private_key: ServiceAccountKey,
key_id: String,
}
impl SelfSignedJwt {
pub fn new(
key_id: String,
issuer: String,
private_key: ServiceAccountKey,
scope: String,
) -> Result<Self> {
Ok(Self {
issuer,
scope,
private_key,
key_id,
})
}
}
#[async_trait]
impl TokenProvider for SelfSignedJwt {
type Credential = GcpCredential;
async fn fetch_token(
&self,
_client: &Client,
_retry: &RetryConfig,
) -> crate::Result<TemporaryToken<Arc<GcpCredential>>> {
let now = seconds_since_epoch();
let exp = now + 3600;
let claims = TokenClaims {
iss: &self.issuer,
sub: &self.issuer,
scope: &self.scope,
iat: now,
exp,
};
let jwt_header = b64_encode_obj(&JwtHeader {
alg: "RS256",
typ: Some("JWT"),
kid: Some(&self.key_id),
..Default::default()
})?;
let claim_str = b64_encode_obj(&claims)?;
let message = [jwt_header.as_ref(), claim_str.as_ref()].join(".");
let mut sig_bytes = vec![0; self.private_key.0.public().modulus_len()];
self.private_key
.0
.sign(
&ring::signature::RSA_PKCS1_SHA256,
&ring::rand::SystemRandom::new(),
message.as_bytes(),
&mut sig_bytes,
)
.context(SignSnafu)?;
let signature = BASE64_URL_SAFE_NO_PAD.encode(sig_bytes);
let bearer = [message, signature].join(".");
Ok(TemporaryToken {
token: Arc::new(GcpCredential { bearer }),
expiry: Some(Instant::now() + Duration::from_secs(3600)),
})
}
}
fn read_credentials_file<T>(service_account_path: impl AsRef<std::path::Path>) -> Result<T>
where
T: serde::de::DeserializeOwned,
{
let file = File::open(&service_account_path).context(OpenCredentialsSnafu {
path: service_account_path.as_ref().to_owned(),
})?;
let reader = BufReader::new(file);
serde_json::from_reader(reader).context(DecodeCredentialsSnafu)
}
#[derive(serde::Deserialize, Debug, Clone)]
pub struct ServiceAccountCredentials {
pub private_key: String,
pub private_key_id: String,
pub client_email: String,
#[serde(default)]
pub gcs_base_url: Option<String>,
#[serde(default)]
pub disable_oauth: bool,
}
impl ServiceAccountCredentials {
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
read_credentials_file(path)
}
pub fn from_key(key: &str) -> Result<Self> {
serde_json::from_str(key).context(DecodeCredentialsSnafu)
}
pub fn token_provider(self) -> crate::Result<SelfSignedJwt> {
Ok(SelfSignedJwt::new(
self.private_key_id,
self.client_email,
ServiceAccountKey::from_pem(self.private_key.as_bytes())?,
DEFAULT_SCOPE.to_string(),
)?)
}
pub fn signing_credentials(self) -> crate::Result<GcpSigningCredentialProvider> {
Ok(Arc::new(StaticCredentialProvider::new(
GcpSigningCredential {
email: self.client_email,
private_key: Some(ServiceAccountKey::from_pem(self.private_key.as_bytes())?),
},
)))
}
}
fn seconds_since_epoch() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
}
fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
let string = serde_json::to_string(obj).context(EncodeSnafu)?;
Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
}
#[derive(Debug, Default)]
pub struct InstanceCredentialProvider {}
async fn make_metadata_request(
client: &Client,
hostname: &str,
retry: &RetryConfig,
) -> crate::Result<TokenResponse> {
let url =
format!("http://{hostname}/computeMetadata/v1/instance/service-accounts/default/token");
let response: TokenResponse = client
.request(Method::GET, url)
.header("Metadata-Flavor", "Google")
.query(&[("audience", "https://www.googleapis.com/oauth2/v4/token")])
.send_retry(retry)
.await
.context(TokenRequestSnafu)?
.json()
.await
.context(TokenResponseBodySnafu)?;
Ok(response)
}
#[async_trait]
impl TokenProvider for InstanceCredentialProvider {
type Credential = GcpCredential;
async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
) -> crate::Result<TemporaryToken<Arc<GcpCredential>>> {
let metadata_host = if let Ok(host) = env::var("GCE_METADATA_HOST") {
host
} else if let Ok(host) = env::var("GCE_METADATA_ROOT") {
host
} else {
DEFAULT_METADATA_HOST.to_string()
};
let metadata_ip = if let Ok(ip) = env::var("GCE_METADATA_IP") {
ip
} else {
DEFAULT_METADATA_IP.to_string()
};
info!("fetching token from metadata server");
let response = make_metadata_request(client, &metadata_host, retry)
.or_else(|_| make_metadata_request(client, &metadata_ip, retry))
.await?;
let token = TemporaryToken {
token: Arc::new(GcpCredential {
bearer: response.access_token,
}),
expiry: Some(Instant::now() + Duration::from_secs(response.expires_in)),
};
Ok(token)
}
}
async fn make_metadata_request_for_email(
client: &Client,
hostname: &str,
retry: &RetryConfig,
) -> crate::Result<String> {
let url =
format!("http://{hostname}/computeMetadata/v1/instance/service-accounts/default/email",);
let response = client
.request(Method::GET, url)
.header("Metadata-Flavor", "Google")
.send_retry(retry)
.await
.context(TokenRequestSnafu)?
.text()
.await
.context(TokenResponseBodySnafu)?;
Ok(response)
}
#[derive(Debug, Default)]
pub struct InstanceSigningCredentialProvider {}
#[async_trait]
impl TokenProvider for InstanceSigningCredentialProvider {
type Credential = GcpSigningCredential;
async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
) -> crate::Result<TemporaryToken<Arc<GcpSigningCredential>>> {
let metadata_host = if let Ok(host) = env::var("GCE_METADATA_HOST") {
host
} else if let Ok(host) = env::var("GCE_METADATA_ROOT") {
host
} else {
DEFAULT_METADATA_HOST.to_string()
};
let metadata_ip = if let Ok(ip) = env::var("GCE_METADATA_IP") {
ip
} else {
DEFAULT_METADATA_IP.to_string()
};
info!("fetching token from metadata server");
let email = make_metadata_request_for_email(client, &metadata_host, retry)
.or_else(|_| make_metadata_request_for_email(client, &metadata_ip, retry))
.await?;
let token = TemporaryToken {
token: Arc::new(GcpSigningCredential {
email,
private_key: None,
}),
expiry: None,
};
Ok(token)
}
}
#[derive(serde::Deserialize, Clone)]
#[serde(tag = "type")]
pub enum ApplicationDefaultCredentials {
#[serde(rename = "service_account")]
ServiceAccount(ServiceAccountCredentials),
#[serde(rename = "authorized_user")]
AuthorizedUser(AuthorizedUserCredentials),
}
impl ApplicationDefaultCredentials {
const CREDENTIALS_PATH: &'static str = if cfg!(windows) {
"gcloud/application_default_credentials.json"
} else {
".config/gcloud/application_default_credentials.json"
};
pub fn read(path: Option<&str>) -> Result<Option<Self>, Error> {
if let Some(path) = path {
return read_credentials_file::<Self>(path).map(Some);
}
let home_var = if cfg!(windows) { "APPDATA" } else { "HOME" };
if let Some(home) = env::var_os(home_var) {
let path = Path::new(&home).join(Self::CREDENTIALS_PATH);
if path.exists() {
return read_credentials_file::<Self>(path).map(Some);
}
}
Ok(None)
}
}
const DEFAULT_TOKEN_GCP_URI: &str = "https://accounts.google.com/o/oauth2/token";
#[derive(Debug, Deserialize, Clone)]
pub struct AuthorizedUserCredentials {
client_id: String,
client_secret: String,
refresh_token: String,
}
#[derive(Debug, Deserialize)]
pub struct AuthorizedUserSigningCredentials {
credential: AuthorizedUserCredentials,
}
#[derive(Debug, Deserialize)]
struct EmailResponse {
email: String,
}
impl AuthorizedUserSigningCredentials {
pub fn from(credential: AuthorizedUserCredentials) -> crate::Result<Self> {
Ok(Self { credential })
}
async fn client_email(&self, client: &Client, retry: &RetryConfig) -> crate::Result<String> {
let response = client
.request(Method::GET, "https://oauth2.googleapis.com/tokeninfo")
.query(&[("access_token", &self.credential.refresh_token)])
.send_retry(retry)
.await
.context(TokenRequestSnafu)?
.json::<EmailResponse>()
.await
.context(TokenResponseBodySnafu)?;
Ok(response.email)
}
}
#[async_trait]
impl TokenProvider for AuthorizedUserSigningCredentials {
type Credential = GcpSigningCredential;
async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
) -> crate::Result<TemporaryToken<Arc<GcpSigningCredential>>> {
let email = self.client_email(client, retry).await?;
Ok(TemporaryToken {
token: Arc::new(GcpSigningCredential {
email,
private_key: None,
}),
expiry: None,
})
}
}
#[async_trait]
impl TokenProvider for AuthorizedUserCredentials {
type Credential = GcpCredential;
async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
) -> crate::Result<TemporaryToken<Arc<GcpCredential>>> {
let response = client
.request(Method::POST, DEFAULT_TOKEN_GCP_URI)
.form(&[
("grant_type", "refresh_token"),
("client_id", &self.client_id),
("client_secret", &self.client_secret),
("refresh_token", &self.refresh_token),
])
.retryable(retry)
.idempotent(true)
.send()
.await
.context(TokenRequestSnafu)?
.json::<TokenResponse>()
.await
.context(TokenResponseBodySnafu)?;
Ok(TemporaryToken {
token: Arc::new(GcpCredential {
bearer: response.access_token,
}),
expiry: Some(Instant::now() + Duration::from_secs(response.expires_in)),
})
}
}
fn trim_header_value(value: &str) -> String {
let mut ret = value.to_string();
ret.retain(|c| !c.is_whitespace());
ret
}
#[derive(Debug)]
pub struct GCSAuthorizer {
date: Option<DateTime<Utc>>,
credential: Arc<GcpSigningCredential>,
}
impl GCSAuthorizer {
pub fn new(credential: Arc<GcpSigningCredential>) -> Self {
Self {
date: None,
credential,
}
}
pub(crate) async fn sign(
&self,
method: Method,
url: &mut Url,
expires_in: Duration,
client: &GoogleCloudStorageClient,
) -> crate::Result<()> {
let email = &self.credential.email;
let date = self.date.unwrap_or_else(Utc::now);
let scope = self.scope(date);
let credential_with_scope = format!("{}/{}", email, scope);
let mut headers = HeaderMap::new();
headers.insert("host", DEFAULT_GCS_SIGN_BLOB_HOST.parse().unwrap());
let (_, signed_headers) = Self::canonicalize_headers(&headers);
url.query_pairs_mut()
.append_pair("X-Goog-Algorithm", "GOOG4-RSA-SHA256")
.append_pair("X-Goog-Credential", &credential_with_scope)
.append_pair("X-Goog-Date", &date.format("%Y%m%dT%H%M%SZ").to_string())
.append_pair("X-Goog-Expires", &expires_in.as_secs().to_string())
.append_pair("X-Goog-SignedHeaders", &signed_headers);
let string_to_sign = self.string_to_sign(date, &method, url, &headers);
let signature = match &self.credential.private_key {
Some(key) => key.sign(&string_to_sign)?,
None => client.sign_blob(&string_to_sign, email).await?,
};
url.query_pairs_mut()
.append_pair("X-Goog-Signature", &signature);
Ok(())
}
fn scope(&self, date: DateTime<Utc>) -> String {
format!("{}/auto/storage/goog4_request", date.format("%Y%m%d"),)
}
fn canonicalize_request(url: &Url, method: &Method, headers: &HeaderMap) -> String {
let verb = method.as_str();
let path = url.path();
let query = Self::canonicalize_query(url);
let (canonical_headers, signed_headers) = Self::canonicalize_headers(headers);
format!(
"{}\n{}\n{}\n{}\n\n{}\n{}",
verb, path, query, canonical_headers, signed_headers, DEFAULT_GCS_PLAYLOAD_STRING
)
}
fn canonicalize_query(url: &Url) -> String {
url.query_pairs()
.sorted_unstable_by(|a, b| a.0.cmp(&b.0))
.map(|(k, v)| {
format!(
"{}={}",
utf8_percent_encode(k.as_ref(), &STRICT_ENCODE_SET),
utf8_percent_encode(v.as_ref(), &STRICT_ENCODE_SET)
)
})
.join("&")
}
fn canonicalize_headers(header_map: &HeaderMap) -> (String, String) {
let mut headers = BTreeMap::<String, Vec<&str>>::new();
for (k, v) in header_map {
headers
.entry(k.as_str().to_lowercase())
.or_default()
.push(std::str::from_utf8(v.as_bytes()).unwrap());
}
let canonicalize_headers = headers
.iter()
.map(|(k, v)| {
format!(
"{}:{}",
k.trim(),
v.iter().map(|v| trim_header_value(v)).join(",")
)
})
.join("\n");
let signed_headers = headers.keys().join(";");
(canonicalize_headers, signed_headers)
}
pub fn string_to_sign(
&self,
date: DateTime<Utc>,
request_method: &Method,
url: &Url,
headers: &HeaderMap,
) -> String {
let canonical_request = Self::canonicalize_request(url, request_method, headers);
let hashed_canonical_req = hex_digest(canonical_request.as_bytes());
let scope = self.scope(date);
format!(
"{}\n{}\n{}\n{}",
"GOOG4-RSA-SHA256",
date.format("%Y%m%dT%H%M%SZ"),
scope,
hashed_canonical_req
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_canonicalize_headers() {
let mut input_header = HeaderMap::new();
input_header.insert("content-type", "text/plain".parse().unwrap());
input_header.insert("host", "storage.googleapis.com".parse().unwrap());
input_header.insert("x-goog-meta-reviewer", "jane".parse().unwrap());
input_header.append("x-goog-meta-reviewer", "john".parse().unwrap());
assert_eq!(
GCSAuthorizer::canonicalize_headers(&input_header),
(
"content-type:text/plain
host:storage.googleapis.com
x-goog-meta-reviewer:jane,john"
.into(),
"content-type;host;x-goog-meta-reviewer".to_string()
)
);
}
#[test]
fn test_canonicalize_query() {
let mut url = Url::parse("https://storage.googleapis.com/bucket/object").unwrap();
url.query_pairs_mut()
.append_pair("max-keys", "2")
.append_pair("prefix", "object");
assert_eq!(
GCSAuthorizer::canonicalize_query(&url),
"max-keys=2&prefix=object".to_string()
);
}
}