From 5bb1e07a6058a54af437d3814e6b705f573f552e Mon Sep 17 00:00:00 2001 From: Lee Byron Date: Thu, 6 Jun 2024 11:56:07 -0700 Subject: [PATCH 1/5] Editorial changes for Event Streams This makes a few changes to the Subscriptions section where we're talking about event streams in an attempt to make it more clear about what's going on. - Revised variable names from "fieldStream" to "sourceStream" to make it easier to trace variables through algorithms. - Rewrote the "Event Streams" definition to be more clear about "emit" keyword and have clear paragraphs on completion and cancellation. - Rewrote the `MapSourceToResponseEvent` algorithm to be a correctly formatted algorithm with a return statement at the end. Introduced a new "When" keyword to describe event subscriptions. Added explicit sections on passing back cancellation (discussed in WG) as well as completion with error (not discussed, but I realized was also left ambiguous) --- spec/Section 6 -- Execution.md | 48 ++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 5b8594e30..58ced496a 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -217,14 +217,18 @@ chat room ID is the "topic" and each "publish" contains the sender and text. **Event Streams** -An event stream represents a sequence of discrete events over time which can be -observed. As an example, a "Pub-Sub" system may produce an event stream when -"subscribing to a topic", with an event occurring on that event stream for each -"publish" to that topic. Event streams may produce an infinite sequence of -events or may complete at any point. Event streams may complete in response to -an error or simply because no more events will occur. An observer may at any -point decide to stop observing an event stream by cancelling it, after which it -must receive no more events from that event stream. +An event stream represents a sequence of discrete emitted events over time which +can be observed. As an example, a "Pub-Sub" system may produce an event stream +when "subscribing to a topic", with an event emitted for each "publish" to that +topic. + +Event streams may complete at any point, often because no further events will +occur. Event streams may emit an infinite sequence of events, in which they may +never complete. If an event stream encounters an error, it must complete with +that error. + +An observer may at any point decide to stop observing an event stream by +cancelling it. When an event stream is cancelled, it must complete. **Supporting Subscriptions at Scale** @@ -268,10 +272,10 @@ CreateSourceEventStream(subscription, schema, variableValues, initialValue): - Let {field} be the first entry in {fields}. - Let {argumentValues} be the result of {CoerceArgumentValues(subscriptionType, field, variableValues)}. -- Let {fieldStream} be the result of running +- Let {sourceStream} be the result of running {ResolveFieldEventStream(subscriptionType, initialValue, fieldName, argumentValues)}. -- Return {fieldStream}. +- Return {sourceStream}. ResolveFieldEventStream(subscriptionType, rootValue, fieldName, argumentValues): @@ -292,12 +296,18 @@ subscription _selection set_ using that event as a root value. MapSourceToResponseEvent(sourceStream, subscription, schema, variableValues): -- Return a new event stream {responseStream} which yields events as follows: - - For each {event} on {sourceStream}: - - Let {response} be the result of running - {ExecuteSubscriptionEvent(subscription, schema, variableValues, event)}. - - Yield an event containing {response}. - - When {sourceStream} completes: complete {responseStream}. +- Let {responseStream} be a new event stream. +- When {sourceStream} emits {event}: + - Let {response} be the result of running + {ExecuteSubscriptionEvent(subscription, schema, variableValues, event)}. + - Emit {response} on {responseStream}. +- When {sourceStream} completes normally: + - Complete {responseStream} normally. +- When {sourceStream} completes with {error}: + - Complete {responseStream} with {error}. +- When {responseStream} is cancelled: + - Cancel {sourceStream}. +- Return {responseStream}. ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue): @@ -317,9 +327,9 @@ Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to #### Unsubscribe Unsubscribe cancels the Response Stream when a client no longer wishes to -receive payloads for a subscription. This may in turn also cancel the Source -Stream. This is also a good opportunity to clean up any other resources used by -the subscription. +receive payloads for a subscription. This in turn also cancels the Source +Stream, which is a good opportunity to clean up any other resources used by the +subscription. Unsubscribe(responseStream): From 75f10e0a9ea07920b6a6ceb6ec0009aa5be974c7 Mon Sep 17 00:00:00 2001 From: Lee Byron Date: Tue, 11 Jun 2024 10:31:29 -0700 Subject: [PATCH 2/5] feedback and use definition syntax --- spec/Section 2 -- Language.md | 4 +-- spec/Section 6 -- Execution.md | 53 +++++++++++++++++++++------------- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/spec/Section 2 -- Language.md b/spec/Section 2 -- Language.md index 2fe3a5a31..76b5fadcb 100644 --- a/spec/Section 2 -- Language.md +++ b/spec/Section 2 -- Language.md @@ -288,8 +288,8 @@ There are three types of operations that GraphQL models: - query - a read-only fetch. - mutation - a write followed by a fetch. -- subscription - a long-lived request that fetches data in response to source - events. +- subscription - a long-lived request that fetches data in response to a + sequence of events over time. Each operation is represented by an optional operation name and a _selection set_. diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 58ced496a..793b142e6 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -164,7 +164,7 @@ ExecuteMutation(mutation, schema, variableValues, initialValue): ### Subscription -If the operation is a subscription, the result is an event stream called the +If the operation is a subscription, the result is an _event stream_ called the "Response Stream" where each event in the event stream is the result of executing the operation for each new event on an underlying "Source Stream". @@ -217,18 +217,21 @@ chat room ID is the "topic" and each "publish" contains the sender and text. **Event Streams** -An event stream represents a sequence of discrete emitted events over time which -can be observed. As an example, a "Pub-Sub" system may produce an event stream -when "subscribing to a topic", with an event emitted for each "publish" to that -topic. +:: An _event stream_ represents a sequence of events: discrete emitted values +over time which can be observed. As an example, a "Pub-Sub" system may produce +an _event stream_ when "subscribing to a topic", with an value emitted for each +"publish" to that topic. -Event streams may complete at any point, often because no further events will -occur. Event streams may emit an infinite sequence of events, in which they may -never complete. If an event stream encounters an error, it must complete with -that error. +An _event stream_ may complete at any point, often because no further events +will occur. An _event stream_ may emit an infinite sequence of values, in which +it may never complete. If an _event stream_ encounters an error, it must +complete with that error. -An observer may at any point decide to stop observing an event stream by -cancelling it. When an event stream is cancelled, it must complete. +An observer may at any point decide to stop observing an _event stream_ by +cancelling it. When an _event stream_ is cancelled, it must complete. + +Internal user code also may cancel an _event stream_ for any reason, which would +be observed as that _event stream_ completing. **Supporting Subscriptions at Scale** @@ -254,8 +257,8 @@ service details should be chosen by the implementing service. #### Source Stream -A Source Stream represents the sequence of events, each of which will trigger a -GraphQL execution corresponding to that event. Like field value resolution, the +A Source Stream is an _event stream_ representing a sequence of root values, +each of which will trigger a GraphQL execution. Like field value resolution, the logic to create a Source Stream is application-specific. CreateSourceEventStream(subscription, schema, variableValues, initialValue): @@ -280,7 +283,7 @@ CreateSourceEventStream(subscription, schema, variableValues, initialValue): ResolveFieldEventStream(subscriptionType, rootValue, fieldName, argumentValues): - Let {resolver} be the internal function provided by {subscriptionType} for - determining the resolved event stream of a subscription field named + determining the resolved _event stream_ of a subscription field named {fieldName}. - Return the result of calling {resolver}, providing {rootValue} and {argumentValues}. @@ -291,24 +294,34 @@ operation type. #### Response Stream -Each event in the underlying Source Stream triggers execution of the -subscription _selection set_ using that event as a root value. +Each event from the underlying Source Stream triggers execution of the +subscription _selection set_ using that event's value as the {initialValue}. MapSourceToResponseEvent(sourceStream, subscription, schema, variableValues): -- Let {responseStream} be a new event stream. -- When {sourceStream} emits {event}: +- Let {responseStream} be a new _event stream_. +- When {sourceStream} emits {sourceValue}: - Let {response} be the result of running - {ExecuteSubscriptionEvent(subscription, schema, variableValues, event)}. - - Emit {response} on {responseStream}. + {ExecuteSubscriptionEvent(subscription, schema, variableValues, + sourceValue)}. + - If internal {error} was raised: + - Cancel {sourceStream}. + - Complete {responseStream} with {error}. + - Otherwise emit {response} on {responseStream}. - When {sourceStream} completes normally: - Complete {responseStream} normally. - When {sourceStream} completes with {error}: - Complete {responseStream} with {error}. - When {responseStream} is cancelled: - Cancel {sourceStream}. + - Complete {responseStream} normally. - Return {responseStream}. +Note: Since {ExecuteSubscriptionEvent()} handles all _field error_, and _request +error_ only occur during {CreateSourceEventStream()}, the only remaining error +condition handled from {ExecuteSubscriptionEvent()} are internal exceptional +errors not described by this specification. + ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue): - Let {subscriptionType} be the root Subscription type in {schema}. From 2ef903c4a4fd6a0793c0274c67a42d53ca419170 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 21 Nov 2024 15:02:58 +0000 Subject: [PATCH 3/5] Don't complete with error. --- spec/Section 6 -- Execution.md | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 793b142e6..7bf17dd24 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -306,12 +306,12 @@ MapSourceToResponseEvent(sourceStream, subscription, schema, variableValues): sourceValue)}. - If internal {error} was raised: - Cancel {sourceStream}. - - Complete {responseStream} with {error}. + - Perform {EmitErrorAndComplete(responseStream, error)}. - Otherwise emit {response} on {responseStream}. - When {sourceStream} completes normally: - Complete {responseStream} normally. - When {sourceStream} completes with {error}: - - Complete {responseStream} with {error}. + - Perform {EmitErrorAndComplete(responseStream, error)}. - When {responseStream} is cancelled: - Cancel {sourceStream}. - Complete {responseStream} normally. @@ -337,6 +337,13 @@ ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue): Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced. +EmitErrorAndComplete(responseStream, error): + +- Let {errors} be a list containing {error}. +- Let {response} be an unordered map containing {errors}. +- Emit {response} on {responseStream}. +- Complete {responseStream} normally. + #### Unsubscribe Unsubscribe cancels the Response Stream when a client no longer wishes to From 5257c92869451ae79134ecd8e3f3b13a6005b652 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 21 Nov 2024 15:03:57 +0000 Subject: [PATCH 4/5] Don't use a new algorithm, no need to DRY --- spec/Section 6 -- Execution.md | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 7bf17dd24..07784f3c7 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -306,12 +306,18 @@ MapSourceToResponseEvent(sourceStream, subscription, schema, variableValues): sourceValue)}. - If internal {error} was raised: - Cancel {sourceStream}. - - Perform {EmitErrorAndComplete(responseStream, error)}. + - Let {errors} be a list containing {error}. + - Let {response} be an unordered map containing {errors}. + - Emit {response} on {responseStream}. + - Complete {responseStream} normally. - Otherwise emit {response} on {responseStream}. - When {sourceStream} completes normally: - Complete {responseStream} normally. - When {sourceStream} completes with {error}: - - Perform {EmitErrorAndComplete(responseStream, error)}. + - Let {errors} be a list containing {error}. + - Let {response} be an unordered map containing {errors}. + - Emit {response} on {responseStream}. + - Complete {responseStream} normally. - When {responseStream} is cancelled: - Cancel {sourceStream}. - Complete {responseStream} normally. @@ -337,13 +343,6 @@ ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue): Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced. -EmitErrorAndComplete(responseStream, error): - -- Let {errors} be a list containing {error}. -- Let {response} be an unordered map containing {errors}. -- Emit {response} on {responseStream}. -- Complete {responseStream} normally. - #### Unsubscribe Unsubscribe cancels the Response Stream when a client no longer wishes to From f49aab72ea487dd47fc1e1f72dcf939b3d0b221e Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 21 Nov 2024 15:09:28 +0000 Subject: [PATCH 5/5] Internal errors should be passed up the chain --- spec/Section 6 -- Execution.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 07784f3c7..40abb9e1c 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -306,10 +306,7 @@ MapSourceToResponseEvent(sourceStream, subscription, schema, variableValues): sourceValue)}. - If internal {error} was raised: - Cancel {sourceStream}. - - Let {errors} be a list containing {error}. - - Let {response} be an unordered map containing {errors}. - - Emit {response} on {responseStream}. - - Complete {responseStream} normally. + - Complete {responseStream} with {error}. - Otherwise emit {response} on {responseStream}. - When {sourceStream} completes normally: - Complete {responseStream} normally.