Thread scaling improvements #123

Open
wants to merge 44 commits into
base: master
44 commits
49a6a48
added working version of load balancing for Rail in EMR, note hardcod…
ChristopherWilks Feb 19, 2016
e0959b8
applying the latest bug fix from the fix_parallel_build_support_for_t…
ChristopherWilks Feb 19, 2016
a416046
added rlb to VERSION
ChristopherWilks Feb 19, 2016
9cd4c99
changed to using ps to get pids, seeing an abort at the very end, aft…
ChristopherWilks Feb 20, 2016
ef0159a
merged the paralell fix branch code
ChristopherWilks Mar 1, 2016
975cef4
added TBB support for the load balancing code, fixed ending segfault …
ChristopherWilks Mar 2, 2016
b0f8fdd
re-enabled atomic support for thread_count when WITH_TBB=1, cleaned u…
ChristopherWilks Mar 3, 2016
04855cf
merged master into rail load balancing
BenLangmead Jun 17, 2016
84fdb20
fixing some errors in the merge
BenLangmead Jun 17, 2016
9f195e7
fixing more errors in merging; switched from stat to kill to be mac c…
BenLangmead Jun 17, 2016
d3ac765
several formatting improvements and bug fixes
BenLangmead Jun 18, 2016
4a22f90
removing unneeded includes and replacing nthreads > 0 conditionals
BenLangmead Jun 18, 2016
60e1f46
addressing warnings and error in WITH_TBB=1 mode
BenLangmead Jun 18, 2016
c13d06a
small cleanups
BenLangmead Jun 22, 2016
988b355
added --thread-piddir and got rid of hard coded directory for thread …
BenLangmead Jun 22, 2016
8a1afe0
first commit of tbb::threads + 10ms delay
ChristopherWilks Mar 17, 2017
c1ca69d
output buffer with user defined size in reads
ChristopherWilks Mar 24, 2017
c10f3ab
mem page pool used by AlignmentCache now allocated in one big chunk
ChristopherWilks Mar 24, 2017
882ce84
added master travis config
ChristopherWilks Mar 24, 2017
b2216a9
commenting out readline and gzip compression to allow for easy portab…
ChristopherWilks Apr 6, 2017
1e1c8d6
moved ThreadSafe to outside the loop
May 3, 2017
b6cb7b0
ported stack mutex; removed readsPerBatchOutput option; fixed nthread…
ChristopherWilks May 3, 2017
dfeaf84
fixed thread timing no-output race condition
ChristopherWilks May 5, 2017
d4debcd
merge master into rail_load_balancing
BenLangmead May 9, 2017
1450dce
merge 2.3.2 commits into rail_load_balancing
BenLangmead May 9, 2017
ff9a750
stability and error checking enhancements for thread stealing
BenLangmead May 26, 2017
d53e053
merging in master
BenLangmead Jul 18, 2017
017bbb8
another master merge
BenLangmead Jul 18, 2017
6d01f37
merging
BenLangmead Jul 18, 2017
6711a45
fix busy-waiting on worker threads
BenLangmead Jul 25, 2017
71a78f5
remove debug message
BenLangmead Jul 25, 2017
c645a4a
put more params in PatternSourceParams
BenLangmead Jul 26, 2017
fac8f9d
update gitignore
BenLangmead Jul 26, 2017
668e95f
Merge branch 'rail_load_balancing' into batch_parsing_output
ch4rr0 Aug 25, 2017
1ac8753
Revert "Merge branch 'rail_load_balancing' into batch_parsing_output"
ch4rr0 Aug 25, 2017
068a2eb
let input buffer be configurable size
BenLangmead Oct 27, 2017
dd5e225
simplify some parameter passing
BenLangmead Oct 28, 2017
2e89e75
--buffer-size now controls both input and output buffer
BenLangmead Oct 30, 2017
aa05a02
fix inability to write to file
BenLangmead Oct 30, 2017
4756bb4
set default --buffer-size to 64
BenLangmead Oct 30, 2017
a254b7d
some normalizing with blocked_input
BenLangmead Oct 31, 2017
a6ab063
fix for thread id structure going out of scope
ch4rr0 Dec 3, 2017
0c9595b
update gitignore
BenLangmead Dec 11, 2017
f424c15
Add missing headers
ch4rr0 Aug 3, 2017
4 changes: 4 additions & 0 deletions .gitignore
@@ -24,3 +24,7 @@ bowtie2-align-l-debug
bowtie2-inspect-s-debug
bowtie2-inspect-l-debug
Xcode
reads_*.fq
reads_*.sam
.DS_Store
.tmp.simple_tests.*.pl
23 changes: 19 additions & 4 deletions aligner_cache.cpp
@@ -51,16 +51,14 @@ bool SAVal::repOk(const AlignmentCache& ac) const {
* Add a new association between a read sequnce ('seq') and a
* reference sequence ('')
*/
bool AlignmentCache::addOnTheFly(
bool AlignmentCache::addOnTheFlyImpl(
QVal& qv, // qval that points to the range of reference substrings
const SAKey& sak, // the key holding the reference substring
TIndexOffU topf, // top range elt in BWT index
TIndexOffU botf, // bottom range elt in BWT index
TIndexOffU topb, // top range elt in BWT' index
TIndexOffU botb, // bottom range elt in BWT' index
bool getLock)
TIndexOffU botb) // bottom range elt in BWT' index
{
ThreadSafe ts(lockPtr(), shared_ && getLock);
bool added = true;
// If this is the first reference sequence we're associating with
// the query sequence, initialize the QVal.
@@ -104,6 +102,23 @@ bool AlignmentCache::addOnTheFly(
return true;
}

bool AlignmentCache::addOnTheFly(
QVal& qv, // qval that points to the range of reference substrings
const SAKey& sak, // the key holding the reference substring
TIndexOffU topf, // top range elt in BWT index
TIndexOffU botf, // bottom range elt in BWT index
TIndexOffU topb, // top range elt in BWT' index
TIndexOffU botb, // bottom range elt in BWT' index
bool getLock)
{
if(shared_ && getLock) {
ThreadSafe ts(mutex_m);
return addOnTheFlyImpl(qv, sak, topf, botf, topb, botb);
} else {
return addOnTheFlyImpl(qv, sak, topf, botf, topb, botb);
}
}

#ifdef ALIGNER_CACHE_MAIN

#include <iostream>
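The wrapper/Impl split in this file can be read in isolation: the public method decides whether to lock, and the unsynchronized body lives in a private `...Impl` function. A minimal sketch of that shape, assuming `std::mutex` and `std::lock_guard` in place of the project's `MUTEX_T` and `ThreadSafe`; `SketchCache`, `add`, `size`, and the map-based storage are hypothetical names for illustration, not the project's API.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>

// Hypothetical stand-in for AlignmentCache; every name here is illustrative.
class SketchCache {
public:
    explicit SketchCache(bool shared) : shared_(shared) {}

    // Public wrapper: acquire the mutex only when the cache is shared and the
    // caller has not already taken the lock, then delegate to the Impl.
    bool add(const std::string& key, int value, bool getLock = true) {
        if (shared_ && getLock) {
            std::lock_guard<std::mutex> guard(mutex_);
            return addImpl(key, value);
        } else {
            return addImpl(key, value);
        }
    }

    // Same wrapper shape for a read-only query.
    std::size_t size(bool getLock = true) {
        if (shared_ && getLock) {
            std::lock_guard<std::mutex> guard(mutex_);
            return sizeImpl();
        } else {
            return sizeImpl();
        }
    }

private:
    // Unsynchronized bodies; the wrappers above are responsible for locking.
    bool addImpl(const std::string& key, int value) {
        assert(!key.empty());
        return map_.emplace(key, value).second;  // false if the key already exists
    }
    std::size_t sizeImpl() const { return map_.size(); }

    bool shared_;                     // true -> instance is used by several threads
    std::mutex mutex_;                // guards map_ when shared_ is true
    std::map<std::string, int> map_;  // illustrative payload
};
```

With this shape the guard object is constructed only on the path that needs it, so the lock type no longer has to accept a "do not actually lock" flag.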
141 changes: 90 additions & 51 deletions aligner_cache.h
@@ -444,10 +444,8 @@ class AlignmentCache {
samap_(CACHE_PAGE_SZ, CA_CAT),
salist_(CA_CAT),
shared_(shared),
mutex_m(),
version_(0)
{
}
mutex_m(),
version_(0) { }

/**
* Given a QVal, populate the given EList of SATuples with records
@@ -462,37 +460,11 @@ class AlignmentCache {
size_t& nelt,
bool getLock = true)
{
ThreadSafe ts(lockPtr(), shared_ && getLock);
assert(qv.repOk(*this));
const size_t refi = qv.offset();
const size_t reff = refi + qv.numRanges();
// For each reference sequence sufficiently similar to the
// query sequence in the QKey...
for(size_t i = refi; i < reff; i++) {
// Get corresponding SAKey, containing similar reference
// sequence & length
SAKey sak = qlist_.get(i);
// Shouldn't have identical keys in qlist_
assert(i == refi || qlist_.get(i) != qlist_.get(i-1));
// Get corresponding SANode
SANode *n = samap_.lookup(sak);
assert(n != NULL);
const SAVal& sav = n->payload;
assert(sav.repOk(*this));
if(sav.len > 0) {
nrange++;
satups.expand();
satups.back().init(sak, sav.topf, sav.topb, TSlice(salist_, sav.i, sav.len));
nelt += sav.len;
#ifndef NDEBUG
// Shouldn't add consecutive identical entries too satups
if(i > refi) {
const SATuple b1 = satups.back();
const SATuple b2 = satups[satups.size()-2];
assert(b1.key != b2.key || b1.topf != b2.topf || b1.offs != b2.offs);
}
#endif
}
if(shared_ && getLock) {
ThreadSafe ts(mutex_m);
queryQvalImpl(qv, satups, nrange, nelt);
} else {
queryQvalImpl(qv, satups, nrange, nelt);
}
}

@@ -522,12 +494,14 @@ class AlignmentCache {
bool *added,
bool getLock = true)
{
ThreadSafe ts(lockPtr(), shared_ && getLock);
assert(qk.cacheable());
QNode *n = qmap_.add(pool(), qk, added);
return (n != NULL ? &n->payload : NULL);
if(shared_ && getLock) {
ThreadSafe ts(mutex_m);
return addImpl(qk, added);
} else {
return addImpl(qk, added);
}
}

/**
* Add a new association between a read sequnce ('seq') and a
* reference sequence ('')
@@ -546,8 +520,8 @@ class AlignmentCache {
* ranges in this cache will become invalid and the corresponding
* reads will have to be re-aligned.
*/
void clear(bool getLock = true) {
ThreadSafe ts(lockPtr(), shared_ && getLock);
void clear() {
ThreadSafe ts(mutex_m);
pool_.clear();
qmap_.clear();
qlist_.clear();
@@ -585,17 +559,9 @@ class AlignmentCache {
* Return the lock object.
*/
MUTEX_T& lock() {
return mutex_m;
return mutex_m;
}

/**
* Return a const pointer to the lock object. This allows us to
* write const member functions that grab the lock.
*/
MUTEX_T* lockPtr() const {
return const_cast<MUTEX_T*>(&mutex_m);
}

/**
* Return true iff this cache is shared among threads.
*/
@@ -618,6 +584,79 @@ class AlignmentCache {
bool shared_; // true -> this cache is global
MUTEX_T mutex_m; // mutex used for syncronization in case the the cache is shared.
uint32_t version_; // cache version

private:

template <int S>
void queryQvalImpl(
const QVal& qv,
EList<SATuple, S>& satups,
size_t& nrange,
size_t& nelt)
{
assert(qv.repOk(*this));
const size_t refi = qv.offset();
const size_t reff = refi + qv.numRanges();
// For each reference sequence sufficiently similar to the
// query sequence in the QKey...
for(size_t i = refi; i < reff; i++) {
// Get corresponding SAKey, containing similar reference
// sequence & length
SAKey sak = qlist_.get(i);
// Shouldn't have identical keys in qlist_
assert(i == refi || qlist_.get(i) != qlist_.get(i-1));
// Get corresponding SANode
SANode *n = samap_.lookup(sak);
assert(n != NULL);
const SAVal& sav = n->payload;
assert(sav.repOk(*this));
if(sav.len > 0) {
nrange++;
satups.expand();
satups.back().init(sak, sav.topf, sav.topb, TSlice(salist_, sav.i, sav.len));
nelt += sav.len;
#ifndef NDEBUG
// Shouldn't add consecutive identical entries too satups
if(i > refi) {
const SATuple b1 = satups.back();
const SATuple b2 = satups[satups.size()-2];
assert(b1.key != b2.key || b1.topf != b2.topf || b1.offs != b2.offs);
}
#endif
}
}
}

/**
* Add a new association between a read sequnce ('seq') and a
* reference sequence ('')
*/
bool addOnTheFlyImpl(
QVal& qv, // qval that points to the range of reference substrings
const SAKey& sak, // the key holding the reference substring
TIndexOffU topf, // top range elt in BWT index
TIndexOffU botf, // bottom range elt in BWT index
TIndexOffU topb, // top range elt in BWT' index
TIndexOffU botb); // bottom range elt in BWT' index

/**
* Add a new query key ('qk'), usually a 2-bit encoded substring of
* the read) as the key in a new Red-Black node in the qmap and
* return a pointer to the node's QVal.
*
* The expectation is that the caller is about to set about finding
* associated reference substrings, and that there will be future
* calls to addOnTheFly to add associations to reference substrings
* found.
*/
QVal* addImpl(
const QKey& qk,
bool *added)
{
assert(qk.cacheable());
QNode *n = qmap_.add(pool(), qk, added);
return (n != NULL ? &n->payload : NULL);
}
};

/**
18 changes: 9 additions & 9 deletions aligner_seed.cpp
@@ -1526,7 +1526,7 @@ SeedAligner::searchSeedBi(
bool leaveZone = s.zones[i].first < 0;
//bool leaveZoneIns = zones_[i].second < 0;
Constraint& cons = *zones[abs(s.zones[i].first)];
Constraint& insCons = *zones[abs(s.zones[i].second)];
//Constraint& insCons = *zones[abs(s.zones[i].second)];
int c = (*seq_)[off]; assert_range(0, 4, c);
int q = (*qual_)[off];
// Is it legal for us to advance on characters other than 'c'?
@@ -1598,14 +1598,14 @@ SeedAligner::searchSeedBi(
}
if(cons.canGap() && overall.canGap()) {
throw 1; // TODO
int delEx = 0;
if(cons.canDelete(delEx, *sc_) && overall.canDelete(delEx, *sc_)) {
// Try delete
}
int insEx = 0;
if(insCons.canInsert(insEx, *sc_) && overall.canInsert(insEx, *sc_)) {
// Try insert
}
// int delEx = 0;
// if(cons.canDelete(delEx, *sc_) && overall.canDelete(delEx, *sc_)) {
// // Try delete
// }
// int insEx = 0;
// if(insCons.canInsert(insEx, *sc_) && overall.canInsert(insEx, *sc_)) {
// // Try insert
// }
}
} // if(!bail)
}
3 changes: 1 addition & 2 deletions aligner_seed.h
@@ -841,7 +841,7 @@ class SeedResults {
size_t numRepeatSeeds() const {
return repTot_;
}

/**
* Return fraction of seeds that align repetitively.
*/
@@ -1401,7 +1401,6 @@ struct SeedSearchMetrics {
* SeedSearchMetrics object shread by multiple threads.
*/
void merge(const SeedSearchMetrics& m, bool getLock = false) {
ThreadSafe ts(&mutex_m, getLock);
seedsearch += m.seedsearch;
nrange += m.nrange;
nelt += m.nelt;
2 changes: 1 addition & 1 deletion aligner_seed2.h
@@ -2235,7 +2235,7 @@ class DescentAlignmentSelector {
}
AlnScore asc(
-dr.sink().bestPenalty(), // numeric score
dr.query().length() - edits.size(),
(int)(dr.query().length() - edits.size()),
(int)edits.size(), // # edits
ns, // # Ns
ngap); // # gaps
4 changes: 2 additions & 2 deletions aligner_sw_common.h
@@ -209,8 +209,8 @@ struct SwMetrics {
* object. This is the only safe way to update a SwMetrics shared
* by multiple threads.
*/
void merge(const SwMetrics& r, bool getLock = false) {
ThreadSafe ts(&mutex_m, getLock);
void merge(const SwMetrics& r) {
ThreadSafe ts(mutex_m);
sws += r.sws;
sws10 += r.sws10;
sws5 += r.sws5;
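`SwMetrics::merge` here drops the `getLock` flag and takes the mutex on every call. A minimal sketch of that pattern, assuming `std::mutex` and `std::lock_guard` in place of `MUTEX_T` and `ThreadSafe`; `SketchMetrics` is a hypothetical type that borrows three field names from the hunk and nothing else.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

// Hypothetical stand-in for SwMetrics; only three counters are modeled.
struct SketchMetrics {
    std::uint64_t sws = 0;
    std::uint64_t sws10 = 0;
    std::uint64_t sws5 = 0;

    // Always lock: merge() is the one entry point through which several
    // threads may update the same shared object.
    void merge(const SketchMetrics& r) {
        std::lock_guard<std::mutex> guard(mutex_);
        sws += r.sws;
        sws10 += r.sws10;
        sws5 += r.sws5;
    }

private:
    std::mutex mutex_;  // guards the counters during merge()
};
```

In this arrangement each worker would accumulate into its own unshared `SketchMetrics` and call `merge` on the shared instance occasionally, so the unconditional lock is taken once per merge rather than once per counter update.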
20 changes: 10 additions & 10 deletions aligner_sw_driver.cpp
@@ -326,11 +326,11 @@ void SwDriver::extend(

// Have to do both because whether we can get through an N depends on
// which direction we're coming in
bool fwContains = ebwtFw.contains(tmp_rdseq_);
tmp_rdseq_.reverse();
bool bwContains = ebwtBw != NULL && ebwtBw->contains(tmp_rdseq_);
tmp_rdseq_.reverse();
assert(fwContains || bwContains);
// bool fwContains = ebwtFw.contains(tmp_rdseq_);
// tmp_rdseq_.reverse();
// bool bwContains = ebwtBw != NULL && ebwtBw->contains(tmp_rdseq_);
// tmp_rdseq_.reverse();
// assert(fwContains || bwContains);
}
#endif
ASSERT_ONLY(tmp_rdseq_.reverse());
@@ -471,11 +471,11 @@ void SwDriver::extend(

// Have to do both because whether we can get through an N depends on
// which direction we're coming in
bool fwContains = ebwtFw.contains(tmp_rdseq_);
tmp_rdseq_.reverse();
bool bwContains = ebwtBw != NULL && ebwtBw->contains(tmp_rdseq_);
tmp_rdseq_.reverse();
assert(fwContains || bwContains);
// bool fwContains = ebwtFw.contains(tmp_rdseq_);
// tmp_rdseq_.reverse();
// bool bwContains = ebwtBw != NULL && ebwtBw->contains(tmp_rdseq_);
// tmp_rdseq_.reverse();
// assert(fwContains || bwContains);
}
#endif
assert_lt(nlex, rdlen);
3 changes: 1 addition & 2 deletions aligner_swsse.h
@@ -41,8 +41,7 @@ struct SSEMetrics {
corerej = nrej = 0;
}

void merge(const SSEMetrics& o, bool getLock = false) {
ThreadSafe ts(&mutex_m, getLock);
void merge(const SSEMetrics& o) {
dp += o.dp;
dpsat += o.dpsat;
dpfail += o.dpfail;
2 changes: 1 addition & 1 deletion aligner_swsse_ee_i16.cpp
@@ -1362,7 +1362,7 @@ bool SwAligner::backtraceNucleotidesEnd2EndSseI16(
int readq = (*qu_)[row];
assert_leq(col, origCol);
// Get score in this cell
bool empty, reportedThru, canMoveThru, branch = false;
bool empty = false, reportedThru, canMoveThru, branch = false;
int cur = SSEMatrix::H;
if(!d.mat_.reset_[row]) {
d.mat_.resetRow(row);
2 changes: 1 addition & 1 deletion aligner_swsse_loc_i16.cpp
@@ -1721,7 +1721,7 @@ bool SwAligner::backtraceNucleotidesLocalSseI16(
int readq = (*qu_)[row];
assert_leq(col, origCol);
// Get score in this cell
bool empty, reportedThru, canMoveThru, branch = false;
bool empty = false, reportedThru, canMoveThru, branch = false;
int cur = SSEMatrix::H;
if(!d.mat_.reset_[row]) {
d.mat_.resetRow(row);
2 changes: 1 addition & 1 deletion aligner_swsse_loc_u8.cpp
@@ -1719,7 +1719,7 @@ bool SwAligner::backtraceNucleotidesLocalSseU8(
int readq = (*qu_)[row];
assert_leq(col, origCol);
// Get score in this cell
bool empty, reportedThru, canMoveThru, branch = false;
bool empty = false, reportedThru, canMoveThru, branch = false;
int cur = SSEMatrix::H;
if(!d.mat_.reset_[row]) {
d.mat_.resetRow(row);
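In a declaration such as `bool empty, reportedThru, canMoveThru, branch = false;` the initializer binds only to `branch`. That is presumably why the three SSE backtrace hunks above add `= false` to `empty`, although the PR does not state the reason. A minimal sketch of the rule, going one step further than the PR by initializing every declarator; `CellFlags` and `initialFlags` are hypothetical names used only for illustration.

```cpp
#include <cassert>

// Hypothetical holder for the four flags declared in the backtrace loop.
struct CellFlags {
    bool empty;
    bool reportedThru;
    bool canMoveThru;
    bool branch;
};

inline CellFlags initialFlags() {
    // Each declarator needs its own initializer; `bool a, b = false;`
    // would leave `a` uninitialized.
    bool empty = false, reportedThru = false, canMoveThru = false, branch = false;
    return CellFlags{empty, reportedThru, canMoveThru, branch};
}
```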
8 changes: 4 additions & 4 deletions aln_sink.h
@@ -121,8 +121,8 @@ struct ReportingMetrics {
* into this object. This is the only safe way to update a
* ReportingMetrics shared by multiple threads.
*/
void merge(const ReportingMetrics& met, bool getLock = false) {
ThreadSafe ts(&mutex_m, getLock);
void merge(const ReportingMetrics& met) {
ThreadSafe ts(mutex_m);
nread += met.nread;

npaired += met.npaired;
@@ -847,8 +847,8 @@ class AlnSink {
/**
* Merge given metrics in with ours by summing all individual metrics.
*/
void mergeMetrics(const ReportingMetrics& met, bool getLock = true) {
met_.merge(met, getLock);
void mergeMetrics(const ReportingMetrics& met) {
met_.merge(met);
}

/**