Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
add decode option to worker runner
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Dec 15, 2023
1 parent 5ba1476 commit 9f19478
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 66 deletions.
8 changes: 8 additions & 0 deletions .changeset/new-maps-chew.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@effect/platform-browser": patch
"@effect/platform-node": patch
"@effect/platform-bun": patch
"@effect/platform": patch
---

add decode option to worker runner
2 changes: 1 addition & 1 deletion docs/platform-browser/WorkerRunner.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Added in v1.0.0
```ts
export declare const layer: <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O>,
options?: Runner.Runner.Options<E, O> | undefined
options?: Runner.Runner.Options<I, E, O> | undefined
) => Layer.Layer<R, WorkerError, never>
```
Expand Down
2 changes: 1 addition & 1 deletion docs/platform-bun/WorkerRunner.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Added in v1.0.0
```ts
export declare const layer: <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O>,
options?: Runner.Runner.Options<E, O> | undefined
options?: Runner.Runner.Options<I, E, O> | undefined
) => Layer.Layer<R, WorkerError, never>
```
Expand Down
2 changes: 1 addition & 1 deletion docs/platform-node/WorkerRunner.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Added in v1.0.0
```ts
export declare const layer: <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O>,
options?: Runner.Runner.Options<E, O> | undefined
options?: Runner.Runner.Options<I, E, O> | undefined
) => Layer.Layer<R, WorkerError, never>
```
Expand Down
11 changes: 6 additions & 5 deletions docs/platform/WorkerRunner.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Added in v1.0.0
```ts
export declare const make: <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O> | Effect.Effect<R, E, O>,
options?: Runner.Options<E, O> | undefined
options?: Runner.Options<I, E, O> | undefined
) => Effect.Effect<PlatformRunner | R | Scope.Scope, WorkerError, void>
```
Expand Down Expand Up @@ -91,7 +91,7 @@ Added in v1.0.0
```ts
export declare const layer: <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O> | Effect.Effect<R, E, O>,
options?: Runner.Options<E, O> | undefined
options?: Runner.Options<I, E, O> | undefined
) => Layer.Layer<PlatformRunner | R, WorkerError, never>
```
Expand Down Expand Up @@ -181,9 +181,10 @@ Added in v1.0.0
**Signature**

```ts
export interface Options<E, O> {
readonly encodeOutput?: (message: O) => Effect.Effect<never, WorkerError, unknown>
readonly encodeError?: (message: E) => Effect.Effect<never, WorkerError, unknown>
export interface Options<I, E, O> {
readonly decode?: (message: unknown) => Effect.Effect<never, WorkerError, I>
readonly encodeOutput?: (request: I, message: O) => Effect.Effect<never, WorkerError, unknown>
readonly encodeError?: (request: I, error: E) => Effect.Effect<never, WorkerError, unknown>
readonly transfers?: (message: O | E) => ReadonlyArray<unknown>
}
```
Expand Down
2 changes: 1 addition & 1 deletion packages/platform-browser/src/WorkerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const layerPlatform: Layer.Layer<never, never, Runner.PlatformRunner> = i
*/
export const layer: <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O>,
options?: Runner.Runner.Options<E, O> | undefined
options?: Runner.Runner.Options<I, E, O> | undefined
) => Layer.Layer<R, WorkerError, never> = internal.layer

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/platform-browser/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export const layerPlatform = Layer.succeed(Runner.PlatformRunner, platformRunner
/** @internal */
export const layer = <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O>,
options?: Runner.Runner.Options<E, O>
options?: Runner.Runner.Options<I, E, O>
): Layer.Layer<R, WorkerError, never> => Layer.provide(Runner.layer(process, options), layerPlatform)

/** @internal */
Expand Down
2 changes: 1 addition & 1 deletion packages/platform-bun/src/WorkerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const layerPlatform: Layer.Layer<never, never, Runner.PlatformRunner> = i
*/
export const layer: <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O>,
options?: Runner.Runner.Options<E, O> | undefined
options?: Runner.Runner.Options<I, E, O> | undefined
) => Layer.Layer<R, WorkerError, never> = internal.layer

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/platform-bun/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export const layerPlatform = Layer.succeed(Runner.PlatformRunner, platformRunner
/** @internal */
export const layer = <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O>,
options?: Runner.Runner.Options<E, O>
options?: Runner.Runner.Options<I, E, O>
): Layer.Layer<R, WorkerError, never> => Layer.provide(Runner.layer(process, options), layerPlatform)

