opentelemetry_otlp/exporter/http/
logs.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use super::OtlpHttpClient;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};

/// OTLP/HTTP implementation of the SDK's [`LogExporter`] trait.
impl LogExporter for OtlpHttpClient {
    /// Sends one batch of log records to the collector endpoint with an HTTP `POST`.
    ///
    /// # Errors
    ///
    /// - [`OTelSdkError::AlreadyShutdown`] when no client is stored any more.
    /// - [`OTelSdkError::InternalFailure`] when locking the client fails, the body or the
    ///   request cannot be built, sending fails, or the response status is not a success.
    // The future is spelled out as `impl Future + Send` rather than `async fn`, hence the allow.
    #[allow(clippy::manual_async_fn)]
    fn export(
        &self,
        batch: LogBatch<'_>,
    ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
        async move {
            // The lock guard is a temporary of this statement, so it is released before the
            // `.await` further down; only the cloned handle is carried forward.
            let maybe_client = self
                .client
                .lock()
                .map_err(|poison| {
                    OTelSdkError::InternalFailure(format!("Mutex lock failed: {}", poison))
                })?
                .clone();
            let http_client = maybe_client.ok_or(OTelSdkError::AlreadyShutdown)?;

            let (payload, payload_type) = self
                .build_logs_export_body(batch)
                .map_err(|err| OTelSdkError::InternalFailure(err.to_string()))?;

            let mut outgoing = http::Request::builder()
                .method(Method::POST)
                .uri(&self.collector_endpoint)
                .header(CONTENT_TYPE, payload_type)
                .body(payload.into())
                .map_err(|err| OTelSdkError::InternalFailure(err.to_string()))?;

            // Captured up front: the request is moved into `send_bytes`, but its URI is
            // still needed for the failure message.
            let target = outgoing.uri().to_string();

            // Copy every configured header onto the request.
            let header_map = outgoing.headers_mut();
            for (name, value) in &self.headers {
                header_map.insert(name.clone(), value.clone());
            }

            otel_debug!(name: "HttpLogsClient.CallingExport");
            let reply = http_client
                .send_bytes(outgoing)
                .await
                .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;

            let status = reply.status();
            if status.is_success() {
                return Ok(());
            }

            // Any non-success status is reported as an internal failure with full context.
            let message = format!(
                "OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
                target,
                status.as_u16(),
                reply.body()
            );
            Err(OTelSdkError::InternalFailure(message))
        }
    }

    /// Removes the stored client so that later exports report `AlreadyShutdown`.
    ///
    /// # Errors
    ///
    /// - [`OTelSdkError::InternalFailure`] when the client lock cannot be acquired.
    /// - [`OTelSdkError::AlreadyShutdown`] when the client had already been removed.
    fn shutdown(&mut self) -> OTelSdkResult {
        let mut slot = self.client.lock().map_err(|poison| {
            OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", poison))
        })?;

        // `take` leaves `None` behind; finding `None` already there means a repeated shutdown.
        match slot.take() {
            Some(_client) => Ok(()),
            None => Err(OTelSdkError::AlreadyShutdown),
        }
    }

    /// Stores the given resource on the exporter, converted via `Into` to the field's type.
    fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
        let converted = resource.into();
        self.resource = converted;
    }
}