From 335113601c238104eb2e331d26b5e463bde80dff Mon Sep 17 00:00:00 2001 From: Tim Date: Mon, 18 Dec 2023 09:53:07 +1300 Subject: [PATCH] respond with 503 on server induced interrupt (#365) --- .changeset/wise-ants-relate.md | 7 ++ docs/platform/Http/ServerError.ts.md | 36 ++++++++ docs/platform/Http/ServerRequest.ts.md | 1 + .../platform-bun/src/internal/http/server.ts | 17 ++-- .../platform-node/src/internal/http/server.ts | 85 ++++++++++--------- .../platform-node/test/HttpServer.test.ts | 19 +++++ packages/platform/src/Http/App.ts | 32 +++++-- packages/platform/src/Http/ServerError.ts | 18 ++++ packages/platform/src/Http/ServerRequest.ts | 1 + .../platform/src/internal/http/middleware.ts | 13 ++- .../platform/src/internal/http/serverError.ts | 22 +++++ 11 files changed, 192 insertions(+), 59 deletions(-) create mode 100644 .changeset/wise-ants-relate.md diff --git a/.changeset/wise-ants-relate.md b/.changeset/wise-ants-relate.md new file mode 100644 index 00000000..30189902 --- /dev/null +++ b/.changeset/wise-ants-relate.md @@ -0,0 +1,7 @@ +--- +"@effect/platform-node": patch +"@effect/platform-bun": patch +"@effect/platform": patch +--- + +respond with 503 on server induced interrupt diff --git a/docs/platform/Http/ServerError.ts.md b/docs/platform/Http/ServerError.ts.md index 3d7b5375..647fd91c 100644 --- a/docs/platform/Http/ServerError.ts.md +++ b/docs/platform/Http/ServerError.ts.md @@ -22,6 +22,8 @@ Added in v1.0.0 - [RouteNotFound (interface)](#routenotfound-interface) - [ServeError](#serveerror) - [ServeError (interface)](#serveerror-interface) +- [predicates](#predicates) + - [isServerError](#isservererror) - [type id](#type-id) - [TypeId](#typeid) - [TypeId (type alias)](#typeid-type-alias) @@ -29,6 +31,8 @@ Added in v1.0.0 - [HttpError (namespace)](#httperror-namespace) - [Proto (interface)](#proto-interface) - [ProvidedFields (type alias)](#providedfields-type-alias) + - [clientAbortFiberId](#clientabortfiberid) + - [isClientAbortCause](#isclientabortcause) --- @@ -141,6 +145,18 @@ export interface ServeError extends HttpError.Proto { Added in v1.0.0 +# predicates + +## isServerError + +**Signature** + +```ts +export declare const isServerError: (u: unknown) => u is HttpServerError +``` + +Added in v1.0.0 + # type id ## TypeId @@ -191,3 +207,23 @@ export type ProvidedFields = TypeId | "_tag" | keyof Data.Case ``` Added in v1.0.0 + +## clientAbortFiberId + +**Signature** + +```ts +export declare const clientAbortFiberId: FiberId.FiberId +``` + +Added in v1.0.0 + +## isClientAbortCause + +**Signature** + +```ts +export declare const isClientAbortCause: (cause: Cause.Cause) => boolean +``` + +Added in v1.0.0 diff --git a/docs/platform/Http/ServerRequest.ts.md b/docs/platform/Http/ServerRequest.ts.md index 9064eab3..ee2e8b28 100644 --- a/docs/platform/Http/ServerRequest.ts.md +++ b/docs/platform/Http/ServerRequest.ts.md @@ -95,6 +95,7 @@ Added in v1.0.0 ```ts export interface ServerRequest extends IncomingMessage.IncomingMessage { readonly [TypeId]: TypeId + readonly source: unknown readonly url: string readonly originalUrl: string readonly method: Method diff --git a/packages/platform-bun/src/internal/http/server.ts b/packages/platform-bun/src/internal/http/server.ts index b2de9920..3278cc40 100644 --- a/packages/platform-bun/src/internal/http/server.ts +++ b/packages/platform-bun/src/internal/http/server.ts @@ -54,8 +54,8 @@ export const make = ( : App.withDefaultMiddleware(respond(httpApp))) as App.Default return pipe( - Effect.all([Effect.runtime(), Effect.fiberId]), - Effect.flatMap(([runtime, fiberId]) => + Effect.runtime(), + Effect.flatMap((runtime) => Effect.async((_) => { const runFork = Runtime.runFork(runtime) function handler(request: Request, _server: BunServer) { @@ -66,7 +66,7 @@ export const make = ( new ServerRequestImpl(request, resolve, reject, removeHost(request.url)) )) request.signal.addEventListener("abort", () => { - runFork(fiber.interruptAsFork(fiberId)) + runFork(fiber.interruptAsFork(Error.clientAbortFiberId)) }) }) } @@ -138,12 +138,17 @@ const respond = Middleware.make((httpApp) => ), (exit) => Effect.sync(() => { + const impl = request as ServerRequestImpl if (exit._tag === "Success") { - ;(request as ServerRequestImpl).resolve(makeResponse(request, exit.value)) + impl.resolve(makeResponse(request, exit.value)) } else if (Cause.isInterruptedOnly(exit.cause)) { - ;(request as ServerRequestImpl).resolve(new Response(undefined, { status: 499 })) + impl.resolve( + new Response(undefined, { + status: impl.source.signal.aborted ? 499 : 503 + }) + ) } else { - ;(request as ServerRequestImpl).reject(Cause.pretty(exit.cause)) + impl.reject(Cause.pretty(exit.cause)) } }) ) diff --git a/packages/platform-node/src/internal/http/server.ts b/packages/platform-node/src/internal/http/server.ts index 27e870dd..17a907b8 100644 --- a/packages/platform-node/src/internal/http/server.ts +++ b/packages/platform-node/src/internal/http/server.ts @@ -91,29 +91,6 @@ export const make = ( ) ) -const respond = Middleware.make((httpApp) => - Effect.flatMap(ServerRequest.ServerRequest, (request) => - Effect.tapErrorCause( - Effect.tap( - Effect.flatMap( - httpApp, - (response) => Effect.flatMap(App.preResponseHandler, (f) => f(request, response)) - ), - (response) => handleResponse(request, response) - ), - (cause) => - Effect.sync(() => { - const nodeResponse = (request as ServerRequestImpl).response - if (!nodeResponse.headersSent) { - nodeResponse.writeHead(Cause.isInterruptedOnly(cause) ? 499 : 500) - } - if (!nodeResponse.writableEnded) { - nodeResponse.end() - } - }) - )) -) - /** @internal */ export const makeHandler: { (httpApp: App.Default): Effect.Effect< @@ -133,28 +110,56 @@ export const makeHandler: { const handledApp = middleware ? middleware(App.withDefaultMiddleware(respond(httpApp))) : App.withDefaultMiddleware(respond(httpApp)) - return Effect.map( - Effect.zip(Effect.runtime(), Effect.fiberId), - ([runtime, fiberId]) => { - const runFork = Runtime.runFork(runtime) - return function handler(nodeRequest: Http.IncomingMessage, nodeResponse: Http.ServerResponse) { - const fiber = runFork( - Effect.provideService( - handledApp, - ServerRequest.ServerRequest, - new ServerRequestImpl(nodeRequest, nodeResponse) - ) + return Effect.map(Effect.runtime(), (runtime) => { + const runFork = Runtime.runFork(runtime) + return function handler(nodeRequest: Http.IncomingMessage, nodeResponse: Http.ServerResponse) { + const fiber = runFork( + Effect.provideService( + handledApp, + ServerRequest.ServerRequest, + new ServerRequestImpl(nodeRequest, nodeResponse) ) - nodeResponse.on("close", () => { - if (!nodeResponse.writableEnded) { - runFork(fiber.interruptAsFork(fiberId)) + ) + nodeResponse.on("close", () => { + if (!nodeResponse.writableEnded) { + if (!nodeResponse.headersSent) { + nodeResponse.writeHead(499) } - }) - } + nodeResponse.end() + runFork(fiber.interruptAsFork(Error.clientAbortFiberId)) + } + }) } - ) + }) } +const respond = Middleware.make((httpApp) => + Effect.uninterruptibleMask((restore) => + Effect.flatMap(ServerRequest.ServerRequest, (request) => + Effect.tapErrorCause( + restore( + Effect.tap( + Effect.flatMap( + httpApp, + (response) => Effect.flatMap(App.preResponseHandler, (f) => f(request, response)) + ), + (response) => handleResponse(request, response) + ) + ), + (cause) => + Effect.sync(() => { + const nodeResponse = (request as ServerRequestImpl).response + if (!nodeResponse.headersSent) { + nodeResponse.writeHead(Cause.isInterruptedOnly(cause) ? 503 : 500) + } + if (!nodeResponse.writableEnded) { + nodeResponse.end() + } + }) + )) + ) +) + class ServerRequestImpl extends IncomingMessageImpl implements ServerRequest.ServerRequest { readonly [ServerRequest.TypeId]: ServerRequest.TypeId diff --git a/packages/platform-node/test/HttpServer.test.ts b/packages/platform-node/test/HttpServer.test.ts index 123e923d..2b4b3560 100644 --- a/packages/platform-node/test/HttpServer.test.ts +++ b/packages/platform-node/test/HttpServer.test.ts @@ -3,7 +3,10 @@ import * as Platform from "@effect/platform-node/Http/Platform" import * as HttpC from "@effect/platform-node/HttpClient" import * as Http from "@effect/platform-node/HttpServer" import * as NodeContext from "@effect/platform-node/NodeContext" +import * as ServerError from "@effect/platform/Http/ServerError" +import type { ServerResponse } from "@effect/platform/Http/ServerResponse" import * as Schema from "@effect/schema/Schema" +import { Deferred, Fiber } from "effect" import * as Effect from "effect/Effect" import * as Layer from "effect/Layer" import * as Option from "effect/Option" @@ -363,4 +366,20 @@ describe("HttpServer", () => { ) expect((body as any).parent.value.spanId).toEqual(requestSpan.spanId) }).pipe(Effect.scoped, runPromise)) + + it("client abort", () => + Effect.gen(function*(_) { + const latch = yield* _(Deferred.make()) + yield* _( + Http.response.empty(), + Effect.delay(1000), + Http.server.serveEffect((app) => Effect.onExit(app, (exit) => Deferred.complete(latch, exit))) + ) + const client = yield* _(makeClient) + const fiber = yield* _(client(HttpC.request.get("/")), Effect.fork) + yield* _(Effect.sleep(100)) + yield* _(Fiber.interrupt(fiber)) + const cause = yield* _(Deferred.await(latch), Effect.sandbox, Effect.flip) + expect(ServerError.isClientAbortCause(cause)).toEqual(true) + }).pipe(Effect.scoped, runPromise)) }) diff --git a/packages/platform/src/Http/App.ts b/packages/platform/src/Http/App.ts index 8b986c84..9a1d36d0 100644 --- a/packages/platform/src/Http/App.ts +++ b/packages/platform/src/Http/App.ts @@ -1,6 +1,7 @@ /** * @since 1.0.0 */ +import * as Cause from "effect/Cause" import * as Effect from "effect/Effect" import * as Exit from "effect/Exit" import * as FiberRef from "effect/FiberRef" @@ -11,7 +12,7 @@ import * as ReadonlyArray from "effect/ReadonlyArray" import * as Runtime from "effect/Runtime" import * as Scope from "effect/Scope" import * as internalMiddleware from "../internal/http/middleware.js" -import type * as ServerError from "./ServerError.js" +import * as ServerError from "./ServerError.js" import * as ServerRequest from "./ServerRequest.js" import * as ServerResponse from "./ServerResponse.js" @@ -107,16 +108,29 @@ export const withPreResponseHandler = dual< * @category conversions */ export const toWebHandlerRuntime = (runtime: Runtime.Runtime) => { - const run = Runtime.runPromise(runtime) + const run = Runtime.runFork(runtime) return (self: Default) => { self = withDefaultMiddleware(self) - return (request: Request): Promise => { - const req = ServerRequest.fromWeb(request) - return run(Effect.map( - Effect.provideService(self, ServerRequest.ServerRequest, req), - (res) => ServerResponse.toWeb(res, req.method === "HEAD") - )) - } + return (request: Request): Promise => + new Promise((resolve, reject) => { + const req = ServerRequest.fromWeb(request) + const fiber = run(Effect.map( + Effect.provideService(self, ServerRequest.ServerRequest, req), + (res) => ServerResponse.toWeb(res, req.method === "HEAD") + )) + request.signal.addEventListener("abort", () => { + Effect.runFork(fiber.interruptAsFork(ServerError.clientAbortFiberId)) + }) + fiber.addObserver((exit) => { + if (Exit.isSuccess(exit)) { + resolve(exit.value) + } else if (Cause.isInterruptedOnly(exit.cause)) { + resolve(new Response(null, { status: request.signal.aborted ? 499 : 503 })) + } else { + reject(Cause.pretty(exit.cause)) + } + }) + }) } } diff --git a/packages/platform/src/Http/ServerError.ts b/packages/platform/src/Http/ServerError.ts index b5c77147..2f51a175 100644 --- a/packages/platform/src/Http/ServerError.ts +++ b/packages/platform/src/Http/ServerError.ts @@ -1,7 +1,9 @@ /** * @since 1.0.0 */ +import type * as Cause from "effect/Cause" import type * as Data from "effect/Data" +import type * as FiberId from "effect/FiberId" import * as internal from "../internal/http/serverError.js" import type * as ServerRequest from "./ServerRequest.js" import type * as ServerResponse from "./ServerResponse.js" @@ -54,6 +56,12 @@ export interface RequestError extends HttpError.Proto { readonly error: unknown } +/** + * @since 1.0.0 + * @category predicates + */ +export const isServerError: (u: unknown) => u is HttpServerError = internal.isServerError + /** * @since 1.0.0 * @category error @@ -109,3 +117,13 @@ export interface ServeError extends HttpError.Proto { * @category error */ export const ServeError: (props: Omit) => ServeError = internal.serveError + +/** + * @since 1.0.0 + */ +export const clientAbortFiberId: FiberId.FiberId = internal.clientAbortFiberId + +/** + * @since 1.0.0 + */ +export const isClientAbortCause: (cause: Cause.Cause) => boolean = internal.isClientAbortCause diff --git a/packages/platform/src/Http/ServerRequest.ts b/packages/platform/src/Http/ServerRequest.ts index 6a9a5d9a..c936e149 100644 --- a/packages/platform/src/Http/ServerRequest.ts +++ b/packages/platform/src/Http/ServerRequest.ts @@ -42,6 +42,7 @@ export type TypeId = typeof TypeId */ export interface ServerRequest extends IncomingMessage.IncomingMessage { readonly [TypeId]: TypeId + readonly source: unknown readonly url: string readonly originalUrl: string readonly method: Method diff --git a/packages/platform/src/internal/http/middleware.ts b/packages/platform/src/internal/http/middleware.ts index 8307211e..6053810d 100644 --- a/packages/platform/src/internal/http/middleware.ts +++ b/packages/platform/src/internal/http/middleware.ts @@ -1,12 +1,13 @@ import * as Cause from "effect/Cause" import * as Effect from "effect/Effect" import * as FiberRef from "effect/FiberRef" -import * as Function from "effect/Function" +import { constFalse, dual } from "effect/Function" import { globalValue } from "effect/GlobalValue" import type * as Predicate from "effect/Predicate" import * as Headers from "../../Http/Headers.js" import * as IncomingMessage from "../../Http/IncomingMessage.js" import type * as Middleware from "../../Http/Middleware.js" +import * as ServerError from "../../Http/ServerError.js" import * as ServerRequest from "../../Http/ServerRequest.js" /** @internal */ @@ -28,11 +29,11 @@ export const withLoggerDisabled = (self: Effect.Effect): Effec /** @internal */ export const currentTracerDisabledWhen = globalValue( Symbol.for("@effect/platform/Http/Middleware/tracerDisabledWhen"), - () => FiberRef.unsafeMake>(Function.constFalse) + () => FiberRef.unsafeMake>(constFalse) ) /** @internal */ -export const withTracerDisabledWhen = Function.dual< +export const withTracerDisabledWhen = dual< ( predicate: Predicate.Predicate ) => (effect: Effect.Effect) => Effect.Effect, @@ -60,7 +61,11 @@ export const logger = make((httpApp) => { Effect.annotateLogs(Effect.log(exit.cause), { "http.method": request.method, "http.url": request.url, - "http.status": Cause.isInterruptedOnly(exit.cause) ? 499 : 500 + "http.status": Cause.isInterruptedOnly(exit.cause) + ? ServerError.isClientAbortCause(exit.cause) + ? 499 + : 503 + : 500 }) : Effect.annotateLogs(Effect.log(""), { "http.method": request.method, diff --git a/packages/platform/src/internal/http/serverError.ts b/packages/platform/src/internal/http/serverError.ts index d147ce33..7d85f00b 100644 --- a/packages/platform/src/internal/http/serverError.ts +++ b/packages/platform/src/internal/http/serverError.ts @@ -1,4 +1,9 @@ +import * as Cause from "effect/Cause" import * as Data from "effect/Data" +import * as FiberId from "effect/FiberId" +import { globalValue } from "effect/GlobalValue" +import * as Option from "effect/Option" +import * as Predicate from "effect/Predicate" import type * as Error from "../../Http/ServerError.js" /** @internal */ @@ -13,6 +18,9 @@ const make = (tag: A["_tag"]) => (props: Omit Predicate.hasProperty(u, TypeId) + /** @internal */ export const requestError = make("RequestError") @@ -24,3 +32,17 @@ export const routeNotFound = make("RouteNotFound") /** @internal */ export const serveError = make("ServeError") + +/** @internal */ +export const clientAbortFiberId = globalValue( + "@effect/platform/Http/ServerError/clientAbortFiberId", + () => FiberId.runtime(-499, 0) +) + +/** @internal */ +export const isClientAbortCause = (cause: Cause.Cause): boolean => + Cause.reduce( + cause, + false, + (_, cause) => cause._tag === "Interrupt" && cause.fiberId === clientAbortFiberId ? Option.some(true) : Option.none() + )