diff --git a/bun.lockb b/bun.lockb index 7b9cfeab..d3899d9b 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/package.json b/package.json index 6ee9dfbf..e5e035e3 100644 --- a/package.json +++ b/package.json @@ -113,6 +113,7 @@ "eslint": "^8.49.0", "eslint-plugin-security": "^2.1.0", "eslint-plugin-sonarjs": "^0.23.0", + "eventsource-parser": "^1.1.2", "expect-type": "^0.16.0", "memoirist": "^0.2.0", "prettier": "^3.3.3", diff --git a/src/dynamic-handle.ts b/src/dynamic-handle.ts index 1b84cfa9..737d1c89 100644 --- a/src/dynamic-handle.ts +++ b/src/dynamic-handle.ts @@ -389,7 +389,7 @@ export const createDynamicHandler = } } - return (context.response = mapResponse(response, context.set)) + return (context.response = await mapResponse(response, context.set)) } catch (error) { if ((error as ElysiaErrors).status) set.status = (error as ElysiaErrors).status @@ -418,7 +418,7 @@ export const createDynamicErrorHandler = let response = hook.fn(errorContext as any) if (response instanceof Promise) response = await response if (response !== undefined && response !== null) - return (context.response = mapResponse(response, context.set)) + return (context.response = await mapResponse(response, context.set)) } return new Response( diff --git a/src/handler.ts b/src/handler.ts index e66bc076..802a35ba 100644 --- a/src/handler.ts +++ b/src/handler.ts @@ -117,7 +117,6 @@ export const serializeCookie = (cookies: Context['set']['cookie']) => { // return arr // } - const handleStream = async ( generator: Generator | AsyncGenerator, set?: Context['set'], @@ -126,10 +125,11 @@ const handleStream = async ( let init = generator.next() if (init instanceof Promise) init = await init - if (init.done) { + if (init?.done) { if (set) return mapResponse(init.value, set, request) return mapCompactResponse(init.value, request) } + return new Response( new ReadableStream({ @@ -146,38 +146,29 @@ const handleStream = async ( } }) - if (init.value !== undefined && init.value !== null) { - if ( - typeof init.value === "object" + if (init?.value !== undefined && init?.value !== null) + controller.enqueue( + Buffer.from( + `event: message\ndata: ${JSON.stringify(init.value)}\n\n` + ) ) - try { - controller.enqueue( - Buffer.from(JSON.stringify(init.value)) - ) - } catch { - controller.enqueue(Buffer.from(init.value.toString())) - } - else controller.enqueue(Buffer.from(init.value.toString())) - } - for await (const chunk of generator) { - if (end) break - if (chunk === undefined || chunk === null) continue + try { + for await (const chunk of generator) { + if (end) break + if (chunk === undefined || chunk === null) continue - if (typeof chunk === 'object') - try { - controller.enqueue( - Buffer.from(JSON.stringify(chunk)) + controller.enqueue( + Buffer.from( + `event: message\ndata: ${JSON.stringify(chunk)}\n\n` ) - } catch { - controller.enqueue(Buffer.from(chunk.toString())) - } - else controller.enqueue(Buffer.from(chunk.toString())) - - // Wait for the next event loop - // Otherwise the data will be mixed up - await new Promise((resolve) => - setTimeout(() => resolve(), 0) + ) + } + } catch (error: any) { + controller.enqueue( + Buffer.from( + `event: error\ndata: ${JSON.stringify(error.message || error.name || 'Error')}\n\n` + ) ) } diff --git a/test/response/stream.test.ts b/test/response/stream.test.ts index 76ff551f..065d64fe 100644 --- a/test/response/stream.test.ts +++ b/test/response/stream.test.ts @@ -1,8 +1,20 @@ import { describe, it, expect } from 'bun:test' import { req } from '../utils' +import { createParser } from 'eventsource-parser' import { Elysia } from '../../src' +function textEventStream(items: string[]) { + return items + .map((item) => `event: message\ndata: ${JSON.stringify(item)}\n\n`) + .join('') +} + +function parseTextEventStreamItem(item: string) { + const data = item.split('data: ')[1].split('\n')[0] + return JSON.parse(data) +} + describe('Stream', () => { it('handle stream', async () => { const expected = ['a', 'b', 'c'] @@ -31,7 +43,9 @@ describe('Stream', () => { reader.read().then(function pump({ done, value }): unknown { if (done) return resolve(acc) - expect(value.toString()).toBe(expected.shift()!) + expect(parseTextEventStreamItem(value.toString())).toBe( + expected.shift()! + ) acc += value.toString() return reader.read().then(pump) @@ -41,7 +55,64 @@ describe('Stream', () => { }) expect(expected).toHaveLength(0) - expect(response).toBe('abc') + expect(response).toBe(textEventStream(['a', 'b', 'c'])) + }) + it('handle errors after yield', async () => { + const app = new Elysia().get('/', async function* () { + yield 'a' + await Bun.sleep(10) + + throw new Error('an error') + }) + + const response = await app.handle(req('/')).then((x) => x.text()) + + expect(response).toBe( + 'event: message\ndata: "a"\n\nevent: error\ndata: "an error"\n\n' + ) + }) + + it('handle errors before yield when aot is false', async () => { + const app = new Elysia({ aot: false }) + .onError(({ error }) => { + return new Response(error.message) + }) + .get('/', async function* () { + throw new Error('an error xxxx') + }) + + const response = await app.handle(req('/')).then((x) => x.text()) + + expect(response).toInclude('an error') + }) + + it.todo('handle errors before yield when aot is true', async () => { + const app = new Elysia({ aot: true }) + .onError(({ error }) => { + return new Response(error.message) + }) + .get('/', async function* () { + throw new Error('an error') + }) + + const response = await app.handle(req('/')).then((x) => x.text()) + + expect(response).toInclude('an error') + }) + + it.todo('handle errors before yield with onError', async () => { + const expected = 'error expected' + const app = new Elysia() + .onError(({}) => { + return new Response(expected) + }) + .get('/', async function* () { + throw new Error('an error') + }) + + const response = await app.handle(req('/')).then((x) => x.text()) + + expect(response).toBe(expected) }) it('stop stream on canceled request', async () => { @@ -79,9 +150,13 @@ describe('Stream', () => { const { promise, resolve } = Promise.withResolvers() reader.read().then(function pump({ done, value }): unknown { - if (done) return resolve(acc) + if (done) { + return resolve(acc) + } - expect(value.toString()).toBe(expected.shift()!) + expect(parseTextEventStreamItem(value.toString())).toBe( + expected.shift()! + ) acc += value.toString() return reader.read().then(pump) @@ -91,7 +166,7 @@ describe('Stream', () => { }) expect(expected).toHaveLength(0) - expect(response).toBe('ab') + expect(response).toBe(textEventStream(['a', 'b'])) }) it('mutate set before yield is called', async () => { @@ -111,6 +186,42 @@ describe('Stream', () => { 'http://saltyaom.com' ) }) + it('handle stream with objects', async () => { + const objects = [ + { message: 'hello' }, + { response: 'world' }, + { data: [1, 2, 3] }, + { result: [4, 5, 6] } + ] + const app = new Elysia().get('/', async function* ({}) { + for (const obj of objects) { + yield obj + } + }) + + const body = await app.handle(req('/')).then((x) => x.body) + + let events = [] as any[] + const parser = createParser((event) => { + events.push(event) + }) + const { promise, resolve } = Promise.withResolvers() + const reader = body?.getReader()! + + reader.read().then(function pump({ done, value }): unknown { + if (done) { + return resolve() + } + const text = value.toString() + parser.feed(text) + return reader.read().then(pump) + }) + await promise + + expect(events.map((x) => x.data)).toEqual( + objects.map((x) => JSON.stringify(x)) + ) + }) it('mutate set before yield is called', async () => { const expected = ['a', 'b', 'c'] @@ -216,7 +327,9 @@ describe('Stream', () => { reader.read().then(function pump({ done, value }): unknown { if (done) return resolve() - expect(value.toString()).toBe(JSON.stringify(expected[i++])) + expect(parseTextEventStreamItem(value.toString())).toEqual( + expected[i++] + ) return reader.read().then(pump) })