1use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11
12use super::{
13 Common::TransportType,
14 TransportConfig::TransportConfig,
15 TransportError::TransportError,
16 UnifiedRequest::UnifiedRequest,
17 UnifiedResponse::UnifiedResponse,
18};
19
20#[async_trait]
26pub trait TransportStrategy: Send + Sync {
27 async fn Connect(&mut self) -> Result<(), TransportError>;
29
30 async fn Disconnect(&mut self) -> Result<(), TransportError>;
32
33 async fn SendRequest(&mut self, Request:UnifiedRequest) -> Result<UnifiedResponse, TransportError>;
35
36 async fn SendNotification(&mut self, Notification:UnifiedRequest) -> Result<(), TransportError>;
38
39 fn StreamEvents(&self)
41 -> std::result::Result<futures::stream::BoxStream<'static, UnifiedResponse>, TransportError>;
42
43 fn IsConnected(&self) -> bool;
45
46 fn LatencyMilliseconds(&self) -> u64;
48
49 fn TransportKind(&self) -> TransportType;
51
52 fn Configuration(&self) -> &TransportConfig;
54
55 fn SupportsStreaming(&self) -> bool;
57
58 fn Capabilities(&self) -> TransportCapabilities;
60
61 fn Metrics(&self) -> TransportMetrics;
63}
64
/// Feature flags and limits describing a transport.
///
/// The type is plain data (`Copy`, `Eq`); [`Default`] supplies the baseline
/// values listed on each field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TransportCapabilities {
	/// Largest supported message size. Default: `1024 * 1024`
	/// (presumably bytes — confirm with the transports that enforce it).
	pub MaximumMessageSize:usize,

	/// Whether request/response exchanges are supported. Default: `true`.
	pub SupportsRequestResponse:bool,

	/// Whether server-side streaming is supported. Default: `false`.
	pub SupportsServerStreaming:bool,

	/// Whether client-side streaming is supported. Default: `false`.
	pub SupportsClientStreaming:bool,

	/// Whether bidirectional streaming is supported. Default: `false`.
	pub SupportsBidirectionalStreaming:bool,

	/// Whether notifications are supported. Default: `true`.
	pub SupportsNotifications:bool,

	/// Concurrency limit. Default: `100`.
	pub MaximumConcurrent:usize,

	/// Whether the transport needs network access. Default: `false`.
	pub RequiresNetwork:bool,

	/// Whether encryption is supported. Default: `false`.
	pub SupportsEncryption:bool,

	/// Whether compression is supported. Default: `false`.
	pub SupportsCompression:bool,
}

impl Default for TransportCapabilities {
	/// Builds the baseline capability set: request/response and
	/// notifications enabled, every other feature flag disabled, a message
	/// size limit of `1024 * 1024` and a concurrency limit of `100`.
	fn default() -> Self {
		// Same value as `1024 * 1024`, named so the limit is easy to spot.
		const DEFAULT_MAXIMUM_MESSAGE_SIZE:usize = 1 << 20;
		const DEFAULT_MAXIMUM_CONCURRENT:usize = 100;

		Self {
			MaximumMessageSize:DEFAULT_MAXIMUM_MESSAGE_SIZE,
			MaximumConcurrent:DEFAULT_MAXIMUM_CONCURRENT,
			SupportsRequestResponse:true,
			SupportsNotifications:true,
			SupportsServerStreaming:false,
			SupportsClientStreaming:false,
			SupportsBidirectionalStreaming:false,
			RequiresNetwork:false,
			SupportsEncryption:false,
			SupportsCompression:false,
		}
	}
}
124
/// Counters and latency aggregates recorded for a transport.
///
/// The derived [`Default`] sets every counter to zero and leaves the latency
/// aggregate empty (`None`).
#[derive(Debug, Clone, Default)]
pub struct TransportMetrics {
	/// Requests counted so far; incremented by both
	/// [`IncrementRequestSuccess`](Self::IncrementRequestSuccess) and
	/// [`IncrementRequestFailure`](Self::IncrementRequestFailure).
	pub RequestsTotal:u64,

