Skip to content

Commit

Permalink
Add --parallel to hnsm distance
Browse files Browse the repository at this point in the history
  • Loading branch information
wang-q committed Jan 7, 2025
1 parent eb3b17c commit ba91356
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

* Add `hnsm mask`
* Add `hnsm sixframe`
* Add --parallel to `hnsm distance`

* Improve help texts

Expand Down
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ flate2 = "1.0.35"
tera = "1.20.0"

crossbeam = "0.8.4"
rayon = "1.10.0"

intspan = "0.8.4"
noodles = { version = "0.86.0", features = ["core", "fasta", "fastq", "bgzf"] }
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ hnsm filter -a 400 tests/fasta/ufasta.fa |
hnsm split about -c 2000 tests/fasta/ufasta.fa -o tmp

cargo run --bin hnsm sixframe tests/fasta/trans.fa
cargo run --bin hnsm sixframe tests/fasta/trans.fa --len 3 --start --end

cargo run --bin hnsm sort

Expand Down Expand Up @@ -286,6 +287,23 @@ K1J4J6_9GAMM 0.372263 0.416058 0.496350 0.518248 0.569343 0.372263 0.3722
```

* proteomes

```shell
curl -L https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/005/845/GCF_000005845.2_ASM584v2/GCF_000005845.2_ASM584v2_protein.faa.gz \
> tests/clust/mg1655.pro.fa.gz

curl -L https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/008/865/GCF_000008865.2_ASM886v2/GCF_000008865.2_ASM886v2_protein.faa.gz \
> tests/clust/sakai.pro.fa.gz

curl -L https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/006/765/GCF_000006765.1_ASM676v1/GCF_000006765.1_ASM676v1_protein.faa.gz \
> tests/clust/pao1.pro.fa.gz

hnsm distance tests/clust/mg1655.pro.fa.gz tests/clust/pao1.pro.fa.gz -k 7 -w 4 |
rgr filter stdin --ne 3:1

```

#### Matrix conversion

```shell
Expand Down
134 changes: 102 additions & 32 deletions src/cmd/distance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use clap::*;
use hnsm::Minimizer;
use noodles_fasta as fasta;
use std::collections::{HashMap, HashSet};
use std::collections::BTreeSet;
use std::iter::FromIterator;

// Create clap subcommand arguments
Expand Down Expand Up @@ -43,13 +43,18 @@ Examples:
5. Use 4 threads for parallel processing:
hnsm distance input.fa --parallel 4
6. Compare two FA files:
hnsm distance file1.fa file2.fa
"###,
)
.arg(
Arg::new("infile")
Arg::new("infiles")
.required(true)
.num_args(1..=2)
.index(1)
.help("Input FA file to process"),
.required(true)
.help("Input filenames. [stdin] for standard input"),
)
.arg(
Arg::new("hasher")
Expand Down Expand Up @@ -104,14 +109,17 @@ Examples:
)
}

#[derive(Default, Clone)]
struct MinimizerEntry {
name: String,
set: BTreeSet<u64>,
}

