Skip to main content

CommonLibrary/Transport/
TransportStrategy.rs

//! # TransportStrategy Trait
//!
//! Defines the core trait that every transport implementation must provide.
//! The trait offers a unified, transport-agnostic interface for sending
//! requests and notifications, with optional event streaming.
//!
//! All transports must be async and thread-safe (`Send + Sync`).
8
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11
12use super::{
13	Common::TransportType,
14	TransportConfig::TransportConfig,
15	TransportError::TransportError,
16	UnifiedRequest::UnifiedRequest,
17	UnifiedResponse::UnifiedResponse,
18};
19
20/// Core transport strategy trait.
21///
22/// This trait defines the essential operations that any transport mechanism
23/// must provide. Components interact with transports through this trait,
24/// allowing them to be transport-agnostic.
25#[async_trait]
26pub trait TransportStrategy: Send + Sync {
27	/// Establishes a connection to the transport endpoint.
28	async fn Connect(&mut self) -> Result<(), TransportError>;
29
30	/// Closes the connection and releases any associated resources.
31	async fn Disconnect(&mut self) -> Result<(), TransportError>;
32
33	/// Sends a request and waits for a response.
34	async fn SendRequest(&mut self, Request:UnifiedRequest) -> Result<UnifiedResponse, TransportError>;
35
36	/// Sends a notification (fire-and-forget message).
37	async fn SendNotification(&mut self, Notification:UnifiedRequest) -> Result<(), TransportError>;
38
39	/// Creates a stream of events from the transport.
40	fn StreamEvents(&self)
41	-> std::result::Result<futures::stream::BoxStream<'static, UnifiedResponse>, TransportError>;
42
43	/// Checks if the transport is currently connected.
44	fn IsConnected(&self) -> bool;
45
46	/// Returns the estimated round-trip latency in milliseconds.
47	fn LatencyMilliseconds(&self) -> u64;
48
49	/// Returns the type of transport (gRPC, IPC, WASM, etc.).
50	fn TransportKind(&self) -> TransportType;
51
52	/// Returns the transport's configuration.
53	fn Configuration(&self) -> &TransportConfig;
54
55	/// Checks if the transport supports bidirectional streaming.
56	fn SupportsStreaming(&self) -> bool;
57
58	/// Returns the transport's current capabilities and limits.
59	fn Capabilities(&self) -> TransportCapabilities;
60
61	/// Collects and returns current performance metrics.
62	fn Metrics(&self) -> TransportMetrics;
63}
64
/// Feature flags and size/concurrency limits advertised by a transport.
///
/// The type is a small `Copy` value made only of `usize` and `bool` fields,
/// so it can be passed around and compared freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TransportCapabilities {
	/// Largest single message the transport accepts, in bytes.
	pub MaximumMessageSize:usize,

	/// `true` when the request-response pattern is available.
	pub SupportsRequestResponse:bool,

	/// `true` when server-side streaming is available.
	pub SupportsServerStreaming:bool,

	/// `true` when client-side streaming is available.
	pub SupportsClientStreaming:bool,

	/// `true` when bidirectional streaming is available.
	pub SupportsBidirectionalStreaming:bool,

	/// `true` when broadcast/notification messages are available.
	pub SupportsNotifications:bool,

	/// Estimated upper bound on concurrent requests/connections.
	pub MaximumConcurrent:usize,

	/// `true` when the transport needs network connectivity to operate.
	pub RequiresNetwork:bool,

	/// `true` when encryption/TLS is available.
	pub SupportsEncryption:bool,

	/// `true` when compression is available.
	pub SupportsCompression:bool,
}
98
99impl Default for TransportCapabilities {
100	fn default() -> Self {
101		Self {
102			MaximumMessageSize:1024 * 1024, // 1MB
103
104			SupportsRequestResponse:true,
105
106			SupportsServerStreaming:false,
107
108			SupportsClientStreaming:false,
109
110			SupportsBidirectionalStreaming:false,
111
112			SupportsNotifications:true,
113
114			MaximumConcurrent:100,
115
116			RequiresNetwork:false,
117
118			SupportsEncryption:false,
119
120			SupportsCompression:false,
121		}
122	}
123}
124
/// Performance counters and gauges collected for a transport.
///
/// The derived `Default` yields all-zero counters and no latency samples.
#[derive(Debug, Clone, Default)]
pub struct TransportMetrics {
	/// Requests sent so far, retries included.
	pub RequestsTotal:u64,

	/// Requests that completed successfully (2xx/OK responses).
	pub RequestsSuccessful:u64,

	/// Requests that failed (timeouts/retries are not counted here).
	pub RequestsFailed:u64,

