Skip to content

Commit ba7c7ba

Browse files
committed
Merge pull request #772 from justincy/queue-concurrency
Queue concurrency
2 parents 540e579 + 02b715c commit ba7c7ba

File tree

2 files changed

+60
-77
lines changed

2 files changed

+60
-77
lines changed

lib/async.js

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -838,8 +838,21 @@
838838
if (q.tasks.length === q.concurrency) {
839839
q.saturated();
840840
}
841-
async.setImmediate(q.process);
842841
});
842+
async.setImmediate(q.process);
843+
}
844+
function _next(q, tasks) {
845+
return function(){
846+
workers -= 1;
847+
var args = arguments;
848+
_arrayEach(tasks, function (task) {
849+
task.callback.apply(task, args);
850+
});
851+
if (q.tasks.length + workers === 0) {
852+
q.drain();
853+
}
854+
q.process();
855+
};
843856
}
844857

845858
var workers = 0;
@@ -863,32 +876,22 @@
863876
},
864877
process: function () {
865878
if (!q.paused && workers < q.concurrency && q.tasks.length) {
866-
var tasks = payload ?
867-
q.tasks.splice(0, payload) :
868-
q.tasks.splice(0, q.tasks.length);
869-
870-
var data = _map(tasks, function (task) {
871-
return task.data;
872-
});
873-
874-
if (q.tasks.length === 0) {
875-
q.empty();
876-
}
877-
workers += 1;
878-
var cb = only_once(next);
879-
worker(data, cb);
880-
}
881-
882-
function next() {
883-
workers -= 1;
884-
var args = arguments;
885-
_arrayEach(tasks, function (task) {
886-
task.callback.apply(task, args);
887-
});
888-
if (q.tasks.length + workers === 0) {
889-
q.drain();
879+
while(workers < q.concurrency && q.tasks.length){
880+
var tasks = payload ?
881+
q.tasks.splice(0, payload) :
882+
q.tasks.splice(0, q.tasks.length);
883+
884+
var data = _map(tasks, function (task) {
885+
return task.data;
886+
});
887+
888+
if (q.tasks.length === 0) {
889+
q.empty();
890+
}
891+
workers += 1;
892+
var cb = only_once(_next(q, tasks));
893+
worker(data, cb);
890894
}
891-
q.process();
892895
}
893896
},
894897
length: function () {

test/test-async.js

Lines changed: 31 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2878,59 +2878,39 @@ exports['queue'] = {
28782878
});
28792879
},
28802880

2881+
// The original queue implementation allowed the concurrency to be changed only
2882+
// on the same event loop during which a task was added to the queue. This
2883+
// test attempts to be a more rubust test.
2884+
// Start with a concurrency of 1. Wait until a leter event loop and change
2885+
// the concurrency to 2. Wait again for a later loop then verify the concurrency.
2886+
// Repeat that one more time by chaning the concurrency to 5.
28812887
'changing concurrency': function (test) {
2882-
var call_order = [],
2883-
delays = [40,20,60,20];
2884-
2885-
// worker1: --1-2---3-4
2886-
// order of completion: 1,2,3,4
2887-
2888-
var q = async.queue(function (task, callback) {
2889-
setTimeout(function () {
2890-
call_order.push('process ' + task);
2891-
callback('error', 'arg');
2892-
}, delays.splice(0,1)[0]);
2893-
}, 2);
2894-
2895-
q.push(1, function (err, arg) {
2896-
test.equal(err, 'error');
2897-
test.equal(arg, 'arg');
2898-
test.equal(q.length(), 3);
2899-
call_order.push('callback ' + 1);
2900-
});
2901-
q.push(2, function (err, arg) {
2902-
test.equal(err, 'error');
2903-
test.equal(arg, 'arg');
2904-
test.equal(q.length(), 2);
2905-
call_order.push('callback ' + 2);
2906-
});
2907-
q.push(3, function (err, arg) {
2908-
test.equal(err, 'error');
2909-
test.equal(arg, 'arg');
2910-
test.equal(q.length(), 1);
2911-
call_order.push('callback ' + 3);
2912-
});
2913-
q.push(4, function (err, arg) {
2914-
test.equal(err, 'error');
2915-
test.equal(arg, 'arg');
2916-
test.equal(q.length(), 0);
2917-
call_order.push('callback ' + 4);
2918-
});
2919-
test.equal(q.length(), 4);
2920-
test.equal(q.concurrency, 2);
2921-
q.concurrency = 1;
2922-
2923-
setTimeout(function () {
2924-
test.same(call_order, [
2925-
'process 1', 'callback 1',
2926-
'process 2', 'callback 2',
2927-
'process 3', 'callback 3',
2928-
'process 4', 'callback 4'
2929-
]);
2930-
test.equal(q.concurrency, 1);
2931-
test.equal(q.length(), 0);
2888+
2889+
var q = async.queue(function(task, callback){
2890+
setTimeout(function(){
2891+
callback();
2892+
}, 100);
2893+
}, 1);
2894+
2895+
for(var i = 0; i < 50; i++){
2896+
q.push('');
2897+
}
2898+
2899+
q.drain = function(){
29322900
test.done();
2933-
}, 250);
2901+
};
2902+
2903+
setTimeout(function(){
2904+
test.equal(q.concurrency, 1);
2905+
q.concurrency = 2;
2906+
setTimeout(function(){
2907+
test.equal(q.running(), 2);
2908+
q.concurrency = 5;
2909+
setTimeout(function(){
2910+
test.equal(q.running(), 5);
2911+
}, 500);
2912+
}, 500);
2913+
}, 500);
29342914
},
29352915

29362916
'push without callback': function (test) {

0 commit comments

Comments
 (0)