Skip to content

Commit

Permalink
feat: new hooks API
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Apr 5, 2024
1 parent cae5625 commit 45e94a0
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 27 deletions.
118 changes: 96 additions & 22 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

const {
InvalidArgumentError,
NotSupportedError
NotSupportedError,
AbortError
} = require('./errors')
const assert = require('node:assert')
const { parseHeaders } = require('./util')
const {
isValidHTTPToken,
isValidHeaderChar,
Expand All @@ -26,6 +28,44 @@ const invalidPathRegex = /[^\u0021-\u00ff]/

const kHandler = Symbol('handler')

class Controller {
#paused = false
#reason = null
#abort
#resume

constructor (abort, resume) {
this.#abort = abort
this.#resume = resume
}

pause () {
this.#paused = true
}

resume () {
this.#paused = false
this.#resume?.()
}

abort (reason) {
this.#reason = reason ?? new AbortError()
this.#abort?.(this.#reason)
}

get paused () {
return this.#paused
}

get aborted () {
return this.#reason !== null
}

get reason () {
return this.#reason
}
}

class Request {
constructor (origin, {
path,
Expand Down Expand Up @@ -91,6 +131,8 @@ class Request {

this.abort = null

this.controller = null

if (body == null) {
this.body = null
} else if (isStream(body)) {
Expand Down Expand Up @@ -192,12 +234,11 @@ class Request {
}

onBodySent (chunk) {
if (this[kHandler].onBodySent) {
try {
return this[kHandler].onBodySent(chunk)
} catch (err) {
this.abort(err)
}
try {
this[kHandler].onRequestData?.(chunk)
this[kHandler].onBodySent?.(chunk)
} catch (err) {
this.abort(err)
}
}

Expand All @@ -206,41 +247,61 @@ class Request {
channels.bodySent.publish({ request: this })
}

if (this[kHandler].onRequestSent) {
try {
return this[kHandler].onRequestSent()
} catch (err) {
this.abort(err)
}
try {
this[kHandler].onRequestEnd?.()
this[kHandler].onRequestSent?.()
} catch (err) {
this.abort(err)
}
}

onConnect (abort) {
assert(!this.aborted)
assert(!this.completed)
assert(!this.controller)

if (this.error) {
abort(this.error)
} else {
this.abort = abort
return this[kHandler].onConnect(abort)
return this[kHandler].onConnect?.(abort)
}
}

onResponseStarted () {
return this[kHandler].onResponseStarted?.()
onResponseStarted (resume) {
assert(!this.aborted)
assert(!this.completed)

this.controller = new Controller(this.abort, resume)
try {
this[kHandler].onResponseStart?.(this.controller)

this[kHandler].onResponseStarted?.()

return !this.controller.paused
} catch (err) {
this.abort(err)
}
}

onHeaders (statusCode, headers, resume, statusText) {
onHeaders (headers, statusCode, statusText) {
assert(!this.aborted)
assert(!this.completed)
assert(this.controller)

if (channels.headers.hasSubscribers) {
channels.headers.publish({ request: this, response: { statusCode, headers, statusText } })
}

try {
return this[kHandler].onHeaders(statusCode, headers, resume, statusText)
this[kHandler].onResponseHeaders?.(parseHeaders(headers), statusCode, statusText)

const controller = this.controller
if (this[kHandler].onHeaders?.(statusCode, headers, () => controller.resume(), statusText || '') === false) {
this.controller.pause()
}

return !this.controller.paused
} catch (err) {
this.abort(err)
}
Expand All @@ -249,9 +310,16 @@ class Request {
onData (chunk) {
assert(!this.aborted)
assert(!this.completed)
assert(this.controller)

try {
return this[kHandler].onData(chunk)
this[kHandler].onResponseData?.(chunk)

if (this[kHandler].onData?.(chunk) === false) {
this.controller.pause()
}

return !this.controller.paused
} catch (err) {
this.abort(err)
return false
Expand All @@ -270,13 +338,17 @@ class Request {

assert(!this.aborted)

this.controller = null
this.completed = true
if (channels.trailers.hasSubscribers) {
channels.trailers.publish({ request: this, trailers })
}

try {
return this[kHandler].onComplete(trailers)
this[kHandler].onResponseTrailers?.(parseHeaders(trailers))
this[kHandler].onResponseEnd?.()

this[kHandler].onComplete(trailers)
} catch (err) {
// TODO (fix): This might be a bad idea?
this.onError(err)
Expand All @@ -293,6 +365,8 @@ class Request {
if (this.aborted) {
return
}

this.controller = null
this.aborted = true

return this[kHandler].onError(error)
Expand All @@ -301,12 +375,12 @@ class Request {
onFinally () {
if (this.errorHandler) {
this.body.off('error', this.errorHandler)
this.errorHandler = null
this.errorHandler = undefined
}

if (this.endHandler) {
this.body.off('end', this.endHandler)
this.endHandler = null
this.endHandler = undefined
}
}

Expand Down
7 changes: 5 additions & 2 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,10 @@ class Parser {
if (!request) {
return -1
}
request.onResponseStarted()

const pause = request.onResponseStarted(this.resume) === false

return pause ? constants.ERROR.PAUSED : 0
}

onHeaderField (buf) {
Expand Down Expand Up @@ -504,7 +507,7 @@ class Parser {
socket[kReset] = true
}

const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false
const pause = request.onHeaders(headers, statusCode, statusText) === false

if (request.aborted) {
return -1
Expand Down
6 changes: 3 additions & 3 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,6 @@ function writeH2 (client, request) {

stream.once('response', headers => {
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
request.onResponseStarted()

// Due to the stream nature, it is possible we face a race condition
// where the stream has been assigned, but the request has been aborted
// the request remains in-flight and headers hasn't been received yet
Expand All @@ -423,7 +421,9 @@ function writeH2 (client, request) {
return
}

if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) {
request.onResponseStarted(stream.resume.bind(stream))

if (request.onHeaders(parseH2Headers(realHeaders), Number(statusCode), '') === false) {
stream.pause()
}

Expand Down
35 changes: 35 additions & 0 deletions types/dispatcher.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ declare namespace Dispatcher {
context: object;
}
export type StreamFactory = (data: StreamFactoryData) => Writable;
export interface Controller {
readonly aborted: boolean;
readonly reason: Error | null;
readonly paused: boolean;

pause(): void;
resume(): void;
abort(reason: Error): void;
}
export interface DispatchHandlers {
/** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */
onConnect?(abort: () => void): void;
Expand All @@ -232,6 +241,32 @@ declare namespace Dispatcher {
onComplete?(trailers: string[] | null): void;
/** Invoked when a body chunk is sent to the server. May be invoked multiple times for chunked requests */
onBodySent?(chunkSize: number, totalBytesSent: number): void;

// New API

/** Invoked after request is starting to be processed */
onRequestStart?(controller: Controller): void;
/** Invoked after headers data is sent */
onRequestHeaders?(headers: Record<string, string>): void;
/** Invoked after payload data is sent. */
onRequestData?(chunk: Buffer | string): void;
/** Invoked after request has finished sending */
onRequestEnd?(): void;
/** Invoked after request has failed */
onRequestError?(err: Error): void;

/** Invoked after response is starting to be processed */
onResponseStart?(controller: Controller): void;
/** Invoked after headers data has been received */
onResponseHeaders?(headers: Record<string, string>, statusCode: number, statusText?: string): void;
/** Invoked after response payload data is received. */
onResponseData?(chunk: Buffer | string): void;
/** Invoked after trailers data has been received */
onResponseTrailers?(trailers: Record<string, string>): void;
/** Invoked after response has finished */
onResponseEnd?(): void;
/** Invoked after request has failed */
onResponseError?(err: Error): void;
}
export type PipelineHandler = (data: PipelineHandlerData) => Readable;
export type HttpMethod = 'GET' | 'HEAD' | 'POST' | 'PUT' | 'DELETE' | 'CONNECT' | 'OPTIONS' | 'TRACE' | 'PATCH';
Expand Down

0 comments on commit 45e94a0

Please sign in to comment.