use super::{PathInfo, PathInfoService};
use crate::proto;
use async_stream::try_stream;
use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
use bytes::Bytes;
use data_encoding::HEXLOWER;
use futures::stream::BoxStream;
use nix_compat::nixbase32;
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
use std::sync::Arc;
use tonic::async_trait;
use tracing::{instrument, trace};
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::Error;
/// Maximum size (in bytes) of a serialized PathInfo we are willing to write
/// into a single Bigtable cell; enforced in `put` before mutating the row.
const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
/// PathInfoService backed by a Google Bigtable table.
///
/// Each PathInfo is stored protobuf-encoded in a single cell, in a row keyed
/// by the hex-encoded store path digest (see `derive_pathinfo_key`).
#[derive(Clone)]
pub struct BigtablePathInfoService {
    // Name of this service instance; only used in tracing spans.
    instance_name: String,
    client: bigtable::BigTable,
    params: BigtableParameters,
    // In tests we keep the emulator tempdir and child process alive for as
    // long as the service exists; never read, hence the dead_code allow.
    #[cfg(test)]
    #[allow(dead_code)]
    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
}
impl BigtablePathInfoService {
    /// Connect to a real Bigtable instance described by `params`.
    ///
    /// `instance_name` is only used for tracing spans; the Bigtable instance
    /// to connect to comes from `params.instance_name`.
    #[cfg(not(test))]
    pub async fn connect(
        instance_name: String,
        params: BigtableParameters,
    ) -> Result<Self, bigtable::Error> {
        let connection = bigtable::BigTableConnection::new(
            &params.project_id,
            &params.instance_name,
            params.is_read_only,
            params.channel_size,
            params.timeout,
        )
        .await?;
        Ok(Self {
            instance_name,
            client: connection.client(),
            params,
        })
    }

    /// Test-only variant: spawns a local `cbtemulator` listening on a unix
    /// socket inside a fresh tempdir, provisions the table and column family
    /// via `cbt`, then connects to that emulator instead of a real Bigtable.
    ///
    /// Panics (via `expect`/`unwrap`) when the emulator tooling is missing or
    /// misbehaves — acceptable in test code.
    #[cfg(test)]
    pub async fn connect(
        instance_name: String,
        params: BigtableParameters,
    ) -> Result<Self, bigtable::Error> {
        use std::time::Duration;

        use async_process::{Command, Stdio};
        use tempfile::TempDir;
        use tokio_retry::{strategy::ExponentialBackoff, Retry};

        let tmpdir = TempDir::new().unwrap();
        let socket_path = tmpdir.path().join("cbtemulator.sock");

        let emulator_process = Command::new("cbtemulator")
            .arg("-address")
            .arg(socket_path.clone())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .kill_on_drop(true)
            .spawn()
            .expect("failed to spawn emulator");

        // Wait (with bounded exponential backoff) until the emulator has
        // created its listening socket before trying to talk to it.
        Retry::spawn(
            ExponentialBackoff::from_millis(20)
                .max_delay(Duration::from_secs(1))
                .take(3),
            || async {
                if socket_path.exists() {
                    Ok(())
                } else {
                    Err(())
                }
            },
        )
        .await
        .expect("failed to wait for socket");

        // Provision the table and column family the service expects.
        for cmd in &[
            vec!["createtable", &params.table_name],
            vec!["createfamily", &params.table_name, &params.family_name],
        ] {
            Command::new("cbt")
                .args({
                    let mut args = vec![
                        "-instance",
                        &params.instance_name,
                        "-project",
                        &params.project_id,
                    ];
                    args.extend_from_slice(cmd);
                    args
                })
                .env(
                    "BIGTABLE_EMULATOR_HOST",
                    format!("unix://{}", socket_path.to_string_lossy()),
                )
                .output()
                .await
                .expect("failed to run cbt setup command");
        }

        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
            &format!("unix://{}", socket_path.to_string_lossy()),
            &params.project_id,
            &params.instance_name,
            false,
            None,
        )?;

