queue.js 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. 'use strict'
  2. /* eslint-disable no-var */
  3. var reusify = require('reusify')
  4. function fastqueue (context, worker, _concurrency) {
  5. if (typeof context === 'function') {
  6. _concurrency = worker
  7. worker = context
  8. context = null
  9. }
  10. if (!(_concurrency >= 1)) {
  11. throw new Error('fastqueue concurrency must be equal to or greater than 1')
  12. }
  13. var cache = reusify(Task)
  14. var queueHead = null
  15. var queueTail = null
  16. var _running = 0
  17. var errorHandler = null
  18. var self = {
  19. push: push,
  20. drain: noop,
  21. saturated: noop,
  22. pause: pause,
  23. paused: false,
  24. get concurrency () {
  25. return _concurrency
  26. },
  27. set concurrency (value) {
  28. if (!(value >= 1)) {
  29. throw new Error('fastqueue concurrency must be equal to or greater than 1')
  30. }
  31. _concurrency = value
  32. if (self.paused) return
  33. for (; queueHead && _running < _concurrency;) {
  34. _running++
  35. release()
  36. }
  37. },
  38. running: running,
  39. resume: resume,
  40. idle: idle,
  41. length: length,
  42. getQueue: getQueue,
  43. unshift: unshift,
  44. empty: noop,
  45. kill: kill,
  46. killAndDrain: killAndDrain,
  47. error: error,
  48. abort: abort
  49. }
  50. return self
  51. function running () {
  52. return _running
  53. }
  54. function pause () {
  55. self.paused = true
  56. }
  57. function length () {
  58. var current = queueHead
  59. var counter = 0
  60. while (current) {
  61. current = current.next
  62. counter++
  63. }
  64. return counter
  65. }
  66. function getQueue () {
  67. var current = queueHead
  68. var tasks = []
  69. while (current) {
  70. tasks.push(current.value)
  71. current = current.next
  72. }
  73. return tasks
  74. }
  75. function resume () {
  76. if (!self.paused) return
  77. self.paused = false
  78. if (queueHead === null) {
  79. _running++
  80. release()
  81. return
  82. }
  83. for (; queueHead && _running < _concurrency;) {
  84. _running++
  85. release()
  86. }
  87. }
  88. function idle () {
  89. return _running === 0 && self.length() === 0
  90. }
  91. function push (value, done) {
  92. var current = cache.get()
  93. current.context = context
  94. current.release = release
  95. current.value = value
  96. current.callback = done || noop
  97. current.errorHandler = errorHandler
  98. if (_running >= _concurrency || self.paused) {
  99. if (queueTail) {
  100. queueTail.next = current
  101. queueTail = current
  102. } else {
  103. queueHead = current
  104. queueTail = current
  105. self.saturated()
  106. }
  107. } else {
  108. _running++
  109. worker.call(context, current.value, current.worked)
  110. }
  111. }
  112. function unshift (value, done) {
  113. var current = cache.get()
  114. current.context = context
  115. current.release = release
  116. current.value = value
  117. current.callback = done || noop
  118. current.errorHandler = errorHandler
  119. if (_running >= _concurrency || self.paused) {
  120. if (queueHead) {
  121. current.next = queueHead
  122. queueHead = current
  123. } else {
  124. queueHead = current
  125. queueTail = current
  126. self.saturated()
  127. }
  128. } else {
  129. _running++
  130. worker.call(context, current.value, current.worked)
  131. }
  132. }
  133. function release (holder) {
  134. if (holder) {
  135. cache.release(holder)
  136. }
  137. var next = queueHead
  138. if (next && _running <= _concurrency) {
  139. if (!self.paused) {
  140. if (queueTail === queueHead) {
  141. queueTail = null
  142. }
  143. queueHead = next.next
  144. next.next = null
  145. worker.call(context, next.value, next.worked)
  146. if (queueTail === null) {
  147. self.empty()
  148. }
  149. } else {
  150. _running--
  151. }
  152. } else if (--_running === 0) {
  153. self.drain()
  154. }
  155. }
  156. function kill () {
  157. queueHead = null
  158. queueTail = null
  159. self.drain = noop
  160. }
  161. function killAndDrain () {
  162. queueHead = null
  163. queueTail = null
  164. self.drain()
  165. self.drain = noop
  166. }
  167. function abort () {
  168. var current = queueHead
  169. queueHead = null
  170. queueTail = null
  171. while (current) {
  172. var next = current.next
  173. var callback = current.callback
  174. var errorHandler = current.errorHandler
  175. var val = current.value
  176. var context = current.context
  177. // Reset the task state
  178. current.value = null
  179. current.callback = noop
  180. current.errorHandler = null
  181. // Call error handler if present
  182. if (errorHandler) {
  183. errorHandler(new Error('abort'), val)
  184. }
  185. // Call callback with error
  186. callback.call(context, new Error('abort'))
  187. // Release the task back to the pool
  188. current.release(current)
  189. current = next
  190. }
  191. self.drain = noop
  192. }
  193. function error (handler) {
  194. errorHandler = handler
  195. }
  196. }
  197. function noop () {}
  198. function Task () {
  199. this.value = null
  200. this.callback = noop
  201. this.next = null
  202. this.release = noop
  203. this.context = null
  204. this.errorHandler = null
  205. var self = this
  206. this.worked = function worked (err, result) {
  207. var callback = self.callback
  208. var errorHandler = self.errorHandler
  209. var val = self.value
  210. self.value = null
  211. self.callback = noop
  212. if (self.errorHandler) {
  213. errorHandler(err, val)
  214. }
  215. callback.call(self.context, err, result)
  216. self.release(self)
  217. }
  218. }
  219. function queueAsPromised (context, worker, _concurrency) {
  220. if (typeof context === 'function') {
  221. _concurrency = worker
  222. worker = context
  223. context = null
  224. }
  225. function asyncWrapper (arg, cb) {
  226. worker.call(this, arg)
  227. .then(function (res) {
  228. cb(null, res)
  229. }, cb)
  230. }
  231. var queue = fastqueue(context, asyncWrapper, _concurrency)
  232. var pushCb = queue.push
  233. var unshiftCb = queue.unshift
  234. queue.push = push
  235. queue.unshift = unshift
  236. queue.drained = drained
  237. return queue
  238. function push (value) {
  239. var p = new Promise(function (resolve, reject) {
  240. pushCb(value, function (err, result) {
  241. if (err) {
  242. reject(err)
  243. return
  244. }
  245. resolve(result)
  246. })
  247. })
  248. // Let's fork the promise chain to
  249. // make the error bubble up to the user but
  250. // not lead to a unhandledRejection
  251. p.catch(noop)
  252. return p
  253. }
  254. function unshift (value) {
  255. var p = new Promise(function (resolve, reject) {
  256. unshiftCb(value, function (err, result) {
  257. if (err) {
  258. reject(err)
  259. return
  260. }
  261. resolve(result)
  262. })
  263. })
  264. // Let's fork the promise chain to
  265. // make the error bubble up to the user but
  266. // not lead to a unhandledRejection
  267. p.catch(noop)
  268. return p
  269. }
  270. function drained () {
  271. var p = new Promise(function (resolve) {
  272. process.nextTick(function () {
  273. if (queue.idle()) {
  274. resolve()
  275. } else {
  276. var previousDrain = queue.drain
  277. queue.drain = function () {
  278. if (typeof previousDrain === 'function') previousDrain()
  279. resolve()
  280. queue.drain = previousDrain
  281. }
  282. }
  283. })
  284. })
  285. return p
  286. }
  287. }
  288. module.exports = fastqueue
  289. module.exports.promise = queueAsPromised