-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
161 lines (124 loc) · 4.01 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
'use strict';
var arrayRemove = require('./lib/arrayRemove');
function PThrottler(defaultConcurrency, types) {
this._defaultConcurrency = typeof defaultConcurrency === 'number' ? defaultConcurrency : 10;
// Initialize some needed properties
this._queue = {};
this._slots = types || {};
this._executing = [];
}
// -----------------
PThrottler.prototype.enqueue = function (func, type) {
var deferred = {};
var promise;
promise = new Promise(function (resolve, reject) {
var types;
var entry;
deferred.resolve = resolve;
deferred.reject = reject;
type = type || '';
types = Array.isArray(type) ? type : [type];
entry = {
func: func,
types: types,
deferred: deferred
};
// Add the entry to all the types queues
types.forEach(function (type) {
var queue = this._queue[type] = this._queue[type] || [];
queue.push(entry);
}, this);
// Process the entry shortly later so that handlers can be attached to the returned promise
setImmediate(this._processEntry.bind(this, entry));
}.bind(this));
deferred.promise = promise;
return promise;
};
PThrottler.prototype.abort = function () {
var promises;
// Empty the whole queue
Object.keys(this._queue).forEach(function (type) {
this._queue[type] = [];
}, this);
// Wait for all pending functions to finish
promises = this._executing.map(function (entry) {
return entry.deferred.promise.catch(function () {}); // Ignore any errors
});
return Promise.all(promises)
.then(function () {});
};
// -----------------
PThrottler.prototype._processQueue = function (type) {
var queue = this._queue[type];
var length = queue ? queue.length : 0;
var x;
for (x = 0; x < length; ++x) {
if (this._processEntry(queue[x])) {
break;
}
}
};
PThrottler.prototype._processEntry = function (entry) {
var allFree = entry.types.every(this._hasSlot, this);
var promise;
// If there is a free slot for every type
if (allFree) {
// For each type
entry.types.forEach(function (type) {
// Remove entry from the queue
arrayRemove(this._queue[type], entry);
// Take slot
this._takeSlot(type);
}, this);
// Execute the function
this._executing.push(entry);
promise = new Promise(function (resolve, reject) {
Promise.resolve(entry.func())
.then(resolve, reject);
});
promise.then(
this._onFulfill.bind(this, entry, true),
this._onFulfill.bind(this, entry, false)
);
}
return allFree;
};
PThrottler.prototype._onFulfill = function (entry, ok, result) {
// Resolve/reject the deferred based on success/error of the promise
if (ok) {
entry.deferred.resolve(result);
} else {
entry.deferred.reject(result);
}
// Remove it from the executing list
arrayRemove(this._executing, entry);
// Free up slots for every type
entry.types.forEach(this._freeSlot, this);
// Find candidates for the free slots of each type
entry.types.forEach(this._processQueue, this);
};
PThrottler.prototype._hasSlot = function (type) {
var freeSlots = this._slots[type];
if (freeSlots == null) {
freeSlots = this._defaultConcurrency;
}
return freeSlots > 0;
};
PThrottler.prototype._takeSlot = function (type) {
if (this._slots[type] == null) {
this._slots[type] = this._defaultConcurrency;
} else if (!this._slots[type]) {
throw new Error('No free slots');
}
// Decrement the free slots
--this._slots[type];
};
PThrottler.prototype._freeSlot = function (type) {
if (this._slots[type] != null) {
++this._slots[type];
}
};
PThrottler.create = function (defaultConcurrency, types) {
return new PThrottler(defaultConcurrency, types);
};
module.exports = PThrottler;