diff --git a/ipyparallel/_async.py b/ipyparallel/_async.py index bbf05dce..489f53ca 100644 --- a/ipyparallel/_async.py +++ b/ipyparallel/_async.py @@ -8,6 +8,8 @@ from tornado.ioloop import IOLoop +from ipyparallel.util import _OutputProducingThread as Thread + def _asyncio_run(coro): """Like asyncio.run, but works when there's no event loop""" @@ -41,7 +43,7 @@ def _in_thread(self, async_f, *args, **kwargs): """Run an async function in a background thread""" if self._async_thread is None: self._loop_started = threading.Event() - self._async_thread = threading.Thread(target=self._thread_main, daemon=True) + self._async_thread = Thread(target=self._thread_main, daemon=True) self._async_thread.start() self._loop_started.wait(timeout=5) diff --git a/ipyparallel/client/client.py b/ipyparallel/client/client.py index 7675c32b..893b5169 100644 --- a/ipyparallel/client/client.py +++ b/ipyparallel/client/client.py @@ -15,7 +15,7 @@ from functools import partial from getpass import getpass from pprint import pprint -from threading import Event, Thread, current_thread +from threading import Event, current_thread import jupyter_client.session import zmq @@ -49,6 +49,7 @@ import ipyparallel as ipp from ipyparallel import error, serialize, util from ipyparallel.serialize import PrePickled, Reference +from ipyparallel.util import _OutputProducingThread as Thread from .asyncresult import AsyncHubResult, AsyncResult from .futures import MessageFuture, multi_future diff --git a/ipyparallel/cluster/launcher.py b/ipyparallel/cluster/launcher.py index 23fa5ebe..325858e8 100644 --- a/ipyparallel/cluster/launcher.py +++ b/ipyparallel/cluster/launcher.py @@ -42,6 +42,7 @@ from traitlets.config.configurable import LoggingConfigurable from ..traitlets import entry_points +from ..util import _OutputProducingThread as Thread from ..util import shlex_join from ._winhpcjob import IPControllerJob, IPControllerTask, IPEngineSetJob, IPEngineTask from .shellcmd import ShellCommandSend @@ -524,7 +525,7 @@ def _start_waiting(self): # ensure self.loop is accessed on the main thread before waiting self.loop self._stop_waiting = threading.Event() - self._wait_thread = threading.Thread( + self._wait_thread = Thread( target=self._wait, daemon=True, name=f"wait(pid={self.pid})" ) self._wait_thread.start() @@ -583,7 +584,7 @@ def _stream_file(self, path): time.sleep(0.1) def _start_streaming(self): - self._stream_thread = t = threading.Thread( + self._stream_thread = t = Thread( target=partial(self._stream_file, self.output_file), name=f"Stream Output {self.identifier}", daemon=True, @@ -1352,7 +1353,7 @@ def _start_waiting(self): # ensure self.loop is accessed on the main thread before waiting self.loop self._stop_waiting = threading.Event() - self._wait_thread = threading.Thread( + self._wait_thread = Thread( target=self._wait, daemon=True, name=f"wait(host={self.location}, pid={self.pid})", diff --git a/ipyparallel/util.py b/ipyparallel/util.py index 2ecbfbd9..8e6fb259 100644 --- a/ipyparallel/util.py +++ b/ipyparallel/util.py @@ -13,8 +13,9 @@ import sys import warnings from datetime import datetime, timezone -from functools import lru_cache +from functools import lru_cache, partial from signal import SIGABRT, SIGINT, SIGTERM, signal +from threading import Thread, current_thread from types import FunctionType import traitlets @@ -804,3 +805,34 @@ def connect( socket.setsockopt(zmq.CURVE_SECRETKEY, curve_secretkey) socket.setsockopt(zmq.CURVE_PUBLICKEY, curve_publickey) return socket.connect(url) + + +def _detach_thread_output(ident=None): + """undo thread-parent mapping in ipykernel#1186""" + # disable ipykernel's association of thread output with the cell that + # spawned the thread. + # there should be a public API for this... + if ident is None: + ident = current_thread().ident + for stream in (sys.stdout, sys.stderr): + for name in ("_thread_to_parent", "_thread_to_parent_header"): + mapping = getattr(stream, name, None) + if mapping: + mapping.pop(ident, None) + + +class _OutputProducingThread(Thread): + """ + Subclass Thread to workaround bug in ipykernel + associating thread output with wrong Cell + + See https://github.com/ipython/ipykernel/issues/1289 + """ + + def __init__(self, target, **kwargs): + wrapped_target = partial(self._wrapped_target, target) + super().__init__(target=wrapped_target, **kwargs) + + def _wrapped_target(self, target, *args, **kwargs): + _detach_thread_output(self.ident) + return target(*args, **kwargs)