Skip to content

Commit bb72b9d

Browse files
Merge remote-tracking branch 'dan-da/danda/prover_job_queue_pr'
Co-authored-by: Alan Szepieniec <alan@neptune.cash>
2 parents ff2ca0f + 85ebb16 commit bb72b9d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1055
-550
lines changed

Cargo.lock

Lines changed: 36 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ readonly = "0.2.12"
7373
thiserror = "1.0.65"
7474
systemstat = "0.2.3"
7575
sysinfo = "0.31.4"
76+
async-priority-channel = "0.2.0"
7677

7778
[dev-dependencies]
7879
blake3 = "1.5.4"

benchmarks/neptune_transaction_hash_removal_record_index_sets_2.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"benchmark_result": {
1616
"clock_cycle_count": 13237,
1717
"hash_table_height": 5056,
18-
"u32_table_height": 1186,
18+
"u32_table_height": 826,
1919
"op_stack_table_height": 9807,
2020
"ram_table_height": 7605
2121
},

src/job_queue/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
//! This module implements a prioritized, heterogeneous job queue that sends
2+
//! completed job results to the initiator/caller.
3+
//!
4+
//! This is intended for running heavy multi-threaded jobs that should be run
5+
//! one at a time to avoid resource contention. By using this queue, multiple
6+
//! (async) tasks can initiate these jobs and wait for results without need
7+
//! of any other synchronization.
8+
//!
9+
//! note: Other rust job queues I found either did not support waiting for job
10+
//! results or else were overly complicated, requiring backend database, etc.
11+
//!
12+
//! Both blocking and non-blocking (async) jobs are supported. Blocking jobs
13+
//! are called inside spawn_blocking() in order to execute on tokio's blocking
14+
//! thread-pool. Async jobs are simply awaited.
15+
//!
16+
//! An async_priority_channel::unbounded is used for queueing the jobs.
17+
//! This is much like tokio::sync::mpsc::unbounded_channel except:
18+
//! 1. it supports prioritizing channel events (jobs)
19+
//! 2. order of events with same priority is undefined.
20+
//! see: https://github.com/rmcgibbo/async-priority-channel/issues/75
21+
//!
22+
//! Using an unbounded channel means that there is no backpressure and no
23+
//! upper limit on the number of jobs. (except RAM).
24+
//!
25+
//! A nice feature is that jobs may be of mixed (heterogeneous) types
26+
//! in a single JobQueue instance. Any type that implements the Job trait
27+
//! may be a job.
28+
29+
mod queue;
30+
pub mod traits;
31+
pub mod triton_vm;
32+
33+
pub use queue::JobQueue;

src/job_queue/queue.rs

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
use async_priority_channel as mpsc;
2+
use tokio::sync::oneshot;
3+
4+
use super::traits::Job;
5+
use super::traits::JobResult;
6+
7+
// todo: fix it so that jobs with same priority execute FIFO.
8+
/// implements a prioritized job queue that sends result of each job to a listener.
9+
/// At present order of jobs with the same priority is undefined.
10+
type JobResultOneShotChannel = oneshot::Sender<Box<dyn JobResult>>;
11+
12+
pub struct JobQueue<P: Ord> {
13+
tx: mpsc::Sender<(Box<dyn Job>, JobResultOneShotChannel), P>,
14+
}
15+
16+
impl<P: Ord> std::fmt::Debug for JobQueue<P> {
17+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18+
f.debug_struct("JobQueue")
19+
.field("tx", &"mpsc::Sender")
20+
.finish()
21+
}
22+
}
23+
24+
impl<P: Ord> Clone for JobQueue<P> {
25+
fn clone(&self) -> Self {
26+
Self {
27+
tx: self.tx.clone(),
28+
}
29+
}
30+
}
31+
32+
impl<P: Ord + Send + Sync + 'static> JobQueue<P> {
33+
// creates job queue and starts it processing. returns immediately.
34+
pub fn start() -> Self {
35+
let (tx, rx) = mpsc::unbounded::<(Box<dyn Job>, JobResultOneShotChannel), P>();
36+
37+
// spawns background task that processes job-queue and runs jobs.
38+
tokio::spawn(async move {
39+
while let Ok(r) = rx.recv().await {
40+
let (job, otx) = r.0;
41+
42+
let result = match job.is_async() {
43+
true => job.run_async().await,
44+
false => tokio::task::spawn_blocking(move || job.run())
45+
.await
46+
.unwrap(),
47+
};
48+
let _ = otx.send(result);
49+
}
50+
});
51+
52+
Self { tx }
53+
}
54+
55+
// alias of Self::start().
56+
// here for two reasons:
57+
// 1. backwards compat with existing tests
58+
// 2. if tests call dummy() instead of start(), then it is easier
59+
// to find where start() is called for real.
60+
#[cfg(test)]
61+
pub fn dummy() -> Self {
62+
Self::start()
63+
}
64+
65+
// adds job to job-queue and returns immediately.
66+
pub async fn add_job(
67+
&self,
68+
job: Box<dyn Job>,
69+
priority: P,
70+
) -> anyhow::Result<oneshot::Receiver<Box<dyn JobResult>>> {
71+
let (otx, orx) = oneshot::channel();
72+
self.tx.send((job, otx), priority).await?;
73+
Ok(orx)
74+
}
75+
76+
// adds job to job-queue, waits for job completion, and returns job result.
77+
pub async fn add_and_await_job(
78+
&self,
79+
job: Box<dyn Job>,
80+
priority: P,
81+
) -> anyhow::Result<Box<dyn JobResult>> {
82+
let (otx, orx) = oneshot::channel();
83+
self.tx.send((job, otx), priority).await?;
84+
Ok(orx.await?)
85+
}
86+
87+
#[cfg(test)]
88+
pub async fn wait_until_queue_empty(&self) {
89+
loop {
90+
if self.tx.is_empty() {
91+
break;
92+
}
93+
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
94+
}
95+
}
96+
}
97+
98+
#[cfg(test)]
mod tests {
    use std::any::Any;