	/// Requests counted as successful.
	pub RequestsSuccessful:u64,

	/// Requests counted as failed.
	pub RequestsFailed:u64,

	/// Notifications sent (maintained by callers; no method here updates it).
	pub NotificationsSent:u64,

	/// Connections established (maintained by callers).
	pub ConnectionsEstablished:u64,

	/// Connection attempts that failed (maintained by callers).
	pub ConnectionFailures:u64,

	/// Bytes sent (maintained by callers).
	pub BytesSent:u64,

	/// Bytes received (maintained by callers).
	pub BytesReceived:u64,

	/// Circuit breaker state code in the low 16 bits, as written by
	/// [`SetCircuitBreakerState`](Self::SetCircuitBreakerState); that setter
	/// leaves the upper 16 bits untouched.
	pub CircuitBreakerState:u32,

	/// Running latency aggregate stored as
	/// `(sample count, sum of samples, sum of squared samples)`.
	/// `None` until the first call to [`RecordLatency`](Self::RecordLatency).
	pub LatencyMillisecondsHistogram:Option<(u64, f64, f64)>,

	/// Currently active connections (maintained by callers).
	pub ActiveConnections:u32,

	/// Requests currently pending (maintained by callers).
	pub PendingRequests:u32,
}

impl TransportMetrics {
	/// Creates an empty metrics record; equivalent to `Self::default()`.
	pub fn New() -> Self { Self::default() }

	/// Resets every counter and discards the latency aggregate.
	pub fn Reset(&mut self) { *self = Self::New(); }

	/// Returns the share of successful requests as a percentage
	/// (`0.0..=100.0`), or `None` when no request has been counted.
	pub fn SuccessRate(&self) -> Option<f64> {
		match self.RequestsTotal {
			0 => None,
			Total => Some((self.RequestsSuccessful as f64 / Total as f64) * 100.0),
		}
	}

	/// Returns the arithmetic mean of the recorded latencies, or `None` when
	/// nothing has been recorded.
	pub fn AverageLatency(&self) -> Option<f64> {
		let (Count, Sum, _) = self.LatencyMillisecondsHistogram?;

		if Count == 0 { None } else { Some(Sum / Count as f64) }
	}

	/// Estimates the 95th-percentile latency as `mean + 1.645 * sigma`
	/// (a normal approximation), using the population variance derived from
	/// the running sums.
	///
	/// Returns `None` when fewer than 20 samples have been recorded.
	pub fn LatencyPercentile95(&self) -> Option<f64> {
		// The aggregate stores the SUM of samples, not their mean (see
		// `RecordLatency`), so the mean has to be derived here.
		let (Count, Sum, SumSquared) = self.LatencyMillisecondsHistogram?;

		if Count < 20 {
			return None;
		}

		let Samples = Count as f64;

		let Mean = Sum / Samples;

		// E[x^2] - E[x]^2 can dip just below zero through rounding when the
		// samples are (nearly) identical; clamp so `sqrt` never yields NaN.
		let Variance = (SumSquared / Samples - Mean * Mean).max(0.0);

		Some(Mean + 1.645 * Variance.sqrt())
	}

	/// Folds one latency sample (milliseconds) into the running aggregate,
	/// creating the aggregate on first use.
	pub fn RecordLatency(&mut self, LatencyMilliseconds:f64) {
		let Aggregate = self.LatencyMillisecondsHistogram.get_or_insert((0, 0.0, 0.0));

		Aggregate.0 += 1;
		Aggregate.1 += LatencyMilliseconds;
		Aggregate.2 += LatencyMilliseconds * LatencyMilliseconds;
	}

	/// Counts one successful request.
	pub fn IncrementRequestSuccess(&mut self) {
		self.RequestsTotal += 1;

		self.RequestsSuccessful += 1;
	}

