opentelemetry_otlp/exporter/http/
logs.rs
use super::OtlpHttpClient;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
impl LogExporter for OtlpHttpClient {
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
async move {
let client = self
.client
.lock()
.map_err(|e| OTelSdkError::InternalFailure(format!("Mutex lock failed: {}", e)))?
.clone()
.ok_or(OTelSdkError::AlreadyShutdown)?;
let (body, content_type) = self
.build_logs_export_body(batch)
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body.into())
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
}
let request_uri = request.uri().to_string();
otel_debug!(name: "HttpLogsClient.CallingExport");
let response = client
.send_bytes(request)
.await
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
if !response.status().is_success() {
let error = format!(
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
request_uri,
response.status().as_u16(),
response.body()
);
return Err(OTelSdkError::InternalFailure(error));
}
Ok(())
}
}
fn shutdown(&mut self) -> OTelSdkResult {
let mut client_guard = self.client.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
})?;
if client_guard.take().is_none() {
return Err(OTelSdkError::AlreadyShutdown);
}
Ok(())
}
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}