Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new hooks API #3054

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 96 additions & 22 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

const {
InvalidArgumentError,
NotSupportedError
NotSupportedError,
AbortError
} = require('./errors')
const assert = require('node:assert')
const { parseHeaders } = require('./util')
const {
isValidHTTPToken,
isValidHeaderChar,
Expand All @@ -26,6 +28,44 @@ const invalidPathRegex = /[^\u0021-\u00ff]/

const kHandler = Symbol('handler')

class Controller {
#paused = false
#reason = null
#abort
#resume

constructor (abort, resume) {
this.#abort = abort
this.#resume = resume
}

pause () {
this.#paused = true
}

resume () {
this.#paused = false
this.#resume?.()
}

abort (reason) {
this.#reason = reason ?? new AbortError()
this.#abort?.(this.#reason)
}

get paused () {
return this.#paused
}

get aborted () {
return this.#reason !== null
}

get reason () {
return this.#reason
}
}

class Request {
constructor (origin, {
path,
Expand Down Expand Up @@ -91,6 +131,8 @@ class Request {

this.abort = null

this.controller = null

if (body == null) {
this.body = null
} else if (isStream(body)) {
Expand Down Expand Up @@ -192,12 +234,11 @@ class Request {
}

onBodySent (chunk) {
if (this[kHandler].onBodySent) {
try {
return this[kHandler].onBodySent(chunk)
} catch (err) {
this.abort(err)
}
try {
this[kHandler].onRequestData?.(chunk)
this[kHandler].onBodySent?.(chunk)
} catch (err) {
this.abort(err)
}
}

Expand All @@ -206,41 +247,61 @@ class Request {
channels.bodySent.publish({ request: this })
}

if (this[kHandler].onRequestSent) {
try {
return this[kHandler].onRequestSent()
} catch (err) {
this.abort(err)
}
try {
this[kHandler].onRequestEnd?.()
this[kHandler].onRequestSent?.()
} catch (err) {
this.abort(err)
}
}

onConnect (abort) {
assert(!this.aborted)
assert(!this.completed)
assert(!this.controller)

if (this.error) {
abort(this.error)
} else {
this.abort = abort
return this[kHandler].onConnect(abort)
return this[kHandler].onConnect?.(abort)
}
}

onResponseStarted () {
return this[kHandler].onResponseStarted?.()
onResponseStarted (resume) {
assert(!this.aborted)
assert(!this.completed)

this.controller = new Controller(this.abort, resume)
try {
this[kHandler].onResponseStart?.(this.controller)

this[kHandler].onResponseStarted?.()

return !this.controller.paused
} catch (err) {
this.abort(err)
}
}

onHeaders (statusCode, headers, resume, statusText) {
onHeaders (headers, statusCode, statusText) {
assert(!this.aborted)
assert(!this.completed)
assert(this.controller)

if (channels.headers.hasSubscribers) {
channels.headers.publish({ request: this, response: { statusCode, headers, statusText } })
}

try {
return this[kHandler].onHeaders(statusCode, headers, resume, statusText)
this[kHandler].onResponseHeaders?.(parseHeaders(headers), statusCode, statusText)
ronag marked this conversation as resolved.
Show resolved Hide resolved

const controller = this.controller
if (this[kHandler].onHeaders?.(statusCode, headers, () => controller.resume(), statusText || '') === false) {
this.controller.pause()
}

return !this.controller.paused
metcoder95 marked this conversation as resolved.
Show resolved Hide resolved
} catch (err) {
this.abort(err)
}
Expand All @@ -249,9 +310,16 @@ class Request {
onData (chunk) {
assert(!this.aborted)
assert(!this.completed)
assert(this.controller)

try {
return this[kHandler].onData(chunk)
this[kHandler].onResponseData?.(chunk)

if (this[kHandler].onData?.(chunk) === false) {
this.controller.pause()
}

return !this.controller.paused
} catch (err) {
this.abort(err)
return false
Expand All @@ -270,13 +338,17 @@ class Request {

assert(!this.aborted)

this.controller = null
this.completed = true
if (channels.trailers.hasSubscribers) {
channels.trailers.publish({ request: this, trailers })
}

try {
return this[kHandler].onComplete(trailers)
this[kHandler].onResponseTrailers?.(parseHeaders(trailers))
this[kHandler].onResponseEnd?.()

this[kHandler].onComplete(trailers)
} catch (err) {
// TODO (fix): This might be a bad idea?
this.onError(err)
Expand All @@ -293,6 +365,8 @@ class Request {
if (this.aborted) {
return
}

this.controller = null
this.aborted = true

return this[kHandler].onError(error)
Expand All @@ -301,12 +375,12 @@ class Request {
onFinally () {
if (this.errorHandler) {
this.body.off('error', this.errorHandler)
this.errorHandler = null
this.errorHandler = undefined
}

if (this.endHandler) {
this.body.off('end', this.endHandler)
this.endHandler = null
this.endHandler = undefined
}
}

Expand Down
7 changes: 5 additions & 2 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,10 @@ class Parser {
if (!request) {
return -1
}
request.onResponseStarted()

const pause = request.onResponseStarted(this.resume) === false

return pause ? constants.ERROR.PAUSED : 0
}

onHeaderField (buf) {
Expand Down Expand Up @@ -504,7 +507,7 @@ class Parser {
socket[kReset] = true
}

const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false
const pause = request.onHeaders(headers, statusCode, statusText) === false

if (request.aborted) {
return -1
Expand Down
6 changes: 3 additions & 3 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,6 @@ function writeH2 (client, request) {

stream.once('response', headers => {
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
request.onResponseStarted()

// Due to the stream nature, it is possible we face a race condition
// where the stream has been assigned, but the request has been aborted
// the request remains in-flight and headers hasn't been received yet
Expand All @@ -423,7 +421,9 @@ function writeH2 (client, request) {
return
}

if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) {
request.onResponseStarted(stream.resume.bind(stream))

if (request.onHeaders(parseH2Headers(realHeaders), Number(statusCode), '') === false) {
stream.pause()
}

Expand Down
40 changes: 40 additions & 0 deletions types/dispatcher.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ declare namespace Dispatcher {
context: object;
}
export type StreamFactory = (data: StreamFactoryData) => Writable;
export interface Controller {
readonly aborted: boolean;
readonly reason: Error | null;
readonly paused: boolean;

pause(): void;
resume(): void;
abort(reason: Error): void;
}
export interface DispatchHandlers {
/** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */
onConnect?(abort: () => void): void;
Expand All @@ -232,6 +241,37 @@ declare namespace Dispatcher {
onComplete?(trailers: string[] | null): void;
/** Invoked when a body chunk is sent to the server. May be invoked multiple times for chunked requests */
onBodySent?(chunkSize: number, totalBytesSent: number): void;

// New API

/** Invoked when handler is destroyed */
onDestroy?(): void;

/** Invoked after request is starting to be processed */
onRequestStart?(controller: Controller): void;
/** Invoked after headers data is sent */
onRequestHeaders?(headers: Record<string, string>): void;
/** Invoked after payload data is sent. */
onRequestData?(chunk: Buffer | string): void;
/** Invoked after trailers data is sent */
onRequestTrailers?(trailers: Record<string, string>): void;
/** Invoked after request has finished sending */
onRequestEnd?(): void;
/** Invoked after request has failed */
onRequestError?(err: Error): void;

/** Invoked after response is starting to be processed */
onResponseStart?(controller: Controller): void;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're not passing a controller here.
Also, in the onRequestStart above, you don't pass it so I would remove the comment at all.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we have 2 different controllers, one for the request and one for the response, they may run in parallel.

/** Invoked after headers data has been received */
onResponseHeaders?(headers: Record<string, string>, statusCode: number, statusText?: string): void;
/** Invoked after response payload data is received. */
onResponseData?(chunk: Buffer | string): void;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, how are we planning for the hook to hint the controller about backpressure, as it seems it does not receives the controller?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the user saves a reference to the controller from the start hook. By I guess we could always send in the controller?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair, we can keep it simple and keep the controller for that

/** Invoked after trailers data has been received */
onResponseTrailers?(trailers: Record<string, string>): void;
/** Invoked after response has finished */
onResponseEnd?(): void;
/** Invoked after request has failed */
onResponseError?(err: Error): void;
}
export type PipelineHandler = (data: PipelineHandlerData) => Readable;
export type HttpMethod = 'GET' | 'HEAD' | 'POST' | 'PUT' | 'DELETE' | 'CONNECT' | 'OPTIONS' | 'TRACE' | 'PATCH';
Expand Down
Loading