/** @internal */
Expand Down
2 changes: 1 addition & 1 deletion packages/platform-node/src/WorkerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const layerPlatform: Layer.Layer<never, never, Runner.PlatformRunner> = i
*/
export const layer: <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O>,
options?: Runner.Runner.Options<E, O> | undefined
options?: Runner.Runner.Options<I, E, O> | undefined
) => Layer.Layer<R, WorkerError, never> = internal.layer

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/platform-node/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export const layerPlatform = Layer.succeed(Runner.PlatformRunner, platformRunner
/** @internal */
export const layer = <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O>,
options?: Runner.Runner.Options<E, O>
options?: Runner.Runner.Options<I, E, O>
): Layer.Layer<R, WorkerError, never> => Layer.provide(Runner.layer(process, options), layerPlatform)

/** @internal */
Expand Down
11 changes: 6 additions & 5 deletions packages/platform/src/WorkerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ export declare namespace Runner {
* @since 1.0.0
* @category models
*/
export interface Options<E, O> {
readonly encodeOutput?: (message: O) => Effect.Effect<never, WorkerError, unknown>
readonly encodeError?: (message: E) => Effect.Effect<never, WorkerError, unknown>
export interface Options<I, E, O> {
readonly decode?: (message: unknown) => Effect.Effect<never, WorkerError, I>
readonly encodeOutput?: (request: I, message: O) => Effect.Effect<never, WorkerError, unknown>
readonly encodeError?: (request: I, error: E) => Effect.Effect<never, WorkerError, unknown>
readonly transfers?: (message: O | E) => ReadonlyArray<unknown>
}
}
Expand All @@ -82,7 +83,7 @@ export declare namespace Runner {
*/
export const make: <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O> | Effect.Effect<R, E, O>,
options?: Runner.Options<E, O> | undefined
options?: Runner.Options<I, E, O> | undefined
) => Effect.Effect<Scope.Scope | R | PlatformRunner, WorkerError, void> = internal.make

/**
Expand All @@ -91,7 +92,7 @@ export const make: <I, R, E, O>(
*/
export const layer: <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O> | Effect.Effect<R, E, O>,
options?: Runner.Options<E, O> | undefined
options?: Runner.Options<I, E, O> | undefined
) => Layer.Layer<R | PlatformRunner, WorkerError, never> = internal.layer