	/// Notifications sent so far.
	pub NotificationsSent:u64,

	/// Connections established so far, reconnections included.
	pub ConnectionsEstablished:u64,

	/// Connection attempts that failed.
	pub ConnectionFailures:u64,

	/// Bytes sent (the compressed size when compression is enabled).
	pub BytesSent:u64,

	/// Bytes received (the compressed size when compression is enabled).
	pub BytesReceived:u64,

	/// Packed circuit breaker word.
	///
	/// `SetCircuitBreakerState` stores a state code in the low 16 bits and
	/// leaves the high 16 bits untouched.
	// NOTE(review): the high half is presumably meant as a state-change
	// counter, but nothing in this file updates it - confirm with the owner.
	pub CircuitBreakerState:u32,

	/// Running latency moments in milliseconds, stored as
	/// `(count, sum, sum of squares)`; `None` until a sample is recorded.
	///
	/// Despite the field name no per-bucket histogram is kept: the average
	/// and the percentile estimate are derived from these three values.
	pub LatencyMillisecondsHistogram:Option<(u64, f64, f64)>,

	/// Connections currently active (gauge).
	pub ActiveConnections:u32,

	/// Requests currently pending (gauge).
	pub PendingRequests:u32,
}
165
166impl TransportMetrics {
167	/// Creates a new, empty metrics container.
168	pub fn New() -> Self { Self::default() }
169
170	/// Resets all cumulative metrics to zero.
171	pub fn Reset(&mut self) { *self = Self::New(); }
172
173	/// Computes the success rate as a percentage (0-100).
174	pub fn SuccessRate(&self) -> Option<f64> {
175		let Total = self.RequestsTotal;
176
177		if Total == 0 {
178			None
179		} else {
180			Some((self.RequestsSuccessful as f64 / Total as f64) * 100.0)
181		}
182	}
183
184	/// Computes the average request latency in milliseconds.
185	pub fn AverageLatency(&self) -> Option<f64> {
186		let (Count, Sum, _) = self.LatencyMillisecondsHistogram?;
187
188		if Count == 0 { None } else { Some(Sum / Count as f64) }
189	}
190
191	/// Computes the 95th percentile latency from the histogram.
192	pub fn LatencyPercentile95(&self) -> Option<f64> {
193		let (Count, Mean, SumSquared) = self.LatencyMillisecondsHistogram?;
194
195		if Count < 20 {
196			return None;
197		}
198
199		let Variance = (SumSquared / Count as f64) - (Mean * Mean);
200
201		let StandardDeviation = Variance.sqrt();
202
203		Some(Mean + 1.645 * StandardDeviation)
204	}
205
206	/// Records a request latency sample.
207	pub fn RecordLatency(&mut self, LatencyMilliseconds:f64) {
208		let (Count, Sum, SumSquared) = self.LatencyMillisecondsHistogram.get_or_insert((0, 0.0, 0.0));
209
210		*Count += 1;
211		*Sum += LatencyMilliseconds;
212		*SumSquared += LatencyMilliseconds * LatencyMilliseconds;
213	}
214
215	/// Increments the RequestsTotal and RequestsSuccessful counters.
216	pub fn IncrementRequestSuccess(&mut self) {
217		self.RequestsTotal += 1;
218
219		self.RequestsSuccessful += 1;
220	}
221
222	/// Increments the RequestsTotal and RequestsFailed counters.
223	pub fn IncrementRequestFailure(&mut self) {
224		self.RequestsTotal += 1;
225
226		self.RequestsFailed += 1;
227	}
228
229	/// Updates the circuit breaker state.
230	pub fn SetCircuitBreakerState(&mut self, State:CircuitBreakerState) {
231		let StateCode = match State {
232			CircuitBreakerState::Closed => 1,
233
234			CircuitBreakerState::Open => 0,
235
236			CircuitBreakerState::HalfOpen => 2,
237		};
238
239		let OldState = self.CircuitBreakerState;
240
241		self.CircuitBreakerState = (OldState & 0xFFFF_0000) | StateCode as u32;
242	}
243}
244
/// The three states a circuit breaker can be in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitBreakerState {
	/// Normal operation: requests flow through.
	Closed,

	/// Tripped: requests are rejected immediately.
	Open,

	/// Probing: a limited number of requests is let through to test recovery.
	HalfOpen,
}
257
258/// Transport-specific error codes.
259#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
260#[repr(u16)]
261pub enum TransportErrorCode {
262	/// Connection to endpoint failed or was lost.
263	ConnectionFailed = 100,
264
265	/// Operation timed out.
266	Timeout = 101,
267
268	/// Target endpoint not found/service unavailable.
269	NotFound = 102,
270
271	/// Invalid request format or parameters.
272	InvalidRequest = 103,
273
274	/// Remote endpoint returned an application error.
275	RemoteError = 104,
276
277	/// Message too large for transport.
278	MessageTooLarge = 105,
279
280	/// Encryption/decryption failed.
281	EncryptionError = 106,
282
283	/// Serialization/deserialization failed.
284	SerializationError = 107,
285
286	/// Authentication/authorization failed.
287	Unauthorized = 108,
288
289	/// Rate limit exceeded.
290	RateLimited = 109,
291
292	/// Feature not supported by this transport.
293	NotSupported = 110,
294
295	/// Internal transport error (bug, corrupted state).
296	InternalError = 111,
297
298	/// Circuit breaker is open; request rejected.
299	CircuitBreakerOpen = 112,
300
301	/// Stream already in use or closed.
302	StreamError = 113,
303
304	/// Configuration error (invalid settings).
305	ConfigurationError = 114,
306}
307
308impl TransportErrorCode {
309	/// Returns `true` if this error code is retryable.
310	pub fn IsRetryable(&self) -> bool {
311		matches!(
312			self,
313			TransportErrorCode::ConnectionFailed
314				| TransportErrorCode::Timeout
315				| TransportErrorCode::RateLimited
316				| TransportErrorCode::RemoteError
317		)
318	}
319
320	/// Returns the recommended retry delay in milliseconds for this error.
321	pub fn RecommendedRetryDelayMilliseconds(&self) -> u64 {
322		match self {
323			TransportErrorCode::ConnectionFailed => 1000,
324
325			TransportErrorCode::Timeout => 500,
326
327			TransportErrorCode::RateLimited => 2000,
328
329			TransportErrorCode::RemoteError => 300,
330
331			_ => 0,
332		}
333	}
334}
335
#[cfg(test)]
mod tests {