    use super::*;

    /// Result of a `DoubleJob`: the input value followed by its double.
    #[derive(PartialEq, Debug)]
    struct DoubleJobResult(u64, u64);

    impl JobResult for DoubleJobResult {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// A blocking job that doubles its input value.
    struct DoubleJob {
        data: u64,
    }

    impl Job for DoubleJob {
        fn is_async(&self) -> bool {
            false
        }

        fn run(&self) -> Box<dyn JobResult> {
            let input = self.data;
            let doubled = DoubleJobResult(input, input * 2);

            println!("{} * 2 = {}", doubled.0, doubled.1);
            Box::new(doubled)
        }
    }

    // todo: make test(s) for async jobs.

    /// todo: this should verify the priority order of jobs.
    ///       presently each job just prints its result and
    ///       a human can manually verify the output.
    #[tokio::test]
    async fn run_jobs_by_priority() -> anyhow::Result<()> {
        let job_queue = JobQueue::<u8>::start();

        // Enqueue 30 jobs: for each i, one job at each of the priorities
        // 1, 2 and 3, with inputs i, i * 100 and i * 1000 respectively.
        // The result receivers are dropped; results are only printed.
        for i in 0..10u64 {
            for (factor, priority) in [(1u64, 1u8), (100, 2), (1000, 3)] {
                let job = Box::new(DoubleJob { data: i * factor });
                job_queue.add_job(job, priority).await?;
            }
        }

        job_queue.wait_until_queue_empty().await;

        Ok(())
    }

    /// Each awaited job must yield the doubled input as its result.
    #[tokio::test]
    async fn get_result() -> anyhow::Result<()> {
        let job_queue = JobQueue::<u8>::start();

        // run 10 jobs, one after the other, checking each result.
        for i in 0..10 {
            let expected = DoubleJobResult(i, i * 2);

            let result = job_queue
                .add_and_await_job(Box::new(DoubleJob { data: i }), 1)
                .await?;

            assert_eq!(
                Some(&expected),
                result.as_any().downcast_ref::<DoubleJobResult>()
            );
        }

        Ok(())
    }
}

src/job_queue/traits.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use std::any::Any;
2+
3+
/// The result produced by a job.
///
/// Results are passed around as `Box<dyn JobResult>`; callers recover the
/// concrete type by downcasting the value returned from [`JobResult::as_any`].
pub trait JobResult: std::fmt::Debug + Any + Send + Sync {
    /// Returns `self` as `&dyn Any` so that it can be downcast to its
    /// concrete type.
    fn as_any(&self) -> &dyn Any;
}
6+
7+
// represents any kind of job
8+
#[async_trait::async_trait]
9+
pub trait Job: Send + Sync {
10+
fn is_async(&self) -> bool;
11+
12+
// note: we provide unimplemented default methods for
13+
// run and run_async. This is so that implementing types
14+
// only need to impl the appropriate method.
15+
16+
fn run(&self) -> Box<dyn JobResult> {
17+
unimplemented!()
18+
}
19+
20+
// fn run_async(&self) -> std::future::Future<Output = Box<dyn JobResult>> + Send;
21+
async fn run_async(&self) -> Box<dyn JobResult> {
22+
unimplemented!()
23+
}
24+
}

src/job_queue/triton_vm/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
mod triton_vm_job_queue;
2+
3+
pub use triton_vm_job_queue::TritonVmJobPriority;
4+
pub use triton_vm_job_queue::TritonVmJobQueue;
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
use super::super::JobQueue;
2+
3+
// todo: maybe we want to have more levels or just make it an integer eg u8.
// or maybe name the levels by type/usage of job/proof.
/// Priority of a Triton VM job.
///
/// Variants compare in ascending order, from [`Self::Lowest`] up to
/// [`Self::Highest`]. The default priority is [`Self::Normal`].
#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
pub enum TritonVmJobPriority {
    Lowest = 1,
    Low = 2,
    #[default]
    Normal = 3,
    High = 4,
    Highest = 5,
}
14+
15+
/// Job queue for Triton VM jobs, prioritized by `TritonVmJobPriority`. The alias adds clarity in case we implement multiple job queues.
16+
pub type TritonVmJobQueue = JobQueue<TritonVmJobPriority>;

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
pub mod config_models;
1212
pub mod connect_to_peers;
1313
pub mod database;
14+
pub mod job_queue;
1415
pub mod locks;
1516
pub mod macros;
1617
pub mod main_loop;

0 commit comments

Comments
 (0)