Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

detach io thread output from creation cell #905

Merged
merged 2 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ipyparallel/_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from tornado.ioloop import IOLoop

from ipyparallel.util import _OutputProducingThread as Thread


def _asyncio_run(coro):
"""Like asyncio.run, but works when there's no event loop"""
Expand Down Expand Up @@ -41,7 +43,7 @@ def _in_thread(self, async_f, *args, **kwargs):
"""Run an async function in a background thread"""
if self._async_thread is None:
self._loop_started = threading.Event()
self._async_thread = threading.Thread(target=self._thread_main, daemon=True)
self._async_thread = Thread(target=self._thread_main, daemon=True)
self._async_thread.start()
self._loop_started.wait(timeout=5)

Expand Down
3 changes: 2 additions & 1 deletion ipyparallel/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from functools import partial
from getpass import getpass
from pprint import pprint
from threading import Event, Thread, current_thread
from threading import Event, current_thread

import jupyter_client.session
import zmq
Expand Down Expand Up @@ -49,6 +49,7 @@
import ipyparallel as ipp
from ipyparallel import error, serialize, util
from ipyparallel.serialize import PrePickled, Reference
from ipyparallel.util import _OutputProducingThread as Thread

from .asyncresult import AsyncHubResult, AsyncResult
from .futures import MessageFuture, multi_future
Expand Down
7 changes: 4 additions & 3 deletions ipyparallel/cluster/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from traitlets.config.configurable import LoggingConfigurable

from ..traitlets import entry_points
from ..util import _OutputProducingThread as Thread
from ..util import shlex_join
from ._winhpcjob import IPControllerJob, IPControllerTask, IPEngineSetJob, IPEngineTask
from .shellcmd import ShellCommandSend
Expand Down Expand Up @@ -524,7 +525,7 @@ def _start_waiting(self):
# ensure self.loop is accessed on the main thread before waiting
self.loop
self._stop_waiting = threading.Event()
self._wait_thread = threading.Thread(
self._wait_thread = Thread(
target=self._wait, daemon=True, name=f"wait(pid={self.pid})"
)
self._wait_thread.start()
Expand Down Expand Up @@ -583,7 +584,7 @@ def _stream_file(self, path):
time.sleep(0.1)

def _start_streaming(self):
self._stream_thread = t = threading.Thread(
self._stream_thread = t = Thread(
target=partial(self._stream_file, self.output_file),
name=f"Stream Output {self.identifier}",
daemon=True,
Expand Down Expand Up @@ -1352,7 +1353,7 @@ def _start_waiting(self):
# ensure self.loop is accessed on the main thread before waiting
self.loop
self._stop_waiting = threading.Event()
self._wait_thread = threading.Thread(
self._wait_thread = Thread(
target=self._wait,
daemon=True,
name=f"wait(host={self.location}, pid={self.pid})",
Expand Down
34 changes: 33 additions & 1 deletion ipyparallel/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
import sys
import warnings
from datetime import datetime, timezone
from functools import lru_cache
from functools import lru_cache, partial
from signal import SIGABRT, SIGINT, SIGTERM, signal
from threading import Thread, current_thread
from types import FunctionType

import traitlets
Expand Down Expand Up @@ -804,3 +805,34 @@ def connect(
socket.setsockopt(zmq.CURVE_SECRETKEY, curve_secretkey)
socket.setsockopt(zmq.CURVE_PUBLICKEY, curve_publickey)
return socket.connect(url)


def _detach_thread_output(ident=None):
"""undo thread-parent mapping in ipykernel#1186"""
# disable ipykernel's association of thread output with the cell that
# spawned the thread.
# there should be a public API for this...
if ident is None:
ident = current_thread().ident
for stream in (sys.stdout, sys.stderr):
for name in ("_thread_to_parent", "_thread_to_parent_header"):
mapping = getattr(stream, name, None)
if mapping:
mapping.pop(ident, None)


class _OutputProducingThread(Thread):
"""
Subclass Thread to workaround bug in ipykernel
associating thread output with wrong Cell

See https://github.com/ipython/ipykernel/issues/1289
"""

def __init__(self, target, **kwargs):
wrapped_target = partial(self._wrapped_target, target)
super().__init__(target=wrapped_target, **kwargs)

def _wrapped_target(self, target, *args, **kwargs):
_detach_thread_output(self.ident)
return target(*args, **kwargs)
Loading