CommonLibrary/Telemetry/
EmitOTLPSpan.rs1#![allow(non_snake_case)]
2
3use std::{
10 collections::hash_map::DefaultHasher,
11 hash::{Hash, Hasher},
12 sync::{
13 OnceLock,
14 atomic::{AtomicBool, Ordering},
15 },
16 time::{SystemTime, UNIX_EPOCH},
17};
18
19use crate::Telemetry::{Client, IsAllowed};
20
21static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);
22static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
23
24fn NowNano() -> u64 {
25 SystemTime::now()
26 .duration_since(UNIX_EPOCH)
27 .map(|D| D.as_nanos() as u64)
28 .unwrap_or(0)
29}
30
31fn TraceId() -> &'static str {
32 OTLP_TRACE_ID.get_or_init(|| {
33 let mut H = DefaultHasher::new();
34 std::process::id().hash(&mut H);
35 NowNano().hash(&mut H);
36 format!("{:032x}", H.finish() as u128)
37 })
38}
39
40fn RandU64() -> u64 {
41 let mut H = DefaultHasher::new();
42 std::thread::current().id().hash(&mut H);
43 NowNano().hash(&mut H);
44 H.finish()
45}
46
47fn ParseEndpoint(Endpoint:&str) -> (String, String) {
48 let WithoutScheme = Endpoint
49 .strip_prefix("http://")
50 .or_else(|| Endpoint.strip_prefix("https://"))
51 .unwrap_or(Endpoint);
52 let (HostPort, Path) = match WithoutScheme.split_once('/') {
53 Some((HP, Rest)) => (HP.to_string(), format!("/{}", Rest.trim_start_matches('/'))),
54 None => (WithoutScheme.to_string(), "/v1/traces".to_string()),
55 };
56 let PathFinal = if Path == "/" { "/v1/traces".to_string() } else { Path };
57 (HostPort, PathFinal)
58}
59
60pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
63 if !IsAllowed::OTLP() {
64 return;
65 }
66 if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
67 return;
68 }
69
70 let Configuration = IsAllowed::Cached();
71 let TierStr = Client::TIER.get().map(|T| T.AsStr()).unwrap_or("common");
72
73 let SpanId = format!("{:016x}", RandU64());
74 let TraceIdString = TraceId().to_string();
75 let SpanName = Name.to_string();
76
77 let AttributesJson:Vec<String> = Attributes
78 .iter()
79 .map(|(K, V)| {
80 format!(
81 r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
82 K,
83 V.replace('\\', "\\\\").replace('"', "\\\"")
84 )
85 })
86 .collect();
87
88 let IsError = SpanName.contains("error");
89 let StatusCode = if IsError { 2 } else { 1 };
90 let ServiceName = format!("land-editor-{}", TierStr);
91 let Payload = format!(
92 concat!(
93 r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
94 r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}},"#,
95 r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}},"#,
96 r#"{{"key":"land.tier","value":{{"stringValue":"{}"}}}}"#,
97 r#"]}},"scopeSpans":[{{"scope":{{"name":"land.{}","version":"1.0.0"}},"#,
98 r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
99 r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
100 r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
101 ),
102 ServiceName,
103 TierStr,
104 TierStr,
105 TraceIdString,
106 SpanId,
107 SpanName,
108 StartNano,
109 EndNano,
110 AttributesJson.join(","),
111 StatusCode,
112 );
113
114 let (HostAddress, PathSegment) = ParseEndpoint(&Configuration.OTLPEndpoint);
115
116 std::thread::spawn(move || {
117 use std::{
118 io::{Read as IoRead, Write as IoWrite},
119 net::TcpStream,
120 time::Duration,
121 };
122
123 let Ok(SocketAddress) = HostAddress.parse() else {
124 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
125 return;
126 };
127 let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
128 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
129 return;
130 };
131 let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
132 let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
133
134 let HttpReq = format!(
135 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
136 close\r\n\r\n",
137 PathSegment,
138 HostAddress,
139 Payload.len()
140 );
141 if Stream.write_all(HttpReq.as_bytes()).is_err() {
142 return;
143 }
144 if Stream.write_all(Payload.as_bytes()).is_err() {
145 return;
146 }
147 let mut Buf = [0u8; 32];
148 let _ = Stream.read(&mut Buf);
149 if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
150 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
151 }
152 });
153}
154
155pub fn NowNanoPub() -> u64 { NowNano() }