Skip to content

Commit 0e1db5d

Browse files
authored
Merge pull request #680 from lmnr-ai/dev
CTEs, improved span processing, update models, support human evaluator, fix queue bug
2 parents 67200b4 + 20e740f commit 0e1db5d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+10220
-1178
lines changed

app-server/src/api/v1/evals.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ use serde::{Deserialize, Serialize};
1414
use serde_json::Value;
1515
use uuid::Uuid;
1616

17-
#[derive(Debug, Serialize, Deserialize)]
17+
#[derive(Debug, Serialize, Deserialize, Default)]
1818
#[serde(rename_all = "camelCase")]
1919
pub struct InitEvalRequest {
2020
pub name: Option<String>,
2121
pub group_name: Option<String>,
22+
#[serde(default)]
23+
pub metadata: Option<Value>,
2224
}
2325

2426
#[post("/evals")]
@@ -31,15 +33,16 @@ pub async fn init_eval(
3133
let req = req.into_inner();
3234
let group_name = req.group_name.unwrap_or("default".to_string());
3335
let project_id = project_api_key.project_id;
34-
36+
let metadata = req.metadata;
3537
let name = if let Some(name) = req.name {
3638
name
3739
} else {
3840
name_generator.next().await
3941
};
4042

4143
let evaluation =
42-
db::evaluations::create_evaluation(&db.pool, &name, project_id, &group_name).await?;
44+
db::evaluations::create_evaluation(&db.pool, &name, project_id, &group_name, &metadata)
45+
.await?;
4346

4447
Ok(HttpResponse::Ok().json(evaluation))
4548
}

app-server/src/db/evaluations.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub struct Evaluation {
1818
///
1919
/// Conceptually, evaluations with different group ids are used to test different features.
2020
pub group_id: String,
21+
pub metadata: Option<Value>,
2122
}
2223

