Skip to content

Commit 5484687

Browse files
authored
Merge pull request #782 from lmnr-ai/dev
optimized browser events format, fix to clickhouse span payloads, migrate some tables to clickhouse, AI SDK v5 support
2 parents 78bbbe7 + 14c5e0c commit 5484687

File tree

151 files changed

+14955
-5153
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

151 files changed

+14955
-5153
lines changed

app-server/.env.example

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,5 @@ ENVIRONMENT=FULL
2020
# Optional, if you want to use Redis for caching. Create a Redis/Valkey instance and set the URL here.
2121
# REDIS_URL=redis://localhost:6379
2222

23+
24+
QUERY_ENGINE_URL=

app-server/build.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,15 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
88
.out_dir("./src/agent_manager/")
99
.compile_protos(&[proto_file], &["proto"])?;
1010

11+
let query_engine_proto_file = "./proto/query_engine.proto";
12+
13+
tonic_build::configure()
14+
.protoc_arg("--experimental_allow_proto3_optional") // for older systems
15+
.build_client(true)
16+
.build_server(false)
17+
.out_dir("./src/query_engine/")
18+
.compile_protos(&[query_engine_proto_file], &["proto"])?;
19+
1120
tonic_build::configure()
1221
.protoc_arg("--experimental_allow_proto3_optional") // for older systems
1322
.build_client(false)

app-server/proto/query_engine.proto

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
syntax = "proto3";
2+
3+
package query_engine;
4+
5+
message QueryRequest {
6+
string query = 1;
7+
string project_id = 2;
8+
}
9+
10+
message QueryResponse {
11+
oneof result {
12+
SuccessResponse success = 1;
13+
ErrorResponse error = 2;
14+
}
15+
}
16+
17+
message SuccessResponse {
18+
string query = 1;
19+
bool success = 2;
20+
}
21+
22+
message ErrorResponse {
23+
string error = 1;
24+
}
25+
26+
service QueryEngineService {
27+
rpc ValidateQuery(QueryRequest) returns (QueryResponse);
28+
}

app-server/src/api/v1/agent.rs

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
use std::sync::Arc;
22

3-
use actix_web::{post, web, HttpResponse};
3+
use actix_web::{HttpResponse, post, web};
44
use futures_util::StreamExt;
55
use serde::Deserialize;
66
use uuid::Uuid;
77

8+
use crate::agent_manager::RunAgentParams;
89
use crate::agent_manager::channel::AgentManagerWorkers;
910
use crate::agent_manager::types::RunAgentResponseStreamChunk;
10-
use crate::agent_manager::worker::{run_agent_worker, RunAgentWorkerOptions};
11-
use crate::agent_manager::RunAgentParams;
12-
use crate::agent_manager::{types::ModelProvider, AgentManager, AgentManagerTrait};
11+
use crate::agent_manager::worker::{RunAgentWorkerOptions, run_agent_worker};
12+
use crate::agent_manager::{AgentManager, AgentManagerTrait, types::ModelProvider};
1313
use crate::cache::Cache;
1414
use crate::db::project_api_keys::ProjectApiKey;
1515
use crate::db::{self, DB};
16-
use crate::features::{is_feature_enabled, Feature};
16+
use crate::features::{Feature, is_feature_enabled};
1717
use crate::routes::types::ResponseResult;
1818
use crate::traces::limits::get_workspace_limit_exceeded_by_project_id;
1919

