@@ -29,6 +29,32 @@ use crate::{
29
29
} ;
30
30
31
31
pub async fn process_queue_spans (
32
+ pipeline_runner : Arc < PipelineRunner > ,
33
+ db : Arc < DB > ,
34
+ cache : Arc < Cache > ,
35
+ semantic_search : Arc < SemanticSearch > ,
36
+ language_model_runner : Arc < LanguageModelRunner > ,
37
+ rabbitmq_connection : Arc < Connection > ,
38
+ clickhouse : clickhouse:: Client ,
39
+ chunker_runner : Arc < chunk:: runner:: ChunkerRunner > ,
40
+ ) {
41
+ loop {
42
+ inner_process_queue_spans (
43
+ pipeline_runner. clone ( ) ,
44
+ db. clone ( ) ,
45
+ cache. clone ( ) ,
46
+ semantic_search. clone ( ) ,
47
+ language_model_runner. clone ( ) ,
48
+ rabbitmq_connection. clone ( ) ,
49
+ clickhouse. clone ( ) ,
50
+ chunker_runner. clone ( ) ,
51
+ )
52
+ . await ;
53
+ log:: warn!( "Span listener exited. Creating a new RabbitMQ channel..." ) ;
54
+ }
55
+ }
56
+
57
+ async fn inner_process_queue_spans (
32
58
pipeline_runner : Arc < PipelineRunner > ,
33
59
db : Arc < DB > ,
34
60
cache : Arc < Cache > ,
@@ -171,7 +197,12 @@ pub async fn process_queue_spans(
171
197
}
172
198
173
199
if let Err ( e) = db:: spans:: record_span ( & db. pool , & span) . await {
174
- log:: error!( "Failed to record span [{}]: {:?}" , span. span_id, e) ;
200
+ log:: error!(
201
+ "Failed to record span. span_id [{}], project_id [{}]: {:?}" ,
202
+ span. span_id,
203
+ rabbitmq_span_message. project_id,
204
+ e
205
+ ) ;
175
206
} else {
176
207
// ack the message as soon as the span is recorded
177
208
let _ = delivery
@@ -184,7 +215,12 @@ pub async fn process_queue_spans(
184
215
// TODO: Queue batches and send them every 1-2 seconds
185
216
let insert_span_res = ch:: spans:: insert_span ( clickhouse. clone ( ) , & ch_span) . await ;
186
217
if let Err ( e) = insert_span_res {
187
- log:: error!( "Failed to insert span into Clickhouse: {:?}" , e) ;
218
+ log:: error!(
219
+ "Failed to insert span into Clickhouse. span_id [{}], project_id [{}]: {:?}" ,
220
+ span. span_id,
221
+ rabbitmq_span_message. project_id,
222
+ e
223
+ ) ;
188
224
}
189
225
190
226
let registered_label_classes = match get_registered_label_classes_for_path (
@@ -196,7 +232,11 @@ pub async fn process_queue_spans(
196
232
{
197
233
Ok ( classes) => classes,
198
234
Err ( e) => {
199
- log:: error!( "Failed to get registered label classes: {:?}" , e) ;
235
+ log:: error!(
236
+ "Failed to get registered label classes. project_id [{}]: {:?}" ,
237
+ rabbitmq_span_message. project_id,
238
+ e
239
+ ) ;
200
240
Vec :: new ( ) // Return an empty vector if there's an error
201
241
}
202
242
} ;
@@ -217,5 +257,5 @@ pub async fn process_queue_spans(
217
257
}
218
258
}
219
259
220
- log:: info !( "Shutting down span listener" ) ;
260
+ log:: warn !( "RabbitMQ closed connection. Shutting down span listener" ) ;
221
261
}
0 commit comments