/**
Expand Down
84 changes: 37 additions & 47 deletions packages/platform/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Either from "effect/Either"
import * as Fiber from "effect/Fiber"
import { pipe } from "effect/Function"
import { identity, pipe } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Predicate from "effect/Predicate"
import * as Queue from "effect/Queue"
import type * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
import * as Transferable from "../Transferable.js"
import type * as Worker from "../Worker.js"
import type * as WorkerError from "../WorkerError.js"
import * as WorkerError from "../WorkerError.js"
import type * as WorkerRunner from "../WorkerRunner.js"

/** @internal */
Expand All @@ -31,7 +30,7 @@ export const PlatformRunner = Context.Tag<WorkerRunner.PlatformRunner>(
/** @internal */
export const make = <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O> | Effect.Effect<R, E, O>,
options?: WorkerRunner.Runner.Options<E, O>
options?: WorkerRunner.Runner.Options<I, E, O>
) =>
Effect.gen(function*(_) {
const platform = yield* _(PlatformRunner)
Expand All @@ -41,6 +40,15 @@ export const make = <I, R, E, O>(

yield* _(
Queue.take(backing.queue),
options?.decode ?
Effect.flatMap((req): Effect.Effect<never, WorkerError.WorkerError, Worker.Worker.Request<I>> => {
if (req[1] === 1) {
return Effect.succeed(req)
}

return Effect.map(options.decode!(req[2]), (data) => [req[0], req[1], data])
}) :
identity,
Effect.tap((req) => {
const id = req[0]
if (req[1] === 1) {
Expand All @@ -58,7 +66,7 @@ export const make = <I, R, E, O>(
onLeft: (error) => {
const transfers = options?.transfers ? options.transfers(error) : undefined
return pipe(
options?.encodeError ? options.encodeError(error) : Effect.succeed(error),
options?.encodeError ? options.encodeError(req[2], error) : Effect.succeed(error),
Effect.flatMap((payload) => backing.send([id, 2, payload as any], transfers)),
Effect.catchAllCause((cause) => backing.send([id, 3, Cause.squash(cause)]))
)
Expand All @@ -68,7 +76,7 @@ export const make = <I, R, E, O>(
onSuccess: (data) => {
const transfers = options?.transfers ? options.transfers(data) : undefined
return pipe(
options?.encodeOutput ? options.encodeOutput(data) : Effect.succeed(data),
options?.encodeOutput ? options.encodeOutput(req[2], data) : Effect.succeed(data),
Effect.flatMap((payload) => backing.send([id, 0, [payload]], transfers)),
Effect.catchAllCause((cause) => backing.send([id, 3, Cause.squash(cause)]))
)
Expand All @@ -90,7 +98,7 @@ export const make = <I, R, E, O>(
if (options?.transfers) {
transfers.push(...options.transfers(data))
}
return Effect.orDie(options.encodeOutput!(data))
return Effect.orDie(options.encodeOutput!(req[2], data))
}),
(payload) => backing.send([id, 0, payload], transfers)
)
Expand All @@ -102,7 +110,7 @@ export const make = <I, R, E, O>(
onLeft: (error) => {
const transfers = options?.transfers ? options.transfers(error) : undefined
return pipe(
options?.encodeError ? options.encodeError(error) : Effect.succeed(error),
options?.encodeError ? options.encodeError(req[2], error) : Effect.succeed(error),
Effect.flatMap((payload) => backing.send([id, 2, payload as any], transfers)),
Effect.catchAllCause((cause) => backing.send([id, 3, Cause.squash(cause)]))
)
Expand All @@ -129,7 +137,7 @@ export const make = <I, R, E, O>(
/** @internal */
export const layer = <I, R, E, O>(
process: (request: I) => Stream.Stream<R, E, O> | Effect.Effect<R, E, O>,
options?: WorkerRunner.Runner.Options<E, O>
options?: WorkerRunner.Runner.Options<I, E, O>
): Layer.Layer<WorkerRunner.PlatformRunner | R, WorkerError.WorkerError, never> =>
Layer.scopedDiscard(make(process, options))

Expand All @@ -153,46 +161,28 @@ export const makeSerialized = <
WorkerError.WorkerError,
void
> => {
const parseRequest = Schema.decode(schema)
const effectTags = new Set<string>()
return make((request: I) => {
if (Predicate.hasProperty(request, "_tag") && effectTags.has(request._tag as string)) {
return Effect.flatMap(parseRequest(request), (request: A) => {
const handler =
(handlers as unknown as Record<string, (req: unknown) => Effect.Effect<never, any, any>>)[request._tag]
if (!handler) {
return Effect.dieMessage(`No handler for ${request._tag}`)
}
const encodeSuccess = Schema.encode(Serializable.successSchema(request as any))
return pipe(
Effect.matchEffect(handler(request), {
onFailure: (error) => Effect.flatMap(Serializable.serializeFailure(request as any, error), Effect.fail),
onSuccess: encodeSuccess
})
)
})
}

return Stream.flatMap(parseRequest(request), (request: A) => {
const handler =
(handlers as unknown as Record<string, (req: unknown) => Stream.Stream<never, any, any>>)[request._tag]
if (!handler) {
return Stream.dieMessage(`No handler for ${request._tag}`)
}
const encodeSuccess = Schema.encode(Serializable.successSchema(request as any))
const stream = handler(request)
if (Effect.isEffect(stream)) {
effectTags.add(request._tag)
}
return pipe(
stream,
Stream.catchAll((error) => Effect.flatMap(Serializable.serializeFailure(request as any, error), Effect.fail)),
Stream.mapEffect(encodeSuccess)
)
})
}, {
const parseRequest = Schema.parse(schema)
return make((request: A) => (handlers as any)[request._tag](request), {
transfers(message) {
return Transferable.get(message)
},
decode(message) {
return Effect.mapError(
parseRequest(message),
(error) => WorkerError.WorkerError("decode", error)
)
},
encodeError(request, message) {
return Effect.mapError(
Serializable.serializeFailure(request as any, message),
(error) => WorkerError.WorkerError("encode", error)
)
},
encodeOutput(request, message) {
return Effect.mapError(
Serializable.serializeSuccess(request as any, message),
(error) => WorkerError.WorkerError("encode", error)
)
}
})
}
Expand Down

0 comments on commit 9f19478

Please sign in to comment.