Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
respond with 503 on server induced interrupt (#365)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Dec 17, 2023
1 parent e2c545a commit 3351136
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 59 deletions.
7 changes: 7 additions & 0 deletions .changeset/wise-ants-relate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@effect/platform-node": patch
"@effect/platform-bun": patch
"@effect/platform": patch
---

respond with 503 on server induced interrupt
36 changes: 36 additions & 0 deletions docs/platform/Http/ServerError.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ Added in v1.0.0
- [RouteNotFound (interface)](#routenotfound-interface)
- [ServeError](#serveerror)
- [ServeError (interface)](#serveerror-interface)
- [predicates](#predicates)
- [isServerError](#isservererror)
- [type id](#type-id)
- [TypeId](#typeid)
- [TypeId (type alias)](#typeid-type-alias)
- [utils](#utils)
- [HttpError (namespace)](#httperror-namespace)
- [Proto (interface)](#proto-interface)
- [ProvidedFields (type alias)](#providedfields-type-alias)
- [clientAbortFiberId](#clientabortfiberid)
- [isClientAbortCause](#isclientabortcause)

---

Expand Down Expand Up @@ -141,6 +145,18 @@ export interface ServeError extends HttpError.Proto {

Added in v1.0.0

# predicates

## isServerError

**Signature**

```ts
export declare const isServerError: (u: unknown) => u is HttpServerError
```
Added in v1.0.0
# type id
## TypeId
Expand Down Expand Up @@ -191,3 +207,23 @@ export type ProvidedFields = TypeId | "_tag" | keyof Data.Case
```
Added in v1.0.0
## clientAbortFiberId
**Signature**
```ts
export declare const clientAbortFiberId: FiberId.FiberId
```
Added in v1.0.0
## isClientAbortCause
**Signature**
```ts
export declare const isClientAbortCause: <E>(cause: Cause.Cause<E>) => boolean
```
Added in v1.0.0
1 change: 1 addition & 0 deletions docs/platform/Http/ServerRequest.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ Added in v1.0.0
```ts
export interface ServerRequest extends IncomingMessage.IncomingMessage<Error.RequestError> {
readonly [TypeId]: TypeId
readonly source: unknown
readonly url: string
readonly originalUrl: string
readonly method: Method
Expand Down
17 changes: 11 additions & 6 deletions packages/platform-bun/src/internal/http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ export const make = (
: App.withDefaultMiddleware(respond(httpApp))) as App.Default<never, unknown>

return pipe(
Effect.all([Effect.runtime<never>(), Effect.fiberId]),
Effect.flatMap(([runtime, fiberId]) =>
Effect.runtime<never>(),
Effect.flatMap((runtime) =>
Effect.async<never, never, never>((_) => {
const runFork = Runtime.runFork(runtime)
function handler(request: Request, _server: BunServer) {
Expand All @@ -66,7 +66,7 @@ export const make = (
new ServerRequestImpl(request, resolve, reject, removeHost(request.url))
))
request.signal.addEventListener("abort", () => {
runFork(fiber.interruptAsFork(fiberId))
runFork(fiber.interruptAsFork(Error.clientAbortFiberId))
})
})
}
Expand Down Expand Up @@ -138,12 +138,17 @@ const respond = Middleware.make((httpApp) =>
),
(exit) =>
Effect.sync(() => {
const impl = request as ServerRequestImpl
if (exit._tag === "Success") {
;(request as ServerRequestImpl).resolve(makeResponse(request, exit.value))
impl.resolve(makeResponse(request, exit.value))
} else if (Cause.isInterruptedOnly(exit.cause)) {
;(request as ServerRequestImpl).resolve(new Response(undefined, { status: 499 }))
impl.resolve(
new Response(undefined, {
status: impl.source.signal.aborted ? 499 : 503
})
)
} else {
;(request as ServerRequestImpl).reject(Cause.pretty(exit.cause))
impl.reject(Cause.pretty(exit.cause))
}
})
)
Expand Down
85 changes: 45 additions & 40 deletions packages/platform-node/src/internal/http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,29 +91,6 @@ export const make = (
)
)

const respond = Middleware.make((httpApp) =>
Effect.flatMap(ServerRequest.ServerRequest, (request) =>
Effect.tapErrorCause(
Effect.tap(
Effect.flatMap(
httpApp,
(response) => Effect.flatMap(App.preResponseHandler, (f) => f(request, response))
),
(response) => handleResponse(request, response)
),
(cause) =>
Effect.sync(() => {
const nodeResponse = (request as ServerRequestImpl).response
if (!nodeResponse.headersSent) {
nodeResponse.writeHead(Cause.isInterruptedOnly(cause) ? 499 : 500)
}
if (!nodeResponse.writableEnded) {
nodeResponse.end()
}
})
))
)

/** @internal */
export const makeHandler: {
<R, E>(httpApp: App.Default<R, E>): Effect.Effect<
Expand All @@ -133,28 +110,56 @@ export const makeHandler: {
const handledApp = middleware
? middleware(App.withDefaultMiddleware(respond(httpApp)))
: App.withDefaultMiddleware(respond(httpApp))
return Effect.map(
Effect.zip(Effect.runtime<R>(), Effect.fiberId),
([runtime, fiberId]) => {
const runFork = Runtime.runFork(runtime)
return function handler(nodeRequest: Http.IncomingMessage, nodeResponse: Http.ServerResponse) {
const fiber = runFork(
Effect.provideService(
handledApp,
ServerRequest.ServerRequest,
new ServerRequestImpl(nodeRequest, nodeResponse)
)
return Effect.map(Effect.runtime<R>(), (runtime) => {
const runFork = Runtime.runFork(runtime)
return function handler(nodeRequest: Http.IncomingMessage, nodeResponse: Http.ServerResponse) {
const fiber = runFork(
Effect.provideService(
handledApp,
ServerRequest.ServerRequest,
new ServerRequestImpl(nodeRequest, nodeResponse)
)
nodeResponse.on("close", () => {
if (!nodeResponse.writableEnded) {
runFork(fiber.interruptAsFork(fiberId))
)
nodeResponse.on("close", () => {
if (!nodeResponse.writableEnded) {
if (!nodeResponse.headersSent) {
nodeResponse.writeHead(499)
}
})
}
nodeResponse.end()
runFork(fiber.interruptAsFork(Error.clientAbortFiberId))
}
})
}
)
})
}

const respond = Middleware.make((httpApp) =>
Effect.uninterruptibleMask((restore) =>
Effect.flatMap(ServerRequest.ServerRequest, (request) =>
Effect.tapErrorCause(
restore(
Effect.tap(
Effect.flatMap(
httpApp,
(response) => Effect.flatMap(App.preResponseHandler, (f) => f(request, response))
),
(response) => handleResponse(request, response)
)
),
(cause) =>
Effect.sync(() => {
const nodeResponse = (request as ServerRequestImpl).response
if (!nodeResponse.headersSent) {
nodeResponse.writeHead(Cause.isInterruptedOnly(cause) ? 503 : 500)
}
if (!nodeResponse.writableEnded) {
nodeResponse.end()
}
})
))
)
)

class ServerRequestImpl extends IncomingMessageImpl<Error.RequestError> implements ServerRequest.ServerRequest {
readonly [ServerRequest.TypeId]: ServerRequest.TypeId

Expand Down
19 changes: 19 additions & 0 deletions packages/platform-node/test/HttpServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ import * as Platform from "@effect/platform-node/Http/Platform"
import * as HttpC from "@effect/platform-node/HttpClient"
import * as Http from "@effect/platform-node/HttpServer"
import * as NodeContext from "@effect/platform-node/NodeContext"
import * as ServerError from "@effect/platform/Http/ServerError"
import type { ServerResponse } from "@effect/platform/Http/ServerResponse"
import * as Schema from "@effect/schema/Schema"
import { Deferred, Fiber } from "effect"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
Expand Down Expand Up @@ -363,4 +366,20 @@ describe("HttpServer", () => {
)
expect((body as any).parent.value.spanId).toEqual(requestSpan.spanId)
}).pipe(Effect.scoped, runPromise))

it("client abort", () =>
Effect.gen(function*(_) {
const latch = yield* _(Deferred.make<never, ServerResponse>())
yield* _(
Http.response.empty(),
Effect.delay(1000),
Http.server.serveEffect((app) => Effect.onExit(app, (exit) => Deferred.complete(latch, exit)))
)
const client = yield* _(makeClient)
const fiber = yield* _(client(HttpC.request.get("/")), Effect.fork)
yield* _(Effect.sleep(100))
yield* _(Fiber.interrupt(fiber))
const cause = yield* _(Deferred.await(latch), Effect.sandbox, Effect.flip)
expect(ServerError.isClientAbortCause(cause)).toEqual(true)
}).pipe(Effect.scoped, runPromise))
})
32 changes: 23 additions & 9 deletions packages/platform/src/Http/App.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/**
* @since 1.0.0
*/
import * as Cause from "effect/Cause"
import * as Effect from "effect/Effect"
import * as Exit from "effect/Exit"
import * as FiberRef from "effect/FiberRef"
Expand All @@ -11,7 +12,7 @@ import * as ReadonlyArray from "effect/ReadonlyArray"
import * as Runtime from "effect/Runtime"
import * as Scope from "effect/Scope"
import * as internalMiddleware from "../internal/http/middleware.js"
import type * as ServerError from "./ServerError.js"
import * as ServerError from "./ServerError.js"
import * as ServerRequest from "./ServerRequest.js"
import * as ServerResponse from "./ServerResponse.js"

Expand Down Expand Up @@ -107,16 +108,29 @@ export const withPreResponseHandler = dual<
* @category conversions
*/
export const toWebHandlerRuntime = <R>(runtime: Runtime.Runtime<R>) => {
const run = Runtime.runPromise(runtime)
const run = Runtime.runFork(runtime)
return <E>(self: Default<R, E>) => {
self = withDefaultMiddleware(self)
return (request: Request): Promise<Response> => {
const req = ServerRequest.fromWeb(request)
return run(Effect.map(
Effect.provideService(self, ServerRequest.ServerRequest, req),
(res) => ServerResponse.toWeb(res, req.method === "HEAD")
))
}
return (request: Request): Promise<Response> =>
new Promise((resolve, reject) => {
const req = ServerRequest.fromWeb(request)
const fiber = run(Effect.map(
Effect.provideService(self, ServerRequest.ServerRequest, req),
(res) => ServerResponse.toWeb(res, req.method === "HEAD")
))
request.signal.addEventListener("abort", () => {
Effect.runFork(fiber.interruptAsFork(ServerError.clientAbortFiberId))
})
fiber.addObserver((exit) => {
if (Exit.isSuccess(exit)) {
resolve(exit.value)
} else if (Cause.isInterruptedOnly(exit.cause)) {
resolve(new Response(null, { status: request.signal.aborted ? 499 : 503 }))
} else {
reject(Cause.pretty(exit.cause))
}
})
})
}
}

Expand Down
18 changes: 18 additions & 0 deletions packages/platform/src/Http/ServerError.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
/**
* @since 1.0.0
*/
import type * as Cause from "effect/Cause"
import type * as Data from "effect/Data"
import type * as FiberId from "effect/FiberId"
import * as internal from "../internal/http/serverError.js"
import type * as ServerRequest from "./ServerRequest.js"
import type * as ServerResponse from "./ServerResponse.js"
Expand Down Expand Up @@ -54,6 +56,12 @@ export interface RequestError extends HttpError.Proto {
readonly error: unknown
}

/**
* @since 1.0.0
* @category predicates
*/
export const isServerError: (u: unknown) => u is HttpServerError = internal.isServerError

/**
* @since 1.0.0
* @category error
Expand Down Expand Up @@ -109,3 +117,13 @@ export interface ServeError extends HttpError.Proto {
* @category error
*/
export const ServeError: (props: Omit<ServeError, HttpError.ProvidedFields>) => ServeError = internal.serveError

/**
* @since 1.0.0
*/
export const clientAbortFiberId: FiberId.FiberId = internal.clientAbortFiberId

/**
* @since 1.0.0
*/
export const isClientAbortCause: <E>(cause: Cause.Cause<E>) => boolean = internal.isClientAbortCause
1 change: 1 addition & 0 deletions packages/platform/src/Http/ServerRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export type TypeId = typeof TypeId
*/
export interface ServerRequest extends IncomingMessage.IncomingMessage<Error.RequestError> {
readonly [TypeId]: TypeId
readonly source: unknown
readonly url: string
readonly originalUrl: string
readonly method: Method
Expand Down
13 changes: 9 additions & 4 deletions packages/platform/src/internal/http/middleware.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import * as Cause from "effect/Cause"
import * as Effect from "effect/Effect"
import * as FiberRef from "effect/FiberRef"
import * as Function from "effect/Function"
import { constFalse, dual } from "effect/Function"
import { globalValue } from "effect/GlobalValue"
import type * as Predicate from "effect/Predicate"
import * as Headers from "../../Http/Headers.js"
import * as IncomingMessage from "../../Http/IncomingMessage.js"
import type * as Middleware from "../../Http/Middleware.js"
import * as ServerError from "../../Http/ServerError.js"
import * as ServerRequest from "../../Http/ServerRequest.js"

/** @internal */
Expand All @@ -28,11 +29,11 @@ export const withLoggerDisabled = <R, E, A>(self: Effect.Effect<R, E, A>): Effec
/** @internal */
export const currentTracerDisabledWhen = globalValue(
Symbol.for("@effect/platform/Http/Middleware/tracerDisabledWhen"),
() => FiberRef.unsafeMake<Predicate.Predicate<ServerRequest.ServerRequest>>(Function.constFalse)
() => FiberRef.unsafeMake<Predicate.Predicate<ServerRequest.ServerRequest>>(constFalse)
)

/** @internal */
export const withTracerDisabledWhen = Function.dual<
export const withTracerDisabledWhen = dual<
(
predicate: Predicate.Predicate<ServerRequest.ServerRequest>
) => <R, E, A>(effect: Effect.Effect<R, E, A>) => Effect.Effect<R, E, A>,
Expand Down Expand Up @@ -60,7 +61,11 @@ export const logger = make((httpApp) => {
Effect.annotateLogs(Effect.log(exit.cause), {
"http.method": request.method,
"http.url": request.url,
"http.status": Cause.isInterruptedOnly(exit.cause) ? 499 : 500
"http.status": Cause.isInterruptedOnly(exit.cause)
? ServerError.isClientAbortCause(exit.cause)
? 499
: 503
: 500
}) :
Effect.annotateLogs(Effect.log(""), {
"http.method": request.method,
Expand Down
Loading

0 comments on commit 3351136

Please sign in to comment.