	/// Counts one failed request.
	pub fn IncrementRequestFailure(&mut self) {
		self.RequestsTotal += 1;

		self.RequestsFailed += 1;
	}

	/// Stores `State` as a numeric code in the low 16 bits of
	/// `CircuitBreakerState`, keeping the upper 16 bits as they were.
	///
	/// Codes: `Open` = 0, `Closed` = 1, `HalfOpen` = 2.
	pub fn SetCircuitBreakerState(&mut self, State:CircuitBreakerState) {
		// NOTE(review): a default-constructed record holds 0, which decodes
		// as `Open` under this mapping — confirm that is intended.
		let StateCode:u32 = match State {
			CircuitBreakerState::Open => 0,

			CircuitBreakerState::Closed => 1,

			CircuitBreakerState::HalfOpen => 2,
		};

		self.CircuitBreakerState = (self.CircuitBreakerState & 0xFFFF_0000) | StateCode;
	}
}

/// Circuit breaker state accepted by
/// [`TransportMetrics::SetCircuitBreakerState`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitBreakerState {
	/// Stored as code `1`.
	Closed,

	/// Stored as code `0`.
	Open,

	/// Stored as code `2`.
	HalfOpen,
}
257
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
260#[repr(u16)]
261pub enum TransportErrorCode {
262 ConnectionFailed = 100,
264
265 Timeout = 101,
267
268 NotFound = 102,
270
271 InvalidRequest = 103,
273
274 RemoteError = 104,
276
277 MessageTooLarge = 105,
279
280 EncryptionError = 106,
282
283 SerializationError = 107,
285
286 Unauthorized = 108,
288
289 RateLimited = 109,
291
292 NotSupported = 110,
294
295 InternalError = 111,
297
298 CircuitBreakerOpen = 112,
300
301 StreamError = 113,
303
304 ConfigurationError = 114,
306}
307
308impl TransportErrorCode {
309 pub fn IsRetryable(&self) -> bool {
311 matches!(
312 self,
313 TransportErrorCode::ConnectionFailed
314 | TransportErrorCode::Timeout
315 | TransportErrorCode::RateLimited
316 | TransportErrorCode::RemoteError
317 )
318 }
319
320 pub fn RecommendedRetryDelayMilliseconds(&self) -> u64 {
322 match self {
323 TransportErrorCode::ConnectionFailed => 1000,
324
325 TransportErrorCode::Timeout => 500,
326
327 TransportErrorCode::RateLimited => 2000,
328
329 TransportErrorCode::RemoteError => 300,
330
331 _ => 0,
332 }
333 }
334}
335
#[cfg(test)]
mod tests {

	use super::*;

	/// Every code that `IsRetryable` accepts — including `RemoteError` —
	/// must report `true`, and representative other codes must report `false`.
	#[test]
	fn TestRetryableErrorCodes() {
		let Retryable = [
			TransportErrorCode::ConnectionFailed,
			TransportErrorCode::Timeout,
			TransportErrorCode::RateLimited,
			TransportErrorCode::RemoteError,
		];

		for Code in Retryable {
			assert!(Code.IsRetryable(), "{:?} should be retryable", Code);
		}

		let NotRetryable = [TransportErrorCode::InvalidRequest, TransportErrorCode::NotFound];

		for Code in NotRetryable {
			assert!(!Code.IsRetryable(), "{:?} should not be retryable", Code);
		}
	}

	/// Each code with a specific delay returns it; other codes return `0`.
	#[test]
	fn TestErrorRecommendedDelays() {
		let Expected = [
			(TransportErrorCode::ConnectionFailed, 1000),
			(TransportErrorCode::Timeout, 500),
			(TransportErrorCode::RateLimited, 2000),
			(TransportErrorCode::RemoteError, 300),
			(TransportErrorCode::InvalidRequest, 0),
		];

		for (Code, Delay) in Expected {
			assert_eq!(
				Code.RecommendedRetryDelayMilliseconds(),
				Delay,
				"unexpected delay for {:?}",
				Code
			);
		}
	}
}