2324
#[derive(Serialize, FromRow)]
@@ -52,20 +53,23 @@ pub async fn create_evaluation(
5253
name: &String,
5354
project_id: Uuid,
5455
group_id: &str,
56+
metadata: &Option<Value>,
5557
) -> Result<Evaluation> {
5658
let evaluation = sqlx::query_as::<_, Evaluation>(
57-
"INSERT INTO evaluations (name, project_id, group_id)
58-
VALUES ($1, $2, $3)
59+
"INSERT INTO evaluations (name, project_id, group_id, metadata)
60+
VALUES ($1, $2, $3, $4)
5961
RETURNING
6062
id,
6163
created_at,
6264
name,
6365
project_id,
64-
group_id",
66+
group_id,
67+
metadata",
6568
)
6669
.bind(name)
6770
.bind(project_id)
6871
.bind(group_id)
72+
.bind(metadata)
6973
.fetch_one(pool)
7074
.await?;
7175

app-server/src/main.rs

Lines changed: 22 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,11 @@ fn main() -> anyhow::Result<()> {
169169

170170
let connection_for_health = connection.clone(); // Clone before moving into HttpServer
171171

172-
// ==== 3.1 Spans message queue ====
173-
let spans_message_queue: Arc<MessageQueue> = if let Some(connection) = connection.as_ref() {
172+
let queue: Arc<MessageQueue> = if let Some(connection) = connection.as_ref() {
174173
runtime_handle.block_on(async {
175174
let channel = connection.create_channel().await.unwrap();
176-
175+
// Register queues
176+
// ==== 3.1 Spans message queue ====
177177
channel
178178
.exchange_declare(
179179
OBSERVATIONS_EXCHANGE,
@@ -193,28 +193,7 @@ fn main() -> anyhow::Result<()> {
193193
.await
194194
.unwrap();
195195

196-
let max_channel_pool_size = env::var("RABBITMQ_MAX_CHANNEL_POOL_SIZE")
197-
.ok()
198-
.and_then(|v| v.parse().ok())
199-
.unwrap_or(64);
200-
201-
log::info!("RabbitMQ span channels: {}", max_channel_pool_size);
202-
203-
let rabbit_mq = mq::rabbit::RabbitMQ::new(connection.clone(), max_channel_pool_size);
204-
Arc::new(rabbit_mq.into())
205-
})
206-
} else {
207-
log::info!("Using tokio mpsc span queue");
208-
Arc::new(mq::tokio_mpsc::TokioMpscQueue::new().into())
209-
};
210-
211-
// ==== 3.2 Browser events message queue ====
212-
let browser_events_message_queue: Arc<MessageQueue> = if let Some(connection) =
213-
connection.as_ref()
214-
{
215-
runtime_handle.block_on(async {
216-
let channel = connection.create_channel().await.unwrap();
217-
196+
// ==== 3.2 Browser events message queue ====
218197
channel
219198
.exchange_declare(
220199
BROWSER_SESSIONS_EXCHANGE,
@@ -234,30 +213,7 @@ fn main() -> anyhow::Result<()> {
234213
.await
235214
.unwrap();
236215

237-
let max_channel_pool_size = env::var("RABBITMQ_MAX_CHANNEL_POOL_SIZE")
238-
.ok()
239-
.and_then(|v| v.parse().ok())
240-
.unwrap_or(64);
241-
242-
log::info!(
243-
"RabbitMQ browser events channels: {}",
244-
max_channel_pool_size
245-
);
246-
247-
let rabbit_mq = mq::rabbit::RabbitMQ::new(connection.clone(), max_channel_pool_size);
248-
Arc::new(rabbit_mq.into())
249-
})
250-
} else {
251-
log::info!("Using tokio mpsc browser events queue");
252-
Arc::new(mq::tokio_mpsc::TokioMpscQueue::new().into())
253-
};
254-
255-
// ==== 3.3 Evaluators message queue ====
256-
let evaluators_message_queue: Arc<MessageQueue> = if let Some(connection) = connection.as_ref()
257-
{
258-
runtime_handle.block_on(async {
259-
let channel = connection.create_channel().await.unwrap();
260-
216+
// ==== 3.3 Evaluators message queue ====
261217
channel
262218
.exchange_declare(
263219
EVALUATORS_EXCHANGE,
@@ -282,14 +238,22 @@ fn main() -> anyhow::Result<()> {
282238
.and_then(|v| v.parse().ok())
283239
.unwrap_or(64);
284240

285-
log::info!("RabbitMQ evaluators channels: {}", max_channel_pool_size);
241+
log::info!("RabbitMQ channels: {}", max_channel_pool_size);
286242

287243
let rabbit_mq = mq::rabbit::RabbitMQ::new(connection.clone(), max_channel_pool_size);
288244
Arc::new(rabbit_mq.into())
289245
})
290246
} else {
291-
log::info!("Using tokio mpsc evaluators queue");
292-
Arc::new(mq::tokio_mpsc::TokioMpscQueue::new().into())
247+
let queue = mq::tokio_mpsc::TokioMpscQueue::new();
248+
// register queues
249+
// ==== 3.1 Spans message queue ====
250+
queue.register_queue(OBSERVATIONS_EXCHANGE, OBSERVATIONS_QUEUE);
251+
// ==== 3.2 Browser events message queue ====
252+
queue.register_queue(BROWSER_SESSIONS_EXCHANGE, BROWSER_SESSIONS_QUEUE);
253+
// ==== 3.3 Evaluators message queue ====
254+
queue.register_queue(EVALUATORS_EXCHANGE, EVALUATORS_QUEUE);
255+
log::info!("Using tokio mpsc queue");
256+
Arc::new(queue.into())
293257
};
294258

295259
// ==== 3.4 Agent worker message queue ====
@@ -298,7 +262,7 @@ fn main() -> anyhow::Result<()> {
298262
let runtime_handle_for_http = runtime_handle.clone();
299263
let db_for_http = db.clone();
300264
let cache_for_http = cache.clone();
301-
let spans_mq_for_http = spans_message_queue.clone();
265+
let mq_for_http = queue.clone();
302266

303267
// == HTTP server and listener workers ==
304268
let http_server_handle = thread::Builder::new()
@@ -456,8 +420,7 @@ fn main() -> anyhow::Result<()> {
456420
tokio::spawn(process_queue_spans(
457421
db_for_http.clone(),
458422
cache_for_http.clone(),
459-
spans_mq_for_http.clone(),
460-
evaluators_message_queue.clone(),
423+
mq_for_http.clone(),
461424
clickhouse.clone(),
462425
storage.clone(),
463426
));
@@ -467,15 +430,15 @@ fn main() -> anyhow::Result<()> {
467430
tokio::spawn(process_browser_events(
468431
db_for_http.clone(),
469432
clickhouse.clone(),
470-
browser_events_message_queue.clone(),
433+
mq_for_http.clone(),
471434
));
472435
}
473436

474437
for _ in 0..num_evaluators_workers_per_thread {
475438
tokio::spawn(process_evaluators(
476439
db_for_http.clone(),
477440
clickhouse.clone(),
478-
evaluators_message_queue.clone(),
441+
mq_for_http.clone(),
479442
evaluator_client.clone(),
480443
python_online_evaluator_url.clone(),
481444
));
@@ -488,13 +451,11 @@ fn main() -> anyhow::Result<()> {
488451
.app_data(PayloadConfig::new(http_payload_limit))
489452
.app_data(web::Data::from(cache_for_http.clone()))
490453
.app_data(web::Data::from(db_for_http.clone()))
491-
.app_data(web::Data::new(spans_mq_for_http.clone()))
454+
.app_data(web::Data::new(mq_for_http.clone()))
492455
.app_data(web::Data::new(clickhouse.clone()))
493456
.app_data(web::Data::new(name_generator.clone()))
494457
.app_data(web::Data::new(storage.clone()))
495458
.app_data(web::Data::new(machine_manager.clone()))
496-
.app_data(web::Data::new(browser_events_message_queue.clone()))
497-
.app_data(web::Data::new(evaluators_message_queue.clone()))
498459
.app_data(web::Data::new(agent_manager_workers.clone()))
499460
.app_data(web::Data::new(connection_for_health.clone()))
500461
.app_data(web::Data::new(browser_agent.clone()))
@@ -570,7 +531,7 @@ fn main() -> anyhow::Result<()> {
570531
let process_traces_service = ProcessTracesService::new(
571532
db.clone(),
572533
cache.clone(),
573-
spans_message_queue.clone(),
534+
queue.clone(),
574535
);
575536

576537
Server::builder()

app-server/src/mq/rabbit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use deadpool::managed::{Manager, Pool, PoolError, RecycleError};
22
use futures_util::StreamExt;
33
use lapin::{
4+
BasicProperties, Channel, Connection, Consumer,
45
acker::Acker,
56
options::{BasicConsumeOptions, BasicPublishOptions, QueueBindOptions},
67
types::FieldTable,
7-
BasicProperties, Channel, Connection, Consumer,
88
};
99
use std::sync::Arc;
1010

app-server/src/mq/tokio_mpsc.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use super::{
55
use dashmap::DashMap;
66
use std::sync::Arc;
77
use tokio::sync::{
8-
mpsc::{self, Receiver, Sender},
98
Mutex,
9+
mpsc::{self, Receiver, Sender},
1010
};
1111

1212
const CHANNEL_CAPACITY: usize = 100;
@@ -53,6 +53,11 @@ impl TokioMpscQueue {
5353
fn key(&self, exchange: &str, routing_key: &str) -> String {
5454
format!("{}:-:{}", exchange, routing_key)
5555
}
56+
57+
pub fn register_queue(&self, exchange: &str, routing_key: &str) {
58+
let key = self.key(exchange, routing_key);
59+
self.senders.entry(key).or_default();
60+
}
5661
}
5762

5863
impl MessageQueueTrait for TokioMpscQueue {

app-server/src/traces/consumer.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ pub async fn process_queue_spans(
2121
db: Arc<DB>,
2222
cache: Arc<Cache>,
2323
queue: Arc<MessageQueue>,
24-
evaluators_queue: Arc<MessageQueue>,
2524
clickhouse: clickhouse::Client,
2625
storage: Arc<Storage>,
2726
) {
@@ -30,7 +29,6 @@ pub async fn process_queue_spans(
3029
db.clone(),
3130
cache.clone(),
3231
queue.clone(),
33-
evaluators_queue.clone(),
3432
clickhouse.clone(),
3533
storage.clone(),
3634
)
@@ -43,7 +41,6 @@ async fn inner_process_queue_spans(
4341
db: Arc<DB>,
4442
cache: Arc<Cache>,
4543
queue: Arc<MessageQueue>,
46-
evaluators_queue: Arc<MessageQueue>,
4744
clickhouse: clickhouse::Client,
4845
storage: Arc<Storage>,
4946
) {
@@ -142,7 +139,7 @@ async fn inner_process_queue_spans(
142139
clickhouse.clone(),
143140
cache.clone(),
144141
acker,
145-
evaluators_queue.clone(),
142+
queue.clone(),
146143
)
147144
.await;
148145
}

app-server/src/traces/spans.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -938,11 +938,26 @@ fn output_message_from_completion_content(
938938

939939
if tool_calls.is_empty() {
940940
if let Some(Value::String(s)) = msg_content {
941-
Some(ChatMessage {
942-
role: msg_role,
943-
content: ChatMessageContent::Text(s.clone()),
944-
tool_call_id: None,
945-
})
941+
if let Ok(content) =
942+
serde_json::from_str::<Vec<InstrumentationChatMessageContentPart>>(&s)
943+
{
944+
Some(ChatMessage {
945+
role: msg_role,
946+
content: ChatMessageContent::ContentPartList(
947+
content
948+
.into_iter()
949+
.map(ChatMessageContentPart::from_instrumentation_content_part)
950+
.collect(),
951+
),
952+
tool_call_id: None,
953+
})
954+
} else {
955+
Some(ChatMessage {
956+
role: msg_role,
957+
content: ChatMessageContent::Text(s.clone()),
958+
tool_call_id: None,
959+
})
960+
}
946961
} else {
947962
None
948963
}

0 commit comments

Comments
 (0)