diff --git a/docs/docs/api/Dispatcher.md b/docs/docs/api/Dispatcher.md index 88c3f11e7ce..e307b247ee3 100644 --- a/docs/docs/api/Dispatcher.md +++ b/docs/docs/api/Dispatcher.md @@ -969,6 +969,12 @@ Parameters: * **targets** `Array` * **error** `Error` +Emitted when the dispatcher has been disconnected from the origin. + +> **Note**: For HTTP/2, this event is also emitted when the dispatcher has received the [GOAWAY Frame](https://webconcepts.info/concepts/http2-frame-type/0x7) with an Error with the message `HTTP/2: "GOAWAY" frame received` and the code `UND_ERR_INFO`. +> Due to nature of the protocol of using binary frames, it is possible that requests gets hanging as a frame can be received between the `HEADER` and `DATA` frames. +> It is recommended to handle this event and close the dispatcher to create a new HTTP/2 session. + ### Event: `'connectionError'` Parameters: diff --git a/lib/core/util.js b/lib/core/util.js index e7b5d9c1edd..d4bcdff5ba9 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -1,7 +1,7 @@ 'use strict' const assert = require('node:assert') -const { kDestroyed, kBodyUsed } = require('./symbols') +const { kDestroyed, kBodyUsed, kListeners } = require('./symbols') const { IncomingMessage } = require('node:http') const stream = require('node:stream') const net = require('node:net') @@ -534,6 +534,29 @@ function parseRangeHeader (range) { : null } +function addListener (obj, name, listener) { + const listeners = (obj[kListeners] ??= []) + listeners.push([name, listener]) + obj.on(name, listener) + return obj +} + +function removeAllListeners (obj) { + for (const [name, listener] of obj[kListeners] ?? []) { + obj.removeListener(name, listener) + } + obj[kListeners] = null +} + +function errorRequest (client, request, err) { + try { + request.onError(err) + assert(request.aborted) + } catch (err) { + client.emit('error', err) + } +} + const kEnumerableProperty = Object.create(null) kEnumerableProperty.enumerable = true @@ -556,6 +579,9 @@ module.exports = { isDestroyed, headerNameToString, bufferToLowerCasedHeaderName, + addListener, + removeAllListeners, + errorRequest, parseRawHeaders, parseHeaders, parseKeepAliveTimeout, diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 5341f7de670..d30f54525aa 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -47,7 +47,6 @@ const { kMaxRequests, kCounter, kMaxResponseSize, - kListeners, kOnError, kResume, kHTTPContext @@ -56,23 +55,11 @@ const { const constants = require('../llhttp/constants.js') const EMPTY_BUF = Buffer.alloc(0) const FastBuffer = Buffer[Symbol.species] +const addListener = util.addListener +const removeAllListeners = util.removeAllListeners let extractBody -function addListener (obj, name, listener) { - const listeners = (obj[kListeners] ??= []) - listeners.push([name, listener]) - obj.on(name, listener) - return obj -} - -function removeAllListeners (obj) { - for (const [name, listener] of obj[kListeners] ?? []) { - obj.removeListener(name, listener) - } - obj[kListeners] = null -} - async function lazyllhttp () { const llhttpWasmData = process.env.JEST_WORKER_ID ? require('../llhttp/llhttp-wasm.js') : undefined @@ -719,14 +706,14 @@ async function connectH1 (client, socket) { const requests = client[kQueue].splice(client[kRunningIdx]) for (let i = 0; i < requests.length; i++) { const request = requests[i] - errorRequest(client, request, err) + util.errorRequest(client, request, err) } } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') { // Fail head of pipeline. const request = client[kQueue][client[kRunningIdx]] client[kQueue][client[kRunningIdx]++] = null - errorRequest(client, request, err) + util.errorRequest(client, request, err) } client[kPendingIdx] = client[kRunningIdx] @@ -831,15 +818,6 @@ function resumeH1 (client) { } } -function errorRequest (client, request, err) { - try { - request.onError(err) - assert(request.aborted) - } catch (err) { - client.emit('error', err) - } -} - // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2 function shouldSendContentLength (method) { return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT' @@ -906,7 +884,7 @@ function writeH1 (client, request) { // A user agent may send a Content-Length header with 0 value, this should be allowed. if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength !== null && request.contentLength !== contentLength) { if (client[kStrictContentLength]) { - errorRequest(client, request, new RequestContentLengthMismatchError()) + util.errorRequest(client, request, new RequestContentLengthMismatchError()) return false } @@ -920,7 +898,7 @@ function writeH1 (client, request) { return } - errorRequest(client, request, err || new RequestAbortedError()) + util.errorRequest(client, request, err || new RequestAbortedError()) util.destroy(body) util.destroy(socket, new InformationalError('aborted')) @@ -929,7 +907,7 @@ function writeH1 (client, request) { try { request.onConnect(abort) } catch (err) { - errorRequest(client, request, err) + util.errorRequest(client, request, err) } if (request.aborted) { diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index c0f3d7f5d21..d4d234ce895 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -22,7 +22,6 @@ const { kSocket, kStrictContentLength, kOnError, - // HTTP2 kMaxConcurrentStreams, kHTTP2Session, kResume @@ -92,16 +91,18 @@ async function connectH2 (client, socket) { session[kOpenStreams] = 0 session[kClient] = client session[kSocket] = socket - session.on('error', onHttp2SessionError) - session.on('frameError', onHttp2FrameError) - session.on('end', onHttp2SessionEnd) - session.on('goaway', onHTTP2GoAway) - session.on('close', function () { + + util.addListener(session, 'error', onHttp2SessionError) + util.addListener(session, 'frameError', onHttp2FrameError) + util.addListener(session, 'end', onHttp2SessionEnd) + util.addListener(session, 'goaway', onHTTP2GoAway) + util.addListener(session, 'close', function () { const { [kClient]: client } = this - const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) + const err = this[kSocket][kError] || new SocketError('closed', util.getSocketInfo(this)) client[kSocket] = null + client[kHTTP2Session] = null assert(client[kPending] === 0) @@ -109,7 +110,7 @@ async function connectH2 (client, socket) { const requests = client[kQueue].splice(client[kRunningIdx]) for (let i = 0; i < requests.length; i++) { const request = requests[i] - errorRequest(client, request, err) + util.errorRequest(client, request, err) } client[kPendingIdx] = client[kRunningIdx] @@ -120,19 +121,21 @@ async function connectH2 (client, socket) { client[kResume]() }) + session.unref() client[kHTTP2Session] = session socket[kHTTP2Session] = session - socket.on('error', function (err) { + util.addListener(socket, 'error', function (err) { assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') this[kError] = err this[kClient][kOnError](err) }) - socket.on('end', function () { + + util.addListener(socket, 'end', function () { util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) }) @@ -172,67 +175,42 @@ function onHttp2SessionError (err) { assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') this[kSocket][kError] = err - this[kClient][kOnError](err) } function onHttp2FrameError (type, code, id) { - const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) - if (id === 0) { + const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) this[kSocket][kError] = err this[kClient][kOnError](err) } } function onHttp2SessionEnd () { - this.destroy(new SocketError('other side closed')) - util.destroy(this[kSocket], new SocketError('other side closed')) + const err = new SocketError('other side closed', util.getSocketInfo(this[kSocket])) + this.destroy(err) + util.destroy(this[kSocket], err) } +/** + * This is the root cause of #3011 + * We need to handle GOAWAY frames properly, and trigger the session close + * along with the socket right away + * Find a way to trigger the close cycle from here on. + */ function onHTTP2GoAway (code) { - const client = this[kClient] const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`) - client[kSocket] = null - client[kHTTP2Session] = null - - if (client.destroyed) { - assert(this[kPending] === 0) - - // Fail entire queue. - const requests = client[kQueue].splice(client[kRunningIdx]) - for (let i = 0; i < requests.length; i++) { - const request = requests[i] - errorRequest(this, request, err) - } - } else if (client[kRunning] > 0) { - // Fail head of pipeline. - const request = client[kQueue][client[kRunningIdx]] - client[kQueue][client[kRunningIdx]++] = null - errorRequest(client, request, err) - } - - client[kPendingIdx] = client[kRunningIdx] - - assert(client[kRunning] === 0) - - client.emit('disconnect', - client[kUrl], - [client], - err - ) - - client[kResume]() -} + // We need to trigger the close cycle right away + // We need to destroy the session and the socket + // Requests should be failed with the error after the current one is handled + this[kSocket][kError] = err + this[kClient][kOnError](err) -function errorRequest (client, request, err) { - try { - request.onError(err) - assert(request.aborted) - } catch (err) { - client.emit('error', err) - } + this.unref() + // We send the GOAWAY frame response as no error + this.destroy() + util.destroy(this[kSocket], err) } // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2 @@ -245,7 +223,8 @@ function writeH2 (client, request) { const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request if (upgrade) { - errorRequest(client, request, new Error('Upgrade not supported for H2')) + util.errorRequest(client, request, new Error('Upgrade not supported for H2')) + return false } if (request.aborted) { @@ -297,10 +276,10 @@ function writeH2 (client, request) { } } - errorRequest(client, request, err) + util.errorRequest(client, request, err) }) } catch (err) { - errorRequest(client, request, err) + util.errorRequest(client, request, err) } if (method === 'CONNECT') { @@ -375,7 +354,7 @@ function writeH2 (client, request) { // A user agent may send a Content-Length header with 0 value, this should be allowed. if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) { if (client[kStrictContentLength]) { - errorRequest(client, request, new RequestContentLengthMismatchError()) + util.errorRequest(client, request, new RequestContentLengthMismatchError()) return false } @@ -417,7 +396,7 @@ function writeH2 (client, request) { // as there's no value to keep it open. if (request.aborted || request.completed) { const err = new RequestAbortedError() - errorRequest(client, request, err) + util.errorRequest(client, request, err) util.destroy(stream, err) return } @@ -451,13 +430,12 @@ function writeH2 (client, request) { } const err = new InformationalError('HTTP/2: stream half-closed (remote)') - errorRequest(client, request, err) + util.errorRequest(client, request, err) util.destroy(stream, err) }) stream.once('close', () => { session[kOpenStreams] -= 1 - // TODO(HTTP/2): unref only if current streams count is 0 if (session[kOpenStreams] === 0) { session.unref() } @@ -466,13 +444,14 @@ function writeH2 (client, request) { stream.once('error', function (err) { if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { session[kOpenStreams] -= 1 + util.errorRequest(client, request, err) util.destroy(stream, err) } }) stream.once('frameError', (type, code) => { const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) - errorRequest(client, request, err) + util.errorRequest(client, request, err) if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { session[kOpenStreams] -= 1 diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index 60e68135549..a8be043a92f 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -338,7 +338,7 @@ class Client extends DispatcherBase { const requests = this[kQueue].splice(this[kPendingIdx]) for (let i = 0; i < requests.length; i++) { const request = requests[i] - errorRequest(this, request, err) + util.errorRequest(this, request, err) } const callback = () => { @@ -378,7 +378,7 @@ function onError (client, err) { const requests = client[kQueue].splice(client[kRunningIdx]) for (let i = 0; i < requests.length; i++) { const request = requests[i] - errorRequest(client, request, err) + util.errorRequest(client, request, err) } assert(client[kSize] === 0) } @@ -502,7 +502,7 @@ async function connect (client) { assert(client[kRunning] === 0) while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) { const request = client[kQueue][client[kPendingIdx]++] - errorRequest(client, request, err) + util.errorRequest(client, request, err) } } else { onError(client, err) @@ -609,13 +609,4 @@ function _resume (client, sync) { } } -function errorRequest (client, request, err) { - try { - request.onError(err) - assert(request.aborted) - } catch (err) { - client.emit('error', err) - } -} - module.exports = Client diff --git a/test/http2.js b/test/http2.js index a4ecc1ac3e8..c6f9d01cb91 100644 --- a/test/http2.js +++ b/test/http2.js @@ -1381,3 +1381,64 @@ test('#2364 - Concurrent aborts', async t => { await t.completed }) + +test('#3046 - GOAWAY Frame', { only: true }, async t => { + const server = createSecureServer(pem) + + server.on('stream', (stream, headers) => { + setTimeout(() => { + if (stream.closed) return + stream.end('Hello World') + }, 100) + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': 'hello', + ':status': 200 + }) + }) + + server.on('session', session => { + setTimeout(() => { + session.goaway() + }, 50) + }) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t = tspl(t, { plan: 7 }) + after(() => server.close()) + after(() => client.close()) + + client.on('disconnect', (url, disconnectClient, err) => { + t.ok(url instanceof URL) + t.deepStrictEqual(disconnectClient, [client]) + t.strictEqual(err.message, 'HTTP/2: "GOAWAY" frame received with code 0') + }) + + client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }, (err, response) => { + t.ifError(err) + t.strictEqual(response.headers['content-type'], 'text/plain; charset=utf-8') + t.strictEqual(response.headers['x-custom-h2'], 'hello') + t.strictEqual(response.statusCode, 200) + // We stop the sent the GOAWAY frame before the body is sent, as we received the GOAWAY frame + // before the DATA one, the body will be empty + response.body.dump() + }) + + await t.completed +})