// command implementation
pub fn execute(args: &ArgMatches) -> anyhow::Result<()> {
//----------------------------
// Args
//----------------------------
let reader = intspan::reader(args.get_one::<String>("infile").unwrap());
let mut fa_in = fasta::io::Reader::new(reader);

let opt_hasher = args.get_one::<String>("hasher").unwrap();
let opt_kmer = *args.get_one::<usize>("kmer").unwrap();
let opt_window = *args.get_one::<usize>("window").unwrap();
Expand All @@ -121,15 +129,93 @@ pub fn execute(args: &ArgMatches) -> anyhow::Result<()> {

let mut writer = intspan::writer(args.get_one::<String>("outfile").unwrap());

let infiles = args
.get_many::<String>("infiles")
.unwrap()
.map(|s| s.as_str())
.collect::<Vec<_>>();

//----------------------------
// Ops
//----------------------------
let mut set_of = HashMap::new();
let mut names = vec![]; // track original orders of names
let entries = load_file(infiles.get(0).unwrap(), opt_hasher, opt_kmer, opt_window);
let others = if infiles.len() == 2 {
load_file(infiles.get(1).unwrap(), opt_hasher, opt_kmer, opt_window)
} else {
entries.clone()
};

// Channel 1 - Entries
let (snd1, rcv1) = crossbeam::channel::bounded::<(&MinimizerEntry, &MinimizerEntry)>(10);
// Channel 2 - Results
let (snd2, rcv2) = crossbeam::channel::bounded::<String>(10);

crossbeam::scope(|s| {
//----------------------------
// Reader thread
//----------------------------
s.spawn(|_| {
for e1 in &entries {
for e2 in &others {
snd1.send((e1, e2)).unwrap();
}
}
// Close the channel - this is necessary to exit the for-loop in the worker
drop(snd1);
});

//----------------------------
// Worker threads
//----------------------------
for _ in 0..opt_parallel {
// Send to sink, receive from source
let (sendr, recvr) = (snd2.clone(), rcv1.clone());
// Spawn workers in separate threads
s.spawn(move |_| {
// Receive until channel closes
for (e1, e2) in recvr.iter() {
let (jaccard, containment, mash) = calc_distances(&e1.set, &e2.set);
let out_string = format!(
"{}\t{}\t{:.4}\t{:.4}\t{:.4}\n",
e1.name,
e2.name,
if is_sim { 1.0 - mash } else { mash },
jaccard,
containment
);
sendr.send(out_string).unwrap();
}
});
}
// Close the channel, otherwise sink will never exit the for-loop
drop(snd2);

//----------------------------
// Writer (main) thread
//----------------------------
for out_string in rcv2.iter() {
writer.write_all(out_string.as_ref()).unwrap();
}
})
.unwrap();

Ok(())
}

fn load_file(
infile: &str,
opt_hasher: &String,
opt_kmer: usize,
opt_window: usize,
) -> Vec<MinimizerEntry> {
let reader = intspan::reader(infile);
let mut fa_in = fasta::io::Reader::new(reader);

let mut entries = vec![];

for result in fa_in.records() {
// obtain record or fail with error
let record = result?;
let record = result.unwrap();

let name = String::from_utf8(record.name().into()).unwrap();
let seq = record.sequence();
Expand All @@ -150,33 +236,17 @@ pub fn execute(args: &ArgMatches) -> anyhow::Result<()> {
_ => unreachable!(),
};

names.push(name.clone());
let set: HashSet<u64> = HashSet::from_iter(minimizers.iter().map(|t| t.1));
set_of.insert(name, set);
}

for n1 in &names {
for n2 in &names {
let (jaccard, containment, mash) =
calc_distances(set_of.get(n1).unwrap(), set_of.get(n2).unwrap());

writer.write_fmt(format_args!(
"{}\t{}\t{:.4}\t{:.4}\t{:.4}\n",
n1,
n2,
if is_sim { 1.0 - mash } else { mash },
jaccard,
containment
))?;
}
let set: BTreeSet<u64> = BTreeSet::from_iter(minimizers.iter().map(|t| t.1));
let entry = MinimizerEntry { name, set };
entries.push(entry);
}

Ok(())
entries
}

// Calculate Jaccard, Containment, and Mash distance between two HashSets
fn calc_distances(s1: &HashSet<u64>, s2: &HashSet<u64>) -> (f64, f64, f64) {
let inter = s1.intersection(&s2).count();
// Calculate Jaccard, Containment, and Mash distance between two BTreeSet
fn calc_distances(s1: &BTreeSet<u64>, s2: &BTreeSet<u64>) -> (f64, f64, f64) {
let inter = s1.intersection(&s2).cloned().count();
let union = s1.len() + s2.len() - inter;

let jaccard = inter as f64 / union as f64;
Expand Down
7 changes: 3 additions & 4 deletions src/hnsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,17 @@ Subcommand groups:
* Fasta files
* info: size / count / masked / n50
* records: one / some / order / split
* transform: replace / rc / filter / dedup / mask
* transform: replace / rc / filter / dedup / mask / sixframe
* indexing: gz / range
* `hnsm gz` writes out the BGZF format and `hnsm range` reads it
* Fastq files
* interleave
* Clustering
* vectors: similarity
* DNA/protein: distance
* vectors: similarity
* convert
* cluster
* manifold
Expand Down Expand Up @@ -88,7 +89,6 @@ Subcommand groups:
Some(("filter", sub_matches)) => cmd::filter::execute(sub_matches),
Some(("dedup", sub_matches)) => cmd::dedup::execute(sub_matches),
Some(("mask", sub_matches)) => cmd::mask::execute(sub_matches),
//
Some(("sixframe", sub_matches)) => cmd::sixframe::execute(sub_matches),
// index
Some(("gz", sub_matches)) => cmd::gz::execute(sub_matches),
Expand All @@ -111,4 +111,3 @@ Subcommand groups:
}

// TODO: Remove fully contained sequences
// TODO: distance --parallel
Binary file added tests/clust/mg1655.pro.fa.gz
Binary file not shown.
Binary file added tests/clust/pao1.pro.fa.gz
Binary file not shown.
Binary file added tests/clust/sakai.pro.fa.gz
Binary file not shown.

0 comments on commit ba91356

Please sign in to comment.