-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add timeout support #469
base: master
Are you sure you want to change the base?
Add timeout support #469
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #469 +/- ##
=========================================
Coverage 100.00% 100.00%
=========================================
Files 4 4
Lines 475 501 +26
Branches 54 57 +3
=========================================
+ Hits 475 501 +26 ☔ View full report in Codecov by Sentry. |
tests/test_sse.py
Outdated
timeout_raised = False | ||
|
||
async def frozen_write(_data: bytes) -> None: | ||
await asyncio.sleep(42) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Problem here, is that we're tampering with the server side of the connection. Is it possible to do something with the client to simulate the hanging connection? Then we can be sure this works correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know how to reproduce hanged connection, but the test covers any time-based issues.
Also I prepared this example to make sure solution helps directly to solve the issue:
import asyncio
from datetime import datetime
from aiohttp import web
from aiohttp_sse import sse_response
TIMEOUT = 5
async def hello(request: web.Request) -> web.StreamResponse:
"""Timeout example.
How to reproduce the issue:
1. Run this example
2. Open console
3. Executed the command below and then press Ctrl+Z (cmd+Z):
curl -s -N localhost:8000/events > /dev/null
4. Try to change TIMEOUT to None and repeat the steps above.
"""
async with sse_response(request, timeout=TIMEOUT) as resp:
i = 0
try:
while resp.is_connected():
spaces = " " * 4096
data = f"Server Time : {datetime.now()} {spaces}"
i += 1
if i % 100 == 0:
print(i, data)
await resp.send(data)
await asyncio.sleep(0.01)
except Exception as exc:
print(f"Exception: {type(exc).__name__} {exc}")
finally:
print("Disconnected")
return resp
if __name__ == "__main__":
app = web.Application()
app.router.add_route("GET", "/events", hello)
web.run_app(app, host="127.0.0.1", port=8000)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, my thinking is that we should be able to do something like resp.connection.transport.pause_reading()
in the test to stop the client reading the connection. But, the test is not passing then.
I'm not yet convinced this fixes the reported issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With timeout=None
this test is not passed (as expected)
@pytest.mark.parametrize("timeout", (None, 1.0))
async def test_with_timeout(
aiohttp_client: ClientFixture,
monkeypatch: pytest.MonkeyPatch,
timeout: Optional[float],
) -> None:
"""Test write timeout.
Relates to this issue:
https://github.com/sysid/sse-starlette/issues/89
"""
sse_closed = asyncio.Event()
async def handler(request: web.Request) -> EventSourceResponse:
sse = EventSourceResponse(timeout=timeout)
sse.ping_interval = 1
await sse.prepare(request)
try:
async with sse:
i = 0
while sse.is_connected():
spaces = " " * 4096
data = f"Server Time : {datetime.now()} {spaces}"
i += 1
if i % 100 == 0:
print(i, data)
await sse.send(data)
await asyncio.sleep(0.01)
finally:
sse_closed.set()
return sse # pragma: no cover
app = web.Application()
app.router.add_route("GET", "/", handler)
client = await aiohttp_client(app)
async with client.get("/") as resp:
assert resp.status == 200
resp.connection.transport.pause_reading()
print(
f"Transport paused reading with "
f"{resp.connection.transport.pause_reading}"
)
async with asyncio.timeout(10):
await sse_closed.wait()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only thing it tests is that the status was 200?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, if I add prints:
print("A", time.time())
await self.write(buffer.getvalue().encode("utf-8")),
print("B", time.time())
I then need to add an await asyncio.sleep(0)
to the original test:
await asyncio.sleep(0)
try:
await sse.send("foo")
The send() call doesn't seem to yield, so without the sleep, the client code never runs and manages to pause the reading.
But, then my output looks like:
A 1714071179.1714509
B 1714071179.1715052
So, even after we pause reading, it's not waiting for the client...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nevermind, I increased the amount of data sent in each message, as you did above. Now I can see it working correctly!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've pushed some changes to the test. I think that's probably good now. The assert for the connection being closed was failing, so I removed that. Feel free to play with it if you think it should work though.
I'd note from the original issue:
continued generating chunks to send on this connection, slowly saturating TCP buffers before finally simply hanging in the send call.
We are only detecting that final hang and cancelling then. As far as I can tell, the buffers must be around 10MB, so if you were sending a 100 byte message once per minute, then it'd take ~28 hours to detect the hung client and disconnect it...
What do these changes do?
Provides a way to work around issue sysid/sse-starlette#89
Are there changes in behavior for the user?
Added the ability to set a timeout for interaction with a connection.
For example, if a hung connection does not read ping messages, then after a while we can automatically disconnect it to free up resources.
Related issue number
sysid/sse-starlette#89
Checklist