Skip to content

Commit 93980e9

Browse files
committed
use rayon in sort subcli
1 parent 9134937 commit 93980e9

File tree

9 files changed

+669
-632
lines changed

9 files changed

+669
-632
lines changed

Cargo.lock

Lines changed: 628 additions & 606 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ name = "fqkit"
55
version = "0.4.11"
66
edition = "2021"
77
authors = ["sharkLoc <mmtinfo@163.com>"]
8-
rust-version = "1.77.2"
8+
rust-version = "1.83.0"
99
homepage = "https://github.com/sharkLoc/fqkit"
1010
repository = "https://github.com/sharkLoc/fqkit"
1111
categories = ["science"]
@@ -36,9 +36,11 @@ flate2 = "1.0.24"
3636
log = "0.4.20"
3737
lowcharts = "0.5.8"
3838
nthash = "0.5.1"
39+
num_cpus = "1.16.0"
3940
plotters = "0.3.4"
4041
rand = "0.8.5"
4142
rand_pcg = "0.3.1"
43+
rayon = "1.10.0"
4244
regex = "1.9.5"
4345
rgb = "0.8.36"
4446
term_size = "0.3.2"

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ Commands:
9797
help Print this message or the help of the given subcommand(s)
9898

9999
Global Arguments:
100+
-@, --threads <INT> threads number [default: 1]
100101
--compress-level <INT> set gzip/bzip2/xz compression level 1 (compress faster) - 9 (compress better) for gzip/bzip2/xz output file, just work with option -o/--out [default: 6]
101102
--output-type <u|g|b|x> output type for stdout: 'g' gzip; 'b' bzip2; 'x' xz; 'u' uncompressed txt format [default: u
102103
--log <FILE> if file name specified, write log message to this file, or write to stderr

src/cli/filter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{error::FqkitError, utils::*};
22
use anyhow::{Ok, Result};
33
use bio::io::fastq;
4-
use crossbeam::channel::unbounded;
4+
use crossbeam::channel::bounded;
55
use log::*;
66

77
#[allow(clippy::too_many_arguments)]
@@ -112,7 +112,7 @@ pub fn filter_fastq(
112112
chunk = 5000;
113113
}
114114

115-
let (tx, rx) = unbounded();
115+
let (tx, rx) = bounded(5000);
116116
let mut fq_iter1 = fq_reader1.records();
117117
let mut fq_iter2 = fq_reader2.records();
118118
loop {
@@ -130,7 +130,7 @@ pub fn filter_fastq(
130130
drop(tx);
131131

132132
crossbeam::scope(|s| {
133-
let (tx2, rx2) = unbounded();
133+
let (tx2, rx2) = bounded(5000);
134134
let _handles: Vec<_> = (0..ncpu)
135135
.map(|_| {
136136
let tx_tmp = tx2.clone();

src/cli/size.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::utils::*;
22
use anyhow::Result;
33
use bio::io::fastq;
4-
use crossbeam::channel::unbounded;
4+
//use crossbeam::channel::unbounded;
5+
use crossbeam::channel::bounded;
56
use log::*;
67

78
#[derive(Clone, Copy)]
@@ -79,7 +80,7 @@ pub fn size_fastq(
7980
}
8081
bases = base.a + base.t + base.g + base.c + base.n;
8182
} else {
82-
let (tx, rx) = unbounded();
83+
let (tx, rx) = bounded(5000);//unbounded();
8384
let mut fqiter = fq_reader.records();
8485
loop {
8586
let chunk: Vec<_> = fqiter.by_ref().take(chunk).map_while(Result::ok).collect();
@@ -91,7 +92,7 @@ pub fn size_fastq(
9192
drop(tx);
9293

9394
crossbeam::scope(|s| {
94-
let (tx2, rx2) = unbounded();
95+
let (tx2, rx2) = bounded(5000); //unbounded();
9596
let _handles: Vec<_> = (0..ncpu)
9697
.map(|_| {
9798
let rx_tmp = rx.clone();

src/cli/sort.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::utils::*;
22
use anyhow::{Error, Ok};
33
use bio::io::fastq;
44
use log::*;
5+
use rayon::prelude::*;
56

67
#[allow(clippy::too_many_arguments)]
78
pub fn sort_fastq(
@@ -13,7 +14,7 @@ pub fn sort_fastq(
1314
reverse: bool,
1415
out: Option<&String>,
1516
compression_level: u32,
16-
stdout_type: char,
17+
stdout_type: char
1718
) -> Result<(), Error> {
1819

1920
let mut n = 0;
@@ -57,28 +58,28 @@ pub fn sort_fastq(
5758
if sort_by_name {
5859
info!("sort read by name");
5960
if reverse {
60-
vec_reads.sort_by(|a, b| {
61+
vec_reads.par_sort_by(|a, b| {
6162
let read_name1 = if let Some(des) = a.desc() {
6263
format!("{} {}", a.id(), des)
6364
} else {
6465
a.id().to_string()
6566
};
6667
let read_name2 = if let Some(des) = b.desc() {
67-
format!("{} {}", a.id(), des)
68+
format!("{} {}", b.id(), des)
6869
} else {
6970
b.id().to_string()
7071
};
7172
read_name2.cmp(&read_name1)
7273
});
7374
} else {
74-
vec_reads.sort_by(|a, b| {
75+
vec_reads.par_sort_by(|a, b| {
7576
let read_name1 = if let Some(des) = a.desc() {
7677
format!("{} {}", a.id(), des)
7778
} else {
7879
a.id().to_string()
7980
};
8081
let read_name2 = if let Some(des) = b.desc() {
81-
format!("{} {}", a.id(), des)
82+
format!("{} {}", b.id(), des)
8283
} else {
8384
b.id().to_string()
8485
};
@@ -88,23 +89,23 @@ pub fn sort_fastq(
8889
} else if sort_by_seq {
8990
info!("sort read by sequence");
9091
if reverse {
91-
vec_reads.sort_by(|a, b| b.seq().cmp(a.seq()));
92+
vec_reads.par_sort_by(|a, b| b.seq().cmp(a.seq()));
9293
} else {
93-
vec_reads.sort_by(|a, b| a.seq().cmp(b.seq()));
94+
vec_reads.par_sort_by(|a, b| a.seq().cmp(b.seq()));
9495
}
9596
} else if sort_by_length {
9697
info!("sort read by length");
9798
if reverse {
9899
//vec_reads.sort_by(|a, b| b.seq().len().cmp(&a.seq().len()));
99-
vec_reads.sort_by_key(|b| std::cmp::Reverse(b.seq().len()))
100+
vec_reads.par_sort_by_key(|b| std::cmp::Reverse(b.seq().len()))
100101
} else {
101102
//vec_reads.sort_by(|a, b| a.seq().len().cmp(&b.seq().len()));
102-
vec_reads.sort_by_key(|a| a.seq().len())
103+
vec_reads.par_sort_by_key(|a| a.seq().len())
103104
}
104105
} else if sort_by_gc {
105106
info!("sort read by gc content");
106107
if reverse {
107-
vec_reads.sort_by(|a, b| {
108+
vec_reads.par_sort_by(|a, b| {
108109
let r1_gc = a
109110
.seq()
110111
.iter()
@@ -120,7 +121,7 @@ pub fn sort_fastq(
120121
r2_gc.partial_cmp(&r1_gc).unwrap()
121122
});
122123
} else {
123-
vec_reads.sort_by(|a, b| {
124+
vec_reads.par_sort_by(|a, b| {
124125
let r1_gc = a
125126
.seq()
126127
.iter()

src/command.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@ pub struct Args {
3333
#[clap(subcommand)]
3434
pub command: Subcli,
3535

36+
/// threads number
37+
#[arg(short = '@', long = "threads", default_value_t = 1, global = true, value_name = "INT", help_heading = Some("Global Arguments"))]
38+
pub threads: usize,
39+
3640
/// set gzip/bzip2/xz compression level 1 (compress faster) - 9 (compress better) for gzip/bzip2/xz output file, just work with option -o/--out
3741
#[arg(long = "compress-level", default_value_t = 6, global = true,
38-
value_parser = value_parser!(u32).range(1..=9), value_name = "INT",
39-
help_heading = Some("Global Arguments")
42+
value_parser = value_parser!(u32).range(1..=9), value_name = "INT", help_heading = Some("Global Arguments")
4043
)]
4144
pub compression_level: u32,
4245

src/error.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,24 @@ use thiserror::Error;
33

44
#[derive(Debug, Error)]
55
pub enum FqkitError {
6-
#[error("stdin not detected")]
6+
#[error("Stdin not detected")]
77
StdinNotDetected,
88

9-
#[error("failed to open file: {0}")]
9+
#[error("Failed to open file: {0}")]
1010
IoError(#[from] io::Error),
1111

12-
#[error("invalid output dir: {0}")]
12+
#[error("Invalid output dir: {0}")]
1313
InvalidOutputDir(String),
1414

15-
#[error("empty file: {0}")]
15+
#[error("ThreadPoolBuildError error")]
16+
ThreadPoolBuildError(#[from] rayon::ThreadPoolBuildError),
17+
18+
#[error("Empty file: {0}")]
1619
EmptyFile(String),
1720

18-
#[error("invalid phred value")]
21+
#[error("Invalid phred value")]
1922
InvalidPhredValue,
2023

21-
#[error("invalid figure types")]
24+
#[error("Invalid figure types")]
2225
InvalidFigureType,
2326
}

src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ fn main() -> Result<(), Error> {
2323
logger(arg.verbose, arg.logfile, arg.quiet)?;
2424
let start = Instant::now();
2525
info!("version: {}", VERSION);
26+
27+
let cpus = num_cpus::get();
28+
info!("cpu numbers: {}", cpus);
29+
rayon::ThreadPoolBuilder::new().num_threads(arg.threads).build_global()?;
2630

2731
match arg.command {
2832
Subcli::topn { input, num, out } => {

0 commit comments

Comments
 (0)