Skip to content

Commit 2bf593e

Browse files
authored
Merge pull request #401 from lmnr-ai/dev
Backoff, cache, optimizations, options for browser requests
2 parents 4c85544 + 9f141ca commit 2bf593e

File tree

101 files changed

+6149
-2488
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+6149
-2488
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
name: Build & publish app-server binary
2+
on:
3+
release:
4+
types:
5+
- published
6+
7+
jobs:
8+
build-and-upload:
9+
name: Build and Upload
10+
# This is used for HF Spaces, where we use
11+
# debian:bookworm, that has GLIBC 2.36, but
12+
# ubuntu-latest has GLIBC 2.38, so we build on ubuntu-22.04 instead.
13+
runs-on: ubuntu-22.04
14+
permissions:
15+
contents: write
16+
packages: write
17+
attestations: write
18+
id-token: write
19+
20+
steps:
21+
- uses: actions/checkout@v4
22+
23+
- name: Update Rust toolchain
24+
run: rustup update
25+
26+
- name: Install additional dependencies
27+
run: sudo apt-get update && sudo apt-get install -y protobuf-compiler libssl-dev
28+
29+
- name: Build Release Binary
30+
run: cargo build --release
31+
working-directory: ./app-server
32+
33+
- name: verify binary path
34+
run: |
35+
if [[ -f app-server/target/release/app-server ]]; then
36+
echo "Binary found at app-server/target/release/app-server"
37+
else
38+
echo "Binary not found at app-server/target/release/app-server"
39+
exit 1
40+
fi
41+
42+
- name: Get latest release
43+
id: get_latest_release
44+
uses: actions/github-script@v7
45+
with:
46+
script: |
47+
const response = await github.rest.repos.listReleases({
48+
owner: context.repo.owner,
49+
repo: context.repo.repo
50+
});
51+
if (response.data.length === 0) {
52+
core.setFailed('No releases found');
53+
return;
54+
}
55+
const latest_release = response.data.sort((a, b) => new Date(b.created_at) - new Date(a.created_at))[0];
56+
core.setOutput('upload_url', latest_release.upload_url);
57+
core.setOutput('release_id', latest_release.id);
58+
59+
- name: Delete existing assets
60+
uses: actions/github-script@v7
61+
with:
62+
script: |
63+
const release_id = '${{ steps.get_latest_release.outputs.release_id }}';
64+
const response = await github.rest.repos.listReleaseAssets({
65+
owner: context.repo.owner,
66+
repo: context.repo.repo,
67+
release_id: release_id
68+
});
69+
70+
for (const asset of response.data) {
71+
if (asset.name === 'app-server') {
72+
await github.rest.repos.deleteReleaseAsset({
73+
owner: context.repo.owner,
74+
repo: context.repo.repo,
75+
asset_id: asset.id
76+
});
77+
}
78+
}
79+
80+
- name: Upload to latest release
81+
uses: actions/upload-release-asset@v1
82+
env:
83+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
84+
with:
85+
upload_url: ${{ steps.get_latest_release.outputs.upload_url }}
86+
asset_path: ./app-server/target/release/app-server
87+
asset_name: app-server
88+
asset_content_type: application/octet-stream

app-server/.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,6 @@ CODE_EXECUTOR_URL=http://localhost:8811
1515
# must be exactly 32 bytes (64 hex characters)
1616
AEAD_SECRET_KEY=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
1717
ENVIRONMENT=FULL
18+
19+
# Optional, if you want to use Redis for caching. Create a Redis/Valkey instance and set the URL here.
20+
# REDIS_URL=redis://localhost:6379

app-server/Cargo.lock

