CommonLibrary/Telemetry/
EmitOTLPSpan.rs1use std::{
8 collections::hash_map::DefaultHasher,
9 hash::{Hash, Hasher},
10 sync::{
11 OnceLock,
12 atomic::{AtomicBool, Ordering},
13 },
14 time::{SystemTime, UNIX_EPOCH},
15};
16
17use crate::Telemetry::{Client, IsAllowed};
18
19static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);
20
21static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
22
23fn NowNano() -> u64 {
24 SystemTime::now()
25 .duration_since(UNIX_EPOCH)
26 .map(|D| D.as_nanos() as u64)
27 .unwrap_or(0)
28}
29
30fn TraceId() -> &'static str {
31 OTLP_TRACE_ID.get_or_init(|| {
32 let mut H = DefaultHasher::new();
33
34 std::process::id().hash(&mut H);
35
36 NowNano().hash(&mut H);
37
38 format!("{:032x}", H.finish() as u128)
39 })
40}
41
42fn RandU64() -> u64 {
43 let mut H = DefaultHasher::new();
44
45 std::thread::current().id().hash(&mut H);
46
47 NowNano().hash(&mut H);
48
49 H.finish()
50}
51
52fn ParseEndpoint(Endpoint:&str) -> (String, String) {
53 let WithoutScheme = Endpoint
54 .strip_prefix("http://")
55 .or_else(|| Endpoint.strip_prefix("https://"))
56 .unwrap_or(Endpoint);
57
58 let (HostPort, Path) = match WithoutScheme.split_once('/') {
59 Some((HP, Rest)) => (HP.to_string(), format!("/{}", Rest.trim_start_matches('/'))),
60
61 None => (WithoutScheme.to_string(), "/v1/traces".to_string()),
62 };
63
64 let PathFinal = if Path == "/" { "/v1/traces".to_string() } else { Path };
65
66 (HostPort, PathFinal)
67}
68
69pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
72 if !IsAllowed::OTLP() {
73 return;
74 }
75
76 if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
77 return;
78 }
79
80 let Configuration = IsAllowed::Cached();
81
82 let TierStr = Client::TIER.get().map(|T| T.AsStr()).unwrap_or("common");
83
84 let SpanId = format!("{:016x}", RandU64());
85
86 let TraceIdString = TraceId().to_string();
87
88 let SpanName = Name.to_string();
89
90 let AttributesJson:Vec<String> = Attributes
91 .iter()
92 .map(|(K, V)| {
93 format!(
94 r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
95 K,
96 V.replace('\\', "\\\\").replace('"', "\\\"")
97 )
98 })
99 .collect();
100
101 let IsError = SpanName.contains("error");
102
103 let StatusCode = if IsError { 2 } else { 1 };
104
105 let ServiceName = format!("land-editor-{}", TierStr);
106
107 let Payload = format!(
108 concat!(
109 r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
110 r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}},"#,
111 r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}},"#,
112 r#"{{"key":"land.tier","value":{{"stringValue":"{}"}}}}"#,
113 r#"]}},"scopeSpans":[{{"scope":{{"name":"land.{}","version":"1.0.0"}},"#,
114 r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
115 r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
116 r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
117 ),
118 ServiceName,
119 TierStr,
120 TierStr,
121 TraceIdString,
122 SpanId,
123 SpanName,
124 StartNano,
125 EndNano,
126 AttributesJson.join(","),
127 StatusCode,
128 );
129
130 let (HostAddress, PathSegment) = ParseEndpoint(&Configuration.Pipe);
131
132 std::thread::spawn(move || {
133 use std::{
134 io::{Read as IoRead, Write as IoWrite},
135 net::TcpStream,
136 time::Duration,
137 };
138
139 let Ok(SocketAddress) = HostAddress.parse() else {
140 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
141
142 return;
143 };
144
145 let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
146 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
147
148 return;
149 };
150
151 let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
152
153 let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
154
155 let HttpReq = format!(
156 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
157 close\r\n\r\n",
158 PathSegment,
159 HostAddress,
160 Payload.len()
161 );
162
163 if Stream.write_all(HttpReq.as_bytes()).is_err() {
164 return;
165 }
166
167 if Stream.write_all(Payload.as_bytes()).is_err() {
168 return;
169 }
170
171 let mut Buf = [0u8; 32];
172
173 let _ = Stream.read(&mut Buf);
174
175 if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
176 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
177 }
178 });
179}
180
181pub fn NowNanoPub() -> u64 { NowNano() }