@@ -62,6 +62,7 @@ pub async fn run_agent_manager(
6262
agent_manager: web::Data<Arc<AgentManager>>,
6363
worker_states: web::Data<Arc<AgentManagerWorkers>>,
6464
db: web::Data<DB>,
65+
clickhouse: web::Data<clickhouse::Client>,
6566
project_api_key: ProjectApiKey,
6667
cache: web::Data<Cache>,
6768
request: web::Json<RunAgentRequest>,
@@ -76,6 +77,7 @@ pub async fn run_agent_manager(
7677
if is_feature_enabled(Feature::UsageLimit) {
7778
match get_workspace_limit_exceeded_by_project_id(
7879
db.clone(),
80+
clickhouse.into_inner().as_ref().clone(),
7981
cache.clone(),
8082
project_api_key.project_id,
8183
)
@@ -128,7 +130,6 @@ pub async fn run_agent_manager(
128130
return_agent_state: request.return_agent_state,
129131
return_storage_state: request.return_storage_state,
130132
};
131-
let pool = db.pool.clone();
132133
let worker_states_clone = worker_states.clone();
133134
let handle = tokio::spawn(async move {
134135
run_agent_worker(
@@ -149,13 +150,6 @@ pub async fn run_agent_manager(
149150
while let Some(message) = receiver.recv().await {
150151
match message {
151152
Ok(agent_chunk) => {
152-
if let Err(e) =
153-
db::stats::add_agent_steps_to_project_usage_stats(&pool, &project_api_key.project_id, 1)
154-
.await
155-
{
156-
log::error!("Error adding agent steps to project usage stats: {}", e);
157-
}
158-
159153
match agent_chunk {
160154
RunAgentResponseStreamChunk::FinalOutput(_) => {
161155
yield anyhow::Ok(agent_chunk.into());
@@ -222,15 +216,6 @@ pub async fn run_agent_manager(
222216
match fut.await {
223217
Ok(response) => {
224218
let response = response?;
225-
if let Err(e) = db::stats::add_agent_steps_to_project_usage_stats(
226-
&db.pool,
227-
&project_api_key.project_id,
228-
response.step_count.unwrap_or(0) as i64,
229-
)
230-
.await
231-
{
232-
log::error!("Error adding agent steps to project usage stats: {}", e);
233-
}
234219
Ok(HttpResponse::Ok().json(response))
235220
}
236221
Err(e) if e.is_cancelled() => Ok(HttpResponse::NoContent().finish()),

app-server/src/api/v1/browser_sessions.rs

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::Arc;
22

3-
use actix_web::{HttpResponse, options, post, web};
4-
use serde::{Deserialize, Serialize};
3+
use actix_web::{HttpResponse, post, web};
4+
use serde::{Deserialize, Deserializer, Serialize};
55
use uuid::Uuid;
66

77
use crate::{
@@ -15,18 +15,42 @@ pub const BROWSER_SESSIONS_QUEUE: &str = "browser_sessions_queue";
1515
pub const BROWSER_SESSIONS_EXCHANGE: &str = "browser_sessions_exchange";
1616
pub const BROWSER_SESSIONS_ROUTING_KEY: &str = "browser_sessions_routing_key";
1717

18+
// Custom deserializer for the data field to support both Vec<u8> and base64 string
19+
fn deserialize_data<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
20+
where
21+
D: Deserializer<'de>,
22+
{
23+
use serde::de::Error;
24+
25+
#[derive(Deserialize)]
26+
#[serde(untagged)]
27+
enum DataFormat {
28+
Bytes(Vec<u8>),
29+
Base64String(String),
30+
}
31+
32+
match DataFormat::deserialize(deserializer)? {
33+
DataFormat::Bytes(bytes) => Ok(bytes),
34+
DataFormat::Base64String(s) => {
35+
base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &s)
36+
.map_err(|e| Error::custom(format!("Invalid base64: {}", e)))
37+
}
38+
}
39+
}
40+
1841
#[derive(Debug, Serialize, Deserialize, Clone)]
1942
pub struct RRWebEvent {
2043
#[serde(rename = "type")]
2144
pub event_type: u8,
22-
pub timestamp: i64,
45+
pub timestamp: f64, // milliseconds
46+
#[serde(deserialize_with = "deserialize_data")]
2347
pub data: Vec<u8>,
2448
}
2549

2650
impl RRWebEvent {
2751
pub fn estimate_size_bytes(&self) -> usize {
2852
// 1 byte for event_type, 8 bytes for timestamp
29-
return 9 + self.data.len();
53+
9 + self.data.len()
3054
}
3155
}
3256

@@ -42,19 +66,6 @@ pub struct EventBatch {
4266
pub sdk_version: Option<String>,
4367
}
4468

45-
#[options("events")]
46-
async fn options_handler() -> ResponseResult {
47-
Ok(HttpResponse::Ok()
48-
.insert_header(("Access-Control-Allow-Origin", "*"))
49-
.insert_header(("Access-Control-Allow-Methods", "POST, OPTIONS"))
50-
.insert_header((
51-
"Access-Control-Allow-Headers",
52-
"Authorization, Content-Type, Content-Encoding, Accept",
53-
))
54-
.insert_header(("Access-Control-Max-Age", "86400"))
55-
.finish())
56-
}
57-
5869
#[post("events")]
5970
async fn create_session_event(
6071
batch: web::Json<EventBatch>,

app-server/src/api/v1/datasets.rs

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use actix_web::{HttpResponse, get, web};
1+
use actix_web::{HttpResponse, get, post, web};
22
use serde::Deserialize;
3+
use uuid::Uuid;
34

45
use crate::{
56
ch::datapoints as ch_datapoints,
@@ -58,3 +59,73 @@ async fn get_datapoints(
5859

5960
Ok(HttpResponse::Ok().json(response))
6061
}
62+
63+
#[derive(Deserialize)]
64+
pub struct CreateDatapointsRequest {
65+
pub dataset_name: String,
66+
pub datapoints: Vec<CreateDatapointRequest>,
67+
}
68+
69+
#[derive(Deserialize)]
70+
pub struct CreateDatapointRequest {
71+
pub data: serde_json::Value,
72+
pub target: Option<serde_json::Value>,
73+
#[serde(default)]
74+
pub metadata: std::collections::HashMap<String, serde_json::Value>,
75+
}
76+
77+
/// Create datapoints in a dataset
78+
///
79+
/// Request body should contain:
80+
/// - dataset_name: The name of the dataset to add datapoints to
81+
/// - datapoints: Array of datapoint objects with data, optional target, and optional metadata
82+
#[post("/datasets/datapoints")]
83+
async fn create_datapoints(
84+
req: web::Json<CreateDatapointsRequest>,
85+
db: web::Data<DB>,
86+
clickhouse: web::Data<clickhouse::Client>,
87+
project_api_key: ProjectApiKey,
88+
) -> ResponseResult {
89+
let project_id = project_api_key.project_id;
90+
let db = db.into_inner();
91+
let clickhouse = clickhouse.into_inner().as_ref().clone();
92+
let request = req.into_inner();
93+
94+
// Validate that we have datapoints to insert
95+
if request.datapoints.is_empty() {
96+
return Ok(HttpResponse::BadRequest().json(serde_json::json!({
97+
"error": "No datapoints provided"
98+
})));
99+
}
100+
101+
// Get dataset metadata from PostgreSQL
102+
let dataset_id =
103+
db::datasets::get_dataset_id_by_name(&db.pool, &request.dataset_name, project_id).await?;
104+
105+
// Convert request datapoints to Datapoint structs
106+
let datapoints: Vec<Datapoint> = request
107+
.datapoints
108+
.into_iter()
109+
.map(|dp_req| Datapoint {
110+
id: Uuid::new_v4(),
111+
dataset_id,
112+
data: dp_req.data,
113+
target: dp_req.target,
114+
metadata: dp_req.metadata,
115+
})
116+
.collect();
117+
118+
// Convert to ClickHouse datapoints
119+
let ch_datapoints: Vec<ch_datapoints::CHDatapoint> = datapoints
120+
.iter()
121+
.map(|dp| ch_datapoints::CHDatapoint::from_datapoint(dp, project_id))
122+
.collect();
123+
124+
// Insert into ClickHouse
125+
ch_datapoints::insert_datapoints(clickhouse, ch_datapoints).await?;
126+
127+
Ok(HttpResponse::Created().json(serde_json::json!({
128+
"message": "Datapoints created successfully",
129+
"count": datapoints.len()
130+
})))
131+
}

app-server/src/api/v1/evals.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub async fn update_eval_datapoint(
108108
.await?;
109109

110110
// Update database (PostgreSQL)
111-
db::evaluations::update_evaluation_datapoint(
111+
let trace_id = db::evaluations::update_evaluation_datapoint_and_get_trace_id(
112112
&db.pool,
113113
eval_id,
114114
datapoint_id,
@@ -124,6 +124,7 @@ pub async fn update_eval_datapoint(
124124
group_id,
125125
eval_id,
126126
datapoint_id,
127+
trace_id,
127128
scores_clone,
128129
)
129130
.await?;

app-server/src/api/v1/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ pub mod datasets;
44
pub mod evals;
55
pub mod evaluators;
66
pub mod metrics;
7+
pub mod sql;
78
pub mod tag;
89
pub mod traces;

app-server/src/api/v1/sql.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::{collections::HashMap, sync::Arc};
2+
3+
use actix_web::{HttpResponse, post, web};
4+
use serde::Deserialize;
5+
6+
use crate::{
7+
db::project_api_keys::ProjectApiKey,
8+
query_engine::QueryEngine,
9+
sql::{self, ClickhouseReadonlyClient},
10+
};
11+
12+
use crate::routes::types::ResponseResult;
13+
14+
#[derive(Deserialize)]
15+
#[serde(rename_all = "camelCase")]
16+
pub struct SqlQueryRequest {
17+
pub query: String,
18+
}
19+
20+
#[post("sql/query")]
21+
pub async fn execute_sql_query(
22+
req: web::Json<SqlQueryRequest>,
23+
project_api_key: ProjectApiKey,
24+
clickhouse_ro: web::Data<Option<Arc<ClickhouseReadonlyClient>>>,
25+
query_engine: web::Data<Arc<QueryEngine>>,
26+
) -> ResponseResult {
27+
let project_id = project_api_key.project_id;
28+
let SqlQueryRequest { query } = req.into_inner();
29+
30+
match clickhouse_ro.as_ref() {
31+
Some(ro_client) => {
32+
match sql::execute_sql_query(
33+
query,
34+
project_id,
35+
HashMap::new(),
36+
ro_client.clone(),
37+
query_engine.into_inner().as_ref().clone(),
38+
)
39+
.await
40+
{
41+
Ok(result_json) => Ok(HttpResponse::Ok().json(result_json)),
42+
Err(e) => Err(e.into()),
43+
}
44+
}
45+
None => Err(anyhow::anyhow!("ClickHouse read-only client is not configured.").into()),
46+
}
47+
}

app-server/src/api/v1/traces.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub async fn process_traces(
2828
cache: web::Data<crate::cache::Cache>,
2929
spans_message_queue: web::Data<Arc<MessageQueue>>,
3030
db: web::Data<DB>,
31+
clickhouse: web::Data<clickhouse::Client>,
3132
) -> ResponseResult {
3233
let db = db.into_inner();
3334
let cache = cache.into_inner();
@@ -39,6 +40,7 @@ pub async fn process_traces(
3940
if is_feature_enabled(Feature::UsageLimit) {
4041
let limits_exceeded = get_workspace_limit_exceeded_by_project_id(
4142
db.clone(),
43+
clickhouse.into_inner().as_ref().clone(),
4244
cache.clone(),
4345
project_api_key.project_id,
4446
)

0 commit comments

Comments
 (0)