Skip to main content

CommonLibrary/Telemetry/
EmitOTLPSpan.rs

1//! Fire-and-forget OTLP span exporter. Lifted from Mountain's
2//! `IPC/DevLog/EmitOTLPSpan` so Air / Echo / Rest / Grove / Mist /
3//! SideCar all share the same raw HTTP path. Single failed POST flips
4//! `OTLP_AVAILABLE` to false so a missing collector doesn't tax every
5//! emit. Release builds compile out via `cfg!(debug_assertions)`.
6
7use std::{
8	collections::hash_map::DefaultHasher,
9	hash::{Hash, Hasher},
10	sync::{
11		OnceLock,
12		atomic::{AtomicBool, Ordering},
13	},
14	time::{SystemTime, UNIX_EPOCH},
15};
16
17use crate::Telemetry::{Client, IsAllowed};
18
/// Process-wide export switch. Starts `true`; the emit path stores `false`
/// when the endpoint cannot be parsed or connected to, or when the collector
/// does not answer with a 2xx status line. Nothing in this file sets it back
/// to `true`, so once cleared every later emit returns early.
static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);

/// Trace id shared by every span this process emits. Filled lazily, exactly
/// once, by `TraceId()` on first use.
static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
22
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, so
/// callers never have to handle an error.
fn NowNano() -> u64 {
	match SystemTime::now().duration_since(UNIX_EPOCH) {
		// `as_nanos` is `u128`; the narrowing cast mirrors the exporter's
		// `u64` timestamp fields.
		Ok(SinceEpoch) => SinceEpoch.as_nanos() as u64,

		Err(_) => 0,
	}
}
29
30fn TraceId() -> &'static str {
31	OTLP_TRACE_ID.get_or_init(|| {
32		let mut H = DefaultHasher::new();
33
34		std::process::id().hash(&mut H);
35
36		NowNano().hash(&mut H);
37
38		format!("{:032x}", H.finish() as u128)
39	})
40}
41
42fn RandU64() -> u64 {
43	let mut H = DefaultHasher::new();
44
45	std::thread::current().id().hash(&mut H);
46
47	NowNano().hash(&mut H);
48
49	H.finish()
50}
51
/// Split a collector endpoint into `(host:port, request path)`.
///
/// A leading `http://` or `https://` is dropped. Everything up to the first
/// `/` is the authority; the remainder, with redundant leading slashes
/// collapsed to one, is the path. A missing or empty path falls back to
/// `/v1/traces`.
fn ParseEndpoint(Endpoint:&str) -> (String, String) {
	const DEFAULT_PATH:&str = "/v1/traces";

	let Bare = ["http://", "https://"]
		.iter()
		.find_map(|Scheme| Endpoint.strip_prefix(*Scheme))
		.unwrap_or(Endpoint);

	match Bare.split_once('/') {
		None => (Bare.to_owned(), DEFAULT_PATH.to_owned()),

		Some((Authority, Tail)) => {
			let Trimmed = Tail.trim_start_matches('/');

			let Route = if Trimmed.is_empty() {
				DEFAULT_PATH.to_owned()
			} else {
				format!("/{}", Trimmed)
			};

			(Authority.to_owned(), Route)
		},
	}
}
68
69/// Emit one span. `StartNano` / `EndNano` are wall-clock (not monotonic)
70/// nanosecond timestamps - use `NowNano()` from the caller's start.
71pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
72	if !IsAllowed::OTLP() {
73		return;
74	}
75
76	if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
77		return;
78	}
79
80	let Configuration = IsAllowed::Cached();
81
82	let TierStr = Client::TIER.get().map(|T| T.AsStr()).unwrap_or("common");
83
84	let SpanId = format!("{:016x}", RandU64());
85
86	let TraceIdString = TraceId().to_string();
87
88	let SpanName = Name.to_string();
89
90	let AttributesJson:Vec<String> = Attributes
91		.iter()
92		.map(|(K, V)| {
93			format!(
94				r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
95				K,
96				V.replace('\\', "\\\\").replace('"', "\\\"")
97			)
98		})
99		.collect();
100
101	let IsError = SpanName.contains("error");
102
103	let StatusCode = if IsError { 2 } else { 1 };
104
105	let ServiceName = format!("land-editor-{}", TierStr);
106
107	let Payload = format!(
108		concat!(
109			r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
110			r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}},"#,
111			r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}},"#,
112			r#"{{"key":"land.tier","value":{{"stringValue":"{}"}}}}"#,
113			r#"]}},"scopeSpans":[{{"scope":{{"name":"land.{}","version":"1.0.0"}},"#,
114			r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
115			r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
116			r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
117		),
118		ServiceName,
119		TierStr,
120		TierStr,
121		TraceIdString,
122		SpanId,
123		SpanName,
124		StartNano,
125		EndNano,
126		AttributesJson.join(","),
127		StatusCode,
128	);
129
130	let (HostAddress, PathSegment) = ParseEndpoint(&Configuration.Pipe);
131
132	std::thread::spawn(move || {
133		use std::{
134			io::{Read as IoRead, Write as IoWrite},
135			net::TcpStream,
136			time::Duration,
137		};
138
139		let Ok(SocketAddress) = HostAddress.parse() else {
140			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
141
142			return;
143		};
144
145		let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
146			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
147
148			return;
149		};
150
151		let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
152
153		let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
154
155		let HttpReq = format!(
156			"POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
157			 close\r\n\r\n",
158			PathSegment,
159			HostAddress,
160			Payload.len()
161		);
162
163		if Stream.write_all(HttpReq.as_bytes()).is_err() {
164			return;
165		}
166
167		if Stream.write_all(Payload.as_bytes()).is_err() {
168			return;
169		}
170
171		let mut Buf = [0u8; 32];
172
173		let _ = Stream.read(&mut Buf);
174
175		if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
176			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
177		}
178	});
179}
180
181/// Helper exposed to callers that need a span window timestamp.
182pub fn NowNanoPub() -> u64 { NowNano() }