	use super::*;

	/// Lists every `TransportErrorCode` variant so table-style checks can
	/// cover the whole enum.
	fn AllCodes() -> [TransportErrorCode; 15] {
		[
			TransportErrorCode::ConnectionFailed,
			TransportErrorCode::Timeout,
			TransportErrorCode::NotFound,
			TransportErrorCode::InvalidRequest,
			TransportErrorCode::RemoteError,
			TransportErrorCode::MessageTooLarge,
			TransportErrorCode::EncryptionError,
			TransportErrorCode::SerializationError,
			TransportErrorCode::Unauthorized,
			TransportErrorCode::RateLimited,
			TransportErrorCode::NotSupported,
			TransportErrorCode::InternalError,
			TransportErrorCode::CircuitBreakerOpen,
			TransportErrorCode::StreamError,
			TransportErrorCode::ConfigurationError,
		]
	}

	/// The four retryable codes report `true`; representative permanent
	/// failures report `false`.
	#[test]
	fn TestRetryableErrorCodes() {
		assert!(TransportErrorCode::ConnectionFailed.IsRetryable());

		assert!(TransportErrorCode::Timeout.IsRetryable());

		assert!(TransportErrorCode::RateLimited.IsRetryable());

		assert!(TransportErrorCode::RemoteError.IsRetryable());

		assert!(!TransportErrorCode::InvalidRequest.IsRetryable());

		assert!(!TransportErrorCode::NotFound.IsRetryable());
	}

	/// Each retryable code maps to its documented delay; a non-retryable code
	/// maps to zero.
	#[test]
	fn TestErrorRecommendedDelays() {
		assert_eq!(TransportErrorCode::ConnectionFailed.RecommendedRetryDelayMilliseconds(), 1000);

		assert_eq!(TransportErrorCode::Timeout.RecommendedRetryDelayMilliseconds(), 500);

		assert_eq!(TransportErrorCode::RateLimited.RecommendedRetryDelayMilliseconds(), 2000);

		assert_eq!(TransportErrorCode::RemoteError.RecommendedRetryDelayMilliseconds(), 300);

		assert_eq!(TransportErrorCode::InvalidRequest.RecommendedRetryDelayMilliseconds(), 0);
	}

	/// Guards against the two tables drifting apart: a code has a non-zero
	/// retry delay exactly when it is reported as retryable.
	#[test]
	fn TestRetryDelayMatchesRetryability() {
		for Code in AllCodes().iter() {
			let HasDelay = Code.RecommendedRetryDelayMilliseconds() > 0;

			assert_eq!(Code.IsRetryable(), HasDelay, "mismatch for {:?}", Code);
		}
	}
}