Spaces:
Runtime error
Runtime error
"""Default classes for Comm and CommManager, for usage in IPython. | |
""" | |
# Copyright (c) IPython Development Team. | |
# Distributed under the terms of the Modified BSD License. | |
import logging | |
import uuid | |
from traitlets.utils.importstring import import_item | |
import comm | |
logger = logging.getLogger("Comm") | |
class BaseComm:
    """Class for communicating between a Frontend and a Kernel

    Must be subclassed with a publish_msg method implementation which
    sends comm messages through the iopub channel.
    """

    def __init__(
        self,
        target_name="comm",
        data=None,
        metadata=None,
        buffers=None,
        comm_id=None,
        primary=True,
        target_module=None,
        topic=None,
        _open_data=None,
        _close_data=None,
        **kwargs,
    ):
        """Create a comm and, when it is the primary side, open its peer.

        Parameters
        ----------
        target_name
            Sent as ``target_name`` in the ``comm_open`` message.
        data, metadata, buffers
            Forwarded to :meth:`open` when ``primary`` is true.
        comm_id
            Identifier of this comm; a random ``uuid4`` hex string is
            generated when it is falsy.
        primary
            When true, ``open()`` is called during construction. Otherwise
            the comm is simply marked as open and nothing is published.
        target_module
            Sent as ``target_module`` in the ``comm_open`` message.
        topic
            Stored on the instance; defaults to ``b"comm-<comm_id>"``.
        _open_data, _close_data
            Default payloads used by :meth:`open` / :meth:`close` when they
            are called with ``data=None``.
        **kwargs
            Passed on to the next ``__init__`` in the MRO.
        """
        super().__init__(**kwargs)

        self.comm_id = comm_id if comm_id else uuid.uuid4().hex
        self.primary = primary
        self.target_name = target_name
        self.target_module = target_module
        self.topic = topic if topic else f"comm-{self.comm_id}".encode("ascii")

        self._open_data = _open_data if _open_data else {}
        self._close_data = _close_data if _close_data else {}

        self._msg_callback = None
        self._close_callback = None

        # Start out closed so that a failing open() below leaves the comm in
        # a state where close() (and therefore __del__) is a no-op.
        self._closed = True

        if self.primary:
            # I am primary, open my peer.
            self.open(data=data, metadata=metadata, buffers=buffers)
        else:
            self._closed = False

    def publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
        """Publish a comm message; subclasses must override this.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError("publish_msg Comm method is not implemented")

    def __del__(self):
        """Trigger close on garbage collection."""
        # __init__ can raise before `_closed` is assigned (for example when
        # super().__init__ rejects an unexpected keyword argument). The
        # finalizer still runs on such a half-built object, so bail out
        # instead of raising AttributeError inside close().
        if not hasattr(self, "_closed"):
            return
        self.close(deleting=True)

    # publishing messages

    def open(self, data=None, metadata=None, buffers=None):  # noqa
        """Open the frontend-side version of this comm

        Registers this comm with the comm manager and publishes a
        ``comm_open`` message. If publishing fails, the comm is unregistered
        again and the exception is re-raised.

        Raises
        ------
        RuntimeError
            If ``comm.get_comm_manager()`` returns ``None``.
        """
        if data is None:
            data = self._open_data

        comm_manager = comm.get_comm_manager()
        if comm_manager is None:
            raise RuntimeError("Comms cannot be opened without a comm_manager.")

        comm_manager.register_comm(self)
        try:
            self.publish_msg(
                "comm_open",
                data=data,
                metadata=metadata,
                buffers=buffers,
                target_name=self.target_name,
                target_module=self.target_module,
            )
            self._closed = False
        except Exception:
            # Do not leave a comm registered whose open message never went out.
            comm_manager.unregister_comm(self)
            raise

    def close(self, data=None, metadata=None, buffers=None, deleting=False):
        """Close the frontend-side version of this comm

        Does nothing when the comm is already closed. Otherwise marks it
        closed, publishes a ``comm_close`` message and, unless ``deleting``
        is true, unregisters the comm from the comm manager.
        """
        if self._closed:
            # only close once
            return

        # Flip the flag first so a re-entrant close() is a no-op.
        self._closed = True

        if data is None:
            data = self._close_data

        self.publish_msg(
            "comm_close",
            data=data,
            metadata=metadata,
            buffers=buffers,
        )

        if not deleting:
            # If deleting, the comm can't be registered
            comm.get_comm_manager().unregister_comm(self)

    def send(self, data=None, metadata=None, buffers=None):
        """Send a message to the frontend-side version of this comm"""
        self.publish_msg(
            "comm_msg",
            data=data,
            metadata=metadata,
            buffers=buffers,
        )

    # registering callbacks

    def on_close(self, callback):
        """Register a callback for comm_close

        The callback is invoked by `handle_close` with the close message.

        Call `on_close(None)` to disable an existing callback.
        """
        self._close_callback = callback

    def on_msg(self, callback):
        """Register a callback for comm_msg

        The callback is invoked by `handle_msg` with each comm_msg message.

        Call `on_msg(None)` to disable an existing callback.
        """
        self._msg_callback = callback

    # handling of incoming messages

    def handle_close(self, msg):
        """Handle a comm_close message"""
        logger.debug("handle_close[%s](%s)", self.comm_id, msg)
        if self._close_callback:
            self._close_callback(msg)

    def handle_msg(self, msg):
        """Handle a comm_msg message

        When a message callback is registered it is called with ``msg``. If
        an IPython shell is active, its ``pre_execute`` and ``post_execute``
        events are triggered around the callback.
        """
        logger.debug("handle_msg[%s](%s)", self.comm_id, msg)
        if self._msg_callback:
            # Imported lazily so IPython is only needed once a callback runs.
            from IPython import get_ipython  # type:ignore

            shell = get_ipython()
            if shell:
                shell.events.trigger("pre_execute")
            self._msg_callback(msg)
            if shell:
                shell.events.trigger("post_execute")
class CommManager:
    """Default CommManager singleton implementation for Comms in the Kernel"""

    # Public APIs

    def __init__(self):
        """Start with no registered comms and no registered targets."""
        # comm_id -> comm instance
        self.comms = {}
        # target_name -> callable invoked on comm_open
        self.targets = {}

    def register_target(self, target_name, f):
        """Register a callable f for a given target name

        f will be called with two arguments when a comm_open message is received with `target`:

        - the Comm instance
        - the `comm_open` message itself.

        f can be a Python callable or an import string for one.
        """
        if isinstance(f, str):
            f = import_item(f)

        self.targets[target_name] = f

    def unregister_target(self, target_name, f):
        """Unregister a callable registered with register_target

        ``f`` is accepted but not used. Returns the callable that was
        registered under ``target_name``; raises ``KeyError`` if there is
        none.
        """
        return self.targets.pop(target_name)

    def register_comm(self, comm):
        """Register a new comm and return its ``comm_id``"""
        comm_id = comm.comm_id
        self.comms[comm_id] = comm
        return comm_id

    def unregister_comm(self, comm):
        """Remove a comm from the registry.

        Only the registry entry is dropped; no message is sent and the comm
        itself is not closed here.

        Raises
        ------
        KeyError
            If the comm is not registered.
        """
        # unlike get_comm, this should raise a KeyError
        self.comms.pop(comm.comm_id)

    def get_comm(self, comm_id):
        """Get a comm with a particular id

        Returns the comm if found, otherwise None.

        This will not raise an error,
        it will log messages if the comm cannot be found.
        """
        try:
            return self.comms[comm_id]
        except KeyError:
            logger.warning("No such comm: %s", comm_id)
            if logger.isEnabledFor(logging.DEBUG):
                # don't create the list of keys if debug messages aren't enabled
                logger.debug("Current comms: %s", list(self.comms))
            return None

    # Message handlers

    def comm_open(self, stream, ident, msg):
        """Handler for comm_open messages

        Creates a non-primary comm for the ``comm_id`` / ``target_name``
        found in ``msg["content"]``, registers it, and calls the callable
        registered for that target with ``(comm, msg)``. If no target is
        registered or the callable raises, the error is logged and the new
        comm is closed again. ``stream`` and ``ident`` are not used here.
        """
        # Local import: inside this method `comm` names the new comm object,
        # so the factory is pulled in by name.
        from comm import create_comm

        content = msg["content"]
        comm_id = content["comm_id"]
        target_name = content["target_name"]
        f = self.targets.get(target_name, None)
        comm = create_comm(
            comm_id=comm_id,
            primary=False,
            target_name=target_name,
        )
        self.register_comm(comm)

        if f is None:
            logger.error("No such comm target registered: %s", target_name)
        else:
            try:
                f(comm, msg)
                return
            except Exception:
                logger.error("Exception opening comm with target: %s", target_name, exc_info=True)

        # Failure.
        try:
            comm.close()
        except Exception:
            logger.error(
                "Could not close comm during `comm_open` failure\n"
                "clean-up. The comm may not have been opened yet.",
                exc_info=True,
            )

    def comm_msg(self, stream, ident, msg):
        """Handler for comm_msg messages

        Dispatches ``msg`` to the comm named by ``msg["content"]["comm_id"]``.
        Unknown ids are ignored (``get_comm`` logs them); exceptions raised
        by the comm's handler are logged, not propagated.
        """
        content = msg["content"]
        comm_id = content["comm_id"]
        comm = self.get_comm(comm_id)
        if comm is None:
            return

        try:
            comm.handle_msg(msg)
        except Exception:
            logger.error("Exception in comm_msg for %s", comm_id, exc_info=True)

    def comm_close(self, stream, ident, msg):
        """Handler for comm_close messages

        Marks the comm as closed and drops it from the registry before
        invoking its close handler, so the handler cannot trigger a second
        close message. Exceptions raised by the handler are logged.
        """
        content = msg["content"]
        comm_id = content["comm_id"]
        comm = self.get_comm(comm_id)
        if comm is None:
            return

        # `comm` is the registered object, so no second lookup is needed.
        comm._closed = True
        del self.comms[comm_id]

        try:
            comm.handle_close(msg)
        except Exception:
            logger.error("Exception in comm_close for %s", comm_id, exc_info=True)
__all__ = ["CommManager", "BaseComm"] | |