        Ok(Self {
            // `instance_name` is already an owned String; no need to clone it
            // via to_string() (matches the non-test connect above).
            instance_name,
            client: connection.client(),
            params,
            emulator: (tmpdir, emulator_process).into(),
        })
    }
}
/// Derives the Bigtable row key for a store path digest: the lower-hex
/// encoding of the 20-byte digest.
fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
    HEXLOWER.encode(digest.as_slice())
}
#[async_trait]
impl PathInfoService for BigtablePathInfoService {
    /// Fetches the PathInfo for the given store path digest, if present.
    ///
    /// Returns `Ok(None)` when no row exists, and a StorageError when
    /// Bigtable returns inconsistent data (wrong row key, extra cells, …).
    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
        let mut client = self.client.clone();
        // Row key is the hex digest; `put` stores the cell under a column
        // qualifier equal to the row key.
        let path_info_key = derive_pathinfo_key(&digest);
        let request = bigtable_v2::ReadRowsRequest {
            app_profile_id: self.params.app_profile_id.to_string(),
            table_name: client.get_full_table_name(&self.params.table_name),
            rows_limit: 1,
            rows: Some(bigtable_v2::RowSet {
                // Fetch exactly this one row key, no ranges.
                row_keys: vec![path_info_key.clone().into()],
                row_ranges: vec![],
            }),
            // Restrict to cells in our column family whose qualifier matches
            // the expected key (both filters chained = AND).
            filter: Some(bigtable_v2::RowFilter {
                filter: Some(bigtable_v2::row_filter::Filter::Chain(
                    bigtable_v2::row_filter::Chain {
                        filters: vec![
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
                                        self.params.family_name.to_string(),
                                    ),
                                ),
                            },
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                                        path_info_key.clone().into(),
                                    ),
                                ),
                            },
                        ],
                    },
                )),
            }),
            ..Default::default()
        };
        let mut response = client
            .read_rows(request)
            .await
            .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
        // We asked for one row key with rows_limit 1; anything other than 0
        // or 1 rows coming back indicates a storage-level inconsistency.
        if response.len() != 1 {
            if response.len() > 1 {
                return Err(Error::StorageError(
                    "got more than one row from bigtable".into(),
                ));
            }
            return Ok(None);
        }
        let (row_key, mut cells) = response.pop().unwrap();
        if row_key != path_info_key.as_bytes() {
            return Err(Error::StorageError(
                "got wrong row key from bigtable".into(),
            ));
        }
        // Exactly one cell is expected per row: pop one, then the rest must
        // be empty.
        let cell = cells
            .pop()
            .ok_or_else(|| Error::StorageError("found no cells".into()))?;
        if !cells.is_empty() {
            return Err(Error::StorageError(
                "more than one cell returned from bigtable".into(),
            ));
        }
        if path_info_key.as_bytes() != cell.qualifier {
            return Err(Error::StorageError("unexpected cell qualifier".into()));
        }
        // The cell value is the protobuf-encoded PathInfo.
        let path_info_proto = proto::PathInfo::decode(Bytes::from(cell.value))
            .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
        let path_info = PathInfo::try_from(path_info_proto)
            .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?;
        // Cross-check the digest inside the value against the requested one,
        // guarding against key/value mismatches in the store.
        if path_info.store_path.digest() != &digest {
            return Err(Error::StorageError("PathInfo has unexpected digest".into()));
        }
        Ok(Some(path_info))
    }

    /// Inserts the given PathInfo; a no-op when an entry for the same digest
    /// already exists.
    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
        let mut client = self.client.clone();
        let path_info_key = derive_pathinfo_key(path_info.store_path.digest());
        let data = proto::PathInfo::from(path_info.clone()).encode_to_vec();
        // Refuse payloads exceeding our per-cell size limit.
        if data.len() as u64 > CELL_SIZE_LIMIT {
            return Err(Error::StorageError(
                "PathInfo exceeds cell limit on Bigtable".into(),
            ));
        }
        // CheckAndMutateRow: if the predicate (column already present)
        // matches, apply no mutations; otherwise write the cell.
        let resp = client
            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
                table_name: client.get_full_table_name(&self.params.table_name),
                app_profile_id: self.params.app_profile_id.to_string(),
                row_key: path_info_key.clone().into(),
                predicate_filter: Some(bigtable_v2::RowFilter {
                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                        path_info_key.clone().into(),
                    )),
                }),
                // Entry already exists — nothing to do.
                true_mutations: vec![],
                false_mutations: vec![
                    bigtable_v2::Mutation {
                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
                            bigtable_v2::mutation::SetCell {
                                family_name: self.params.family_name.to_string(),
                                column_qualifier: path_info_key.clone().into(),
                                // timestamp_micros of -1 requests a
                                // server-assigned timestamp (Bigtable v2 API
                                // convention).
                                timestamp_micros: -1, value: data,
                            },
                        )),
                    },
                ],
            })
            .await
            .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?;
        if resp.predicate_matched {
            trace!("already existed")
        }
        Ok(path_info)
    }

    /// Streams all PathInfos stored in the table.
    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
        let mut client = self.client.clone();
        // Read all rows, restricted to the configured column family.
        let request = bigtable_v2::ReadRowsRequest {
            app_profile_id: self.params.app_profile_id.to_string(),
            table_name: client.get_full_table_name(&self.params.table_name),
            filter: Some(bigtable_v2::RowFilter {
                filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
                    self.params.family_name.to_string(),
                )),
            }),
            ..Default::default()
        };
        let stream = try_stream! {
            let response = client
                .read_rows(request)
                .await
                .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
            for (row_key, mut cells) in response {
                // Same single-cell invariants as in `get`: exactly one cell
                // per row, qualifier equal to the row key.
                let cell = cells
                    .pop()
                    .ok_or_else(|| Error::StorageError("found no cells".into()))?;
                if row_key != cell.qualifier {
                    Err(Error::StorageError("unexpected cell qualifier".into()))?;
                }
                if !cells.is_empty() {
                    Err(Error::StorageError(
                        "more than one cell returned from bigtable".into(),
                    ))?
                }
                let path_info_proto = proto::PathInfo::decode(Bytes::from(cell.value))
                    .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
                let path_info = PathInfo::try_from(path_info_proto).map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?;
                // Cross-check: row key must match the digest inside the value.
                let exp_path_info_key = derive_pathinfo_key(path_info.store_path.digest());
                if exp_path_info_key.as_bytes() != row_key.as_slice() {
                    Err(Error::StorageError("PathInfo has unexpected digest".into()))?
                }
                yield path_info
            }
        };
        Box::pin(stream)
    }
}
/// Connection/configuration parameters for the Bigtable-backed service,
/// deserializable from URL query parameters (see the `TryFrom<url::Url>`
/// impl below).
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct BigtableParameters {
    project_id: String,
    instance_name: String,
    #[serde(default)]
    is_read_only: bool,
    // Channel size for the connection; defaults via default_channel_size().
    #[serde(default = "default_channel_size")]
    channel_size: usize,
    // Request timeout, parsed from whole seconds; defaults via
    // default_timeout().
    #[serde_as(as = "Option<DurationSeconds<String>>")]
    #[serde(default = "default_timeout")]
    timeout: Option<std::time::Duration>,
    table_name: String,
    family_name: String,
    // App profile to route requests through; defaults via
    // default_app_profile_id().
    #[serde(default = "default_app_profile_id")]
    app_profile_id: String,
}
impl BigtableParameters {
    /// Returns the parameter set matching what the test `connect` provisions
    /// on the emulator (table "table-1", family "cf1").
    #[cfg(test)]
    pub fn default_for_tests() -> Self {
        Self {
            project_id: String::from("project-1"),
            instance_name: String::from("instance-1"),
            table_name: String::from("table-1"),
            family_name: String::from("cf1"),
            is_read_only: false,
            channel_size: default_channel_size(),
            timeout: default_timeout(),
            app_profile_id: default_app_profile_id(),
        }
    }
}
/// Serde default for `BigtableParameters::app_profile_id`.
fn default_app_profile_id() -> String {
    String::from("default")
}
/// Serde default for `BigtableParameters::channel_size`.
fn default_channel_size() -> usize {
    4
}
/// Serde default for `BigtableParameters::timeout`: four seconds.
fn default_timeout() -> Option<std::time::Duration> {
    std::time::Duration::from_secs(4).into()
}
#[async_trait]
impl ServiceBuilder for BigtableParameters {
    type Output = dyn PathInfoService;

    /// Builds a [BigtablePathInfoService] from this parameter set and wraps
    /// it behind the trait object the composition machinery expects.
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
        let service =
            BigtablePathInfoService::connect(instance_name.to_string(), self.clone()).await?;
        Ok(Arc::new(service))
    }
}
impl TryFrom<url::Url> for BigtableParameters {
type Error = Box<dyn std::error::Error + Send + Sync>;
fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
let instance_name = url
.host_str()
.ok_or_else(|| Error::StorageError("instance name missing".into()))?
.to_string();
url.query_pairs_mut()
.append_pair("instance_name", &instance_name);
let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
.map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
Ok(params)
}
}