Lines changed: 64 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app-server/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ aws-config = "1.5.10"
2424
aws-credential-types = "1.2.1"
2525
aws-sdk-bedrockruntime = "1.61.0"
2626
aws-sdk-s3 = "1.63.0"
27+
backoff = {version = "0.4.0", features = ["tokio"]}
2728
base64 = "0.22.1"
2829
bimap = "0.6.3"
2930
bytes = "1.7.1"
@@ -49,6 +50,7 @@ num_cpus = "1.16.0"
4950
prost = "0.13"
5051
rand = "0.8.5"
5152
rayon = "1.10"
53+
redis = {version = "0.28.2", features = ["tokio-comp"]}
5254
regex = "1.10.3"
5355
reqwest = {version = "0.12", default-features = false, features = ["rustls-tls", "json", "stream", "multipart"]}
5456
reqwest-eventsource = "0.6.0"

app-server/src/api/utils.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@ use std::sync::Arc;
33
use sqlx::PgPool;
44
use uuid::Uuid;
55

6-
use crate::db::pipelines::PipelineVersion;
76
use crate::pipeline::utils::get_target_pipeline_version_cache_key;
87
use crate::routes::api_keys::hash_api_key;
98
use crate::routes::error;
109
use crate::{
11-
cache::Cache,
12-
db::{self, DB},
10+
cache::{keys::PROJECT_API_KEY_CACHE_KEY, Cache, CacheTrait},
11+
db::{self, pipelines::PipelineVersion, project_api_keys::ProjectApiKey, DB},
1312
};
1413

