Skip to content

Commit

Permalink
fix: add optional concurrency parameter to Task.sequence
Browse files Browse the repository at this point in the history
  • Loading branch information
tdreyno committed Oct 6, 2021
1 parent 1a9f785 commit cac31fe
Show file tree
Hide file tree
Showing 8 changed files with 2,821 additions and 3,760 deletions.
4 changes: 2 additions & 2 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.154.2/containers/typescript-node/.devcontainer/base.Dockerfile

# [Choice] Node.js version: 14, 12, 10
ARG VARIANT="14-buster"
# [Choice] Node.js version: 16, 14
ARG VARIANT=16
FROM mcr.microsoft.com/vscode/devcontainers/typescript-node:0-${VARIANT}

# [Optional] Uncomment this section to install additional OS packages.
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
node: ["16", "14", "12"]
node: ["16", "14"]
name: Node ${{ matrix.node }}
steps:
- uses: actions/checkout@master
Expand Down
3 changes: 3 additions & 0 deletions docs/task-static.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ Creates a task that will always run an array of tasks **serially**. The result i
All task error and value types must be the same. If you need different types for the items of the array, consider simply chaining the tasks together in order.
Additionally, a second parameter can be added to allow concurrency. The default concurreny is `1` task at a time, but may be increased. Setting this concurrency to Infinity is the same a `Task.all`.
{% tabs %}
{% tab title="Usage" %}
Expand All @@ -210,6 +212,7 @@ const task: Task<never, number[]> = Task.sequence([of(5), of(10)])
```typescript
type sequence = <E, S>(
tasksOrPromises: Array<Task<E, S> | Promise<S>>,
maxConcurrent = 1,
) => Task<E, S[]>
```
Expand Down
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"prepare": "husky install"
},
"engines": {
"node": ">=10"
"node": ">=14"
},
"@pika/pack": {
"pipeline": [
Expand Down Expand Up @@ -49,20 +49,20 @@
]
},
"volta": {
"node": "14.11.0",
"node": "14.17.0",
"yarn": "1.22.4"
},
"devDependencies": {
"@commitlint/cli": "^13.1.0",
"@commitlint/config-conventional": "^13.2.0",
"@commitlint/prompt": "^12.1.4",
"@commitlint/prompt": "^13.2.0",
"@pika/pack": "^0.5.0",
"@pika/plugin-build-node": "^0.9.2",
"@pika/plugin-build-web": "^0.9.2",
"@pika/plugin-ts-standard-pkg": "^0.9.2",
"@semantic-release/changelog": "^5.0.1",
"@semantic-release/git": "^9.0.0",
"@semantic-release/github": "^7.2.3",
"@semantic-release/changelog": "^6.0.0",
"@semantic-release/git": "^10.0.0",
"@semantic-release/github": "^8.0.1",
"@types/jest": "^27.0.2",
"@types/react": "^17.0.26",
"@typescript-eslint/eslint-plugin": "^4.31.1",
Expand All @@ -71,13 +71,13 @@
"cz-conventional-changelog": "3.3.0",
"eslint": "^7.32.0",
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-prettier": "^3.4.0",
"eslint-plugin-prettier": "^4.0.0",
"gzip-size-cli": "^5.0.0",
"husky": "^7.0.1",
"jest": "^27.0.6",
"lint-staged": "^11.1.2",
"prettier": "^2.3.2",
"semantic-release": "^17.4.7",
"semantic-release": "^18.0.0",
"terser": "^5.9.0",
"ts-jest": "^27.0.5",
"ts-toolbelt": "^9.6.0",
Expand Down
98 changes: 57 additions & 41 deletions src/Task/Task.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable @typescript-eslint/no-misused-promises, @typescript-eslint/no-non-null-assertion, @typescript-eslint/no-use-before-define */
import { constant, identity, range, Validation } from "../util"
import { constant, drop, identity, range, take, Validation } from "../util"

export type Reject<E> = (error: E) => void
export type Resolve<S> = (result: S) => void
Expand Down Expand Up @@ -393,41 +393,7 @@ export const firstSuccess = <E, S>(tasks: Array<Task<E, S>>): Task<E[], S> =>
* @param tasks The tasks to run in parallel.
*/
export const all = <E, S>(tasks: Array<Task<E, S>>): Task<E, S[]> =>
tasks.length === 0
? of([])
: new Task<E, S[]>((reject, resolve) => {
let isDone = false
let runningTasks = tasks.length

const results: S[] = []

return tasks.map((task, i) =>
task.fork(
(error: E) => {
if (isDone) {
return
}

isDone = true

reject(error)
},
(result: S) => {
if (isDone) {
return
}

runningTasks -= 1

results[i] = result

if (runningTasks === 0) {
resolve(results)
}
},
),
)
})
sequence(tasks, Infinity)

/**
* Given an array of task which return a result, return a new task which returns an array of successful results.
Expand Down Expand Up @@ -494,11 +460,61 @@ export const zipWith = <E, E2, S, S2, V>(
* Given an array of task which return a result, return a new task which results an array of results.
* @param tasks The tasks to run in sequence.
*/
export const sequence = <E, S>(tasks: Array<Task<E, S>>): Task<E, S[]> =>
tasks.reduce(
(sum, task) => chain(list => map(result => [...list, result], task), sum),
succeed([]) as Task<E, S[]>,
)
export const sequence = <E, S>(
tasks: Array<Task<E, S>>,
maxConcurrent = 1,
): Task<E, S[]> =>
new Task((reject, resolve) => {
let isDone = false

type TaskPosition = [Task<E, S>, number]

let queue = tasks.map<TaskPosition>((task, i) => [task, i])
const inflight = new Set<Task<E, S>>()
const results: S[] = []

const enqueue = () => {
if (isDone) {
return
}

if (queue.length <= 0 && inflight.size <= 0) {
isDone = true
resolve(results)
return
}

const howMany = Math.min(queue.length, maxConcurrent - inflight.size)

const readyTasks = take(howMany, queue)
queue = drop(howMany, queue)

readyTasks.forEach(([task, i]) => {
inflight.add(task)

task.fork(
(error: E) => {
if (isDone) {
return
}

isDone = true

reject(error)
},
(result: S) => {
results[i] = result

inflight.delete(task)

enqueue()
},
)
})
}

enqueue()
})

/**
* Given a task, swap the error and success values.
Expand Down
5 changes: 4 additions & 1 deletion src/Task/__tests__/sequence.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ describe("sequence", () => {
const resolve = jest.fn()
const reject = jest.fn()

sequence([succeedIn(100, "A"), succeedIn(100, "B")]).fork(reject, resolve)
sequence([succeedIn(100, "A"), succeedIn(100, "B")], 1).fork(
reject,
resolve,
)

jest.advanceTimersByTime(100)

Expand Down
6 changes: 6 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ export const failedValidation = <E>(error: E): Validation<E, never> => ({
success: false,
error,
})

export const take = <T>(count: number, arr: Array<T>): Array<T> =>
!isFinite(count) ? [...arr] : arr.slice(0, count)

export const drop = <T>(count: number, arr: Array<T>): Array<T> =>
!isFinite(count) ? [] : arr.splice(count)
Loading

0 comments on commit cac31fe

Please sign in to comment.