1514
pub async fn query_target_pipeline_version(
@@ -32,7 +31,7 @@ pub async fn query_target_pipeline_version(
3231
.await?;
3332
if let Some(pipeline_version) = &pipeline_version {
3433
let _ = cache
35-
.insert::<PipelineVersion>(cache_key, pipeline_version)
34+
.insert::<PipelineVersion>(&cache_key, pipeline_version.clone())
3635
.await;
3736
}
3837
Ok(pipeline_version)
@@ -44,17 +43,18 @@ pub async fn get_api_key_from_raw_value(
4443
pool: &PgPool,
4544
cache: Arc<Cache>,
4645
raw_api_key: String,
47-
) -> anyhow::Result<db::project_api_keys::ProjectApiKey> {
46+
) -> anyhow::Result<ProjectApiKey> {
4847
let api_key_hash = hash_api_key(&raw_api_key);
48+
let cache_key = format!("{PROJECT_API_KEY_CACHE_KEY}:{api_key_hash}");
4949
let cache_res = cache
50-
.get::<db::project_api_keys::ProjectApiKey>(&api_key_hash)
50+
.get::<db::project_api_keys::ProjectApiKey>(&cache_key)
5151
.await;
5252
match cache_res {
5353
Ok(Some(api_key)) => Ok(api_key),
5454
Ok(None) | Err(_) => {
5555
let api_key = db::project_api_keys::get_api_key(pool, &api_key_hash).await?;
5656
let _ = cache
57-
.insert::<db::project_api_keys::ProjectApiKey>(api_key_hash, &api_key)
57+
.insert::<db::project_api_keys::ProjectApiKey>(&cache_key, api_key.clone())
5858
.await;
5959

6060
Ok(api_key)
Lines changed: 41 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,75 @@
1+
use std::sync::Arc;
2+
13
use actix_web::{options, post, web, HttpResponse};
24
use serde::{Deserialize, Serialize};
35
use uuid::Uuid;
46

5-
use crate::{db::project_api_keys::ProjectApiKey, routes::types::ResponseResult};
7+
use crate::{db::project_api_keys::ProjectApiKey, mq, routes::types::ResponseResult};
8+
9+
pub const BROWSER_SESSIONS_QUEUE: &str = "browser_sessions_queue";
10+
pub const BROWSER_SESSIONS_EXCHANGE: &str = "browser_sessions_exchange";
11+
pub const BROWSER_SESSIONS_ROUTING_KEY: &str = "browser_sessions_routing_key";
612

7-
#[derive(Debug, Serialize, Deserialize)]
8-
struct RRWebEvent {
13+
#[derive(Debug, Serialize, Deserialize, Clone)]
14+
pub struct RRWebEvent {
915
#[serde(rename = "type")]
10-
event_type: i32,
11-
timestamp: i64,
12-
data: serde_json::Value,
16+
pub event_type: i32,
17+
pub timestamp: i64,
18+
pub data: serde_json::Value,
1319
}
1420

15-
#[derive(Debug, Serialize, Deserialize)]
21+
#[derive(Debug, Serialize, Deserialize, Clone)]
1622
#[serde(rename_all = "camelCase")]
17-
struct EventBatch {
18-
events: Vec<RRWebEvent>,
19-
session_id: Uuid,
20-
trace_id: Uuid,
23+
pub struct EventBatch {
24+
pub events: Vec<RRWebEvent>,
25+
pub session_id: Uuid,
26+
pub trace_id: Uuid,
27+
}
28+
29+
#[derive(Debug, Serialize, Deserialize, Clone)]
30+
pub struct QueueBrowserEventMessage {
31+
pub batch: EventBatch,
32+
pub project_id: Uuid,
2133
}
2234

2335
#[options("events")]
2436
async fn options_handler() -> ResponseResult {
25-
// TODO: use cors middleware from actix_cors crate
2637
Ok(HttpResponse::Ok()
2738
.insert_header(("Access-Control-Allow-Origin", "*"))
2839
.insert_header(("Access-Control-Allow-Methods", "POST, OPTIONS"))
2940
.insert_header((
3041
"Access-Control-Allow-Headers",
31-
"Authorization, Content-Type",
42+
"Authorization, Content-Type, Content-Encoding, Accept",
3243
))
3344
.insert_header(("Access-Control-Max-Age", "86400"))
3445
.finish())
3546
}
3647

3748
#[post("events")]
3849
async fn create_session_event(
39-
clickhouse: web::Data<clickhouse::Client>,
4050
batch: web::Json<EventBatch>,
4151
project_api_key: ProjectApiKey,
52+
queue: web::Data<Arc<dyn mq::MessageQueue<QueueBrowserEventMessage>>>,
4253
) -> ResponseResult {
43-
// Skip if there are no events
44-
if batch.events.is_empty() {
45-
return Ok(HttpResponse::Ok().finish());
46-
}
54+
let filtered_batch = batch.into_inner();
4755

48-
// Prepare batch data
49-
let mut query = String::from(
50-
"
51-
INSERT INTO browser_session_events (
52-
event_id, session_id, trace_id, timestamp,
53-
event_type, data, project_id
54-
)
55-
VALUES ",
56-
);
57-
58-
let mut values = Vec::new();
59-
60-
for (i, event) in batch.events.iter().enumerate() {
61-
if i > 0 {
62-
query.push_str(", ");
63-
}
64-
query.push_str("(?, ?, ?, ?, ?, ?, ?)");
65-
66-
// Add each value individually
67-
values.extend_from_slice(&[
68-
Uuid::new_v4().to_string(),
69-
batch.session_id.to_string(),
70-
batch.trace_id.to_string(),
71-
event.timestamp.to_string(),
72-
event.event_type.to_string(),
73-
event.data.to_string(),
74-
project_api_key.project_id.to_string(),
75-
]);
56+
// Return 400 Bad Request if trace_id is the nil UUID (00000000-0000-0000-0000-000000000000)
57+
if filtered_batch.trace_id == Uuid::nil() {
58+
return Ok(HttpResponse::BadRequest().json(serde_json::json!({
59+
"error": "Invalid trace_id: must not be null (00000000-0000-0000-0000-000000000000)"
60+
})));
7661
}
7762

78-
// Execute batch insert with individual bindings
79-
let mut query_with_bindings = clickhouse.query(&query);
80-
for value in values {
81-
query_with_bindings = query_with_bindings.bind(value);
82-
}
83-
query_with_bindings.execute().await?;
63+
queue
64+
.publish(
65+
&QueueBrowserEventMessage {
66+
batch: filtered_batch,
67+
project_id: project_api_key.project_id,
68+
},
69+
BROWSER_SESSIONS_EXCHANGE,
70+
BROWSER_SESSIONS_ROUTING_KEY,
71+
)
72+
.await?;
8473

8574
Ok(HttpResponse::Ok().finish())
8675
}

0 commit comments

Comments
 (0)