neovim.msgpack_rpc.event_loop module
Event loop abstraction subpackage.
Tries to use pyuv as a backend, falling back to the asyncio implementation.
"""Event loop abstraction subpackage.
Tries to use pyuv as a backend, falling back to the asyncio implementation.
"""
try:
# libuv is fully implemented in C, use it when available
from .uv import UvEventLoop
EventLoop = UvEventLoop
except ImportError:
# asyncio(trollius on python 2) is pure python and should be more portable
# across python implementations
from .asyncio import AsyncioEventLoop
EventLoop = AsyncioEventLoop
__all__ = ('EventLoop')
Classes
class EventLoop
`BaseEventLoop` subclass that uses `asyncio` as a backend.
class AsyncioEventLoop(BaseEventLoop, asyncio.Protocol,
                       asyncio.SubprocessProtocol):

    """`BaseEventLoop` subclass that uses `asyncio` as a backend.

    The instance serves as its own asyncio protocol: `_fact` returns `self`,
    so every connection created by the `_connect_*` methods reports back to
    the callbacks defined on this class.
    """

    @staticmethod
    def _lost_reason(exc):
        """Return the value handed to `_on_error` for a lost connection.

        A falsy `exc` means a regular end of stream and yields 'EOF'.
        Otherwise the first exception argument is used, as before.
        """
        if not exc:
            return 'EOF'
        if exc.args:
            return exc.args[0]
        # Exceptions can be created without arguments (for example
        # `BrokenPipeError()`); indexing `args` would raise IndexError
        # inside the protocol callback, so report the exception type.
        return type(exc).__name__

    def connection_made(self, transport):
        """Used to signal `asyncio.Protocol` of a successful connection."""
        self._transport = transport
        if isinstance(transport, asyncio.SubprocessTransport):
            # For a spawned child, `_send` must write to the pipe at fd 0.
            self._transport = transport.get_pipe_transport(0)

    def connection_lost(self, exc):
        """Used to signal `asyncio.Protocol` of a lost connection."""
        self._on_error(self._lost_reason(exc))

    def data_received(self, data):
        """Used to signal `asyncio.Protocol` of incoming data."""
        if self._on_data:
            self._on_data(data)
            return
        # No data callback installed yet: keep the chunk for `_run`.
        self._queued_data.append(data)

    def pipe_connection_lost(self, fd, exc):
        """Used to signal `asyncio.SubprocessProtocol` of a lost connection."""
        self._on_error(self._lost_reason(exc))

    def pipe_data_received(self, fd, data):
        """Used to signal `asyncio.SubprocessProtocol` of incoming data."""
        if fd == 2:  # stderr fd number
            self._on_stderr(data)
        elif self._on_data:
            self._on_data(data)
        else:
            # No data callback installed yet: keep the chunk for `_run`.
            self._queued_data.append(data)

    def process_exited(self):
        """Used to signal `asyncio.SubprocessProtocol` when the child exits."""
        self._on_error('EOF')

    def _init(self):
        """Create the loop, the pending-data queue and the protocol factory."""
        self._loop = loop_cls()
        self._queued_data = deque()
        # asyncio expects a protocol factory; always hand back this instance.
        self._fact = lambda: self

    def _connect_tcp(self, address, port):
        """Block until a TCP connection to `address`:`port` is established."""
        coroutine = self._loop.create_connection(self._fact, address, port)
        self._loop.run_until_complete(coroutine)

    def _connect_socket(self, path):
        """Block until connected to `path` (pipe on 'nt', unix socket else)."""
        if os.name == 'nt':
            coroutine = self._loop.create_pipe_connection(self._fact, path)
        else:
            coroutine = self._loop.create_unix_connection(self._fact, path)
        self._loop.run_until_complete(coroutine)

    def _connect_stdio(self):
        """Attach `sys.stdin` as read pipe, then `sys.stdout` as write pipe."""
        coroutine = self._loop.connect_read_pipe(self._fact, sys.stdin)
        self._loop.run_until_complete(coroutine)
        coroutine = self._loop.connect_write_pipe(self._fact, sys.stdout)
        self._loop.run_until_complete(coroutine)

    def _connect_child(self, argv):
        """Spawn `argv` as a subprocess connected to this protocol."""
        self._child_watcher = asyncio.get_child_watcher()
        self._child_watcher.attach_loop(self._loop)
        coroutine = self._loop.subprocess_exec(self._fact, *argv)
        self._loop.run_until_complete(coroutine)

    def _start_reading(self):
        """Do nothing; this backend needs no explicit read start."""
        pass

    def _send(self, data):
        """Write `data` to the transport stored by `connection_made`."""
        self._transport.write(data)

    def _run(self):
        """Deliver queued chunks to `_on_data`, then run the loop."""
        while self._queued_data:
            self._on_data(self._queued_data.popleft())
        self._loop.run_forever()

    def _stop(self):
        """Stop the underlying asyncio loop."""
        self._loop.stop()

    def _threadsafe_call(self, fn):
        """Schedule `fn` on the loop via `call_soon_threadsafe`."""
        self._loop.call_soon_threadsafe(fn)

    def _setup_signals(self, signals):
        """Route each signal in `signals` to `_on_signal` through the loop."""
        if os.name == 'nt':
            # add_signal_handler is not supported in win32
            self._signals = []
            return

        self._signals = list(signals)
        for signum in self._signals:
            self._loop.add_signal_handler(signum, self._on_signal, signum)

    def _teardown_signals(self):
        """Remove the handlers installed by `_setup_signals`."""
        for signum in self._signals:
            self._loop.remove_signal_handler(signum)
Ancestors (in MRO)
- EventLoop
- neovim.msgpack_rpc.event_loop.base.BaseEventLoop
- asyncio.protocols.Protocol
- asyncio.protocols.SubprocessProtocol
- asyncio.protocols.BaseProtocol
- builtins.object
Static methods
def __init__(
self, transport_type, *args)
Initialize and connect the event loop instance.
The only arguments are the transport type and transport-specific configuration, like this:
>>> BaseEventLoop('tcp', '127.0.0.1', 7450)
Traceback (most recent call last):
    ...
AttributeError: 'BaseEventLoop' object has no attribute '_init'
>>> BaseEventLoop('socket', '/tmp/nvim-socket')
Traceback (most recent call last):
    ...
AttributeError: 'BaseEventLoop' object has no attribute '_init'
>>> BaseEventLoop('stdio')
Traceback (most recent call last):
    ...
AttributeError: 'BaseEventLoop' object has no attribute '_init'
>>> BaseEventLoop('child', ['nvim', '--embed', '-u', 'NONE'])
Traceback (most recent call last):
    ...
AttributeError: 'BaseEventLoop' object has no attribute '_init'
This calls the implementation-specific initialization `_init`, one of the `_connect_*` methods (based on `transport_type`) and `_start_reading()`.
def __init__(self, transport_type, *args):
    """Initialize and connect the event loop instance.

    The only arguments are the transport type and transport-specific
    configuration, like this:

    >>> BaseEventLoop('tcp', '127.0.0.1', 7450)
    Traceback (most recent call last):
        ...
    AttributeError: 'BaseEventLoop' object has no attribute '_init'
    >>> BaseEventLoop('socket', '/tmp/nvim-socket')
    Traceback (most recent call last):
        ...
    AttributeError: 'BaseEventLoop' object has no attribute '_init'
    >>> BaseEventLoop('stdio')
    Traceback (most recent call last):
        ...
    AttributeError: 'BaseEventLoop' object has no attribute '_init'
    >>> BaseEventLoop('child', ['nvim', '--embed', '-u', 'NONE'])
    Traceback (most recent call last):
        ...
    AttributeError: 'BaseEventLoop' object has no attribute '_init'

    This calls the implementation-specific initialization
    `_init`, one of the `_connect_*` methods(based on `transport_type`)
    and `_start_reading()`
    """
    self._transport_type = transport_type
    # Reverse map: signal number -> signal name. Names starting with 'SIG_'
    # (SIG_DFL, SIG_IGN, SIG_BLOCK, ...) are not signals, yet their integer
    # values coincide with real signal numbers; leaving them in would make
    # the name found for a number depend on dictionary ordering.
    self._signames = {
        number: name for name, number in vars(signal).items()
        if name.startswith('SIG') and not name.startswith('SIG_')
    }
    self._on_data = None
    self._error = None
    self._init()
    # Dispatch to `_connect_tcp`, `_connect_socket`, ... by transport name.
    getattr(self, '_connect_{}'.format(transport_type))(*args)
    self._start_reading()
def connect_child(
self, argv)
Connect a new Nvim instance. Delegated to `_connect_child`.
def connect_child(self, argv):
    """Spawn a new Nvim instance and connect to it.

    Logs the attempt and forwards `argv` to the backend-specific
    `_connect_child`.
    """
    info('Spawning a new nvim instance')
    self._connect_child(argv)
def connect_socket(
self, path)
Connect to socket at `path`. Delegated to `_connect_socket`.
def connect_socket(self, path):
    """Connect to the socket located at `path`.

    Logs the target and forwards `path` to the backend-specific
    `_connect_socket`.
    """
    info('Connecting to %s', path)
    self._connect_socket(path)
def connect_stdio(
self)
Connect using stdin/stdout. Delegated to `_connect_stdio`.
def connect_stdio(self):
    """Connect over the process' stdin/stdout.

    Logs the attempt and hands over to the backend-specific
    `_connect_stdio`.
    """
    info('Preparing stdin/stdout for streaming data')
    self._connect_stdio()
def connect_tcp(
self, address, port)
Connect to tcp/ip `address`:`port`. Delegated to `_connect_tcp`.
def connect_tcp(self, address, port):
    """Connect to the TCP endpoint `address`:`port`.

    Logs the endpoint and forwards both values to the backend-specific
    `_connect_tcp`.
    """
    info('Connecting to TCP address: %s:%d', address, port)
    self._connect_tcp(address, port)
def connection_lost(
self, exc)
Used to signal `asyncio.Protocol` of a lost connection.
def connection_lost(self, exc):
    """Used to signal `asyncio.Protocol` of a lost connection.

    Reports to `_on_error`: 'EOF' when `exc` is falsy, otherwise the first
    argument of `exc`, or the exception type name when `exc` has no
    arguments.
    """
    if not exc:
        reason = 'EOF'
    elif exc.args:
        reason = exc.args[0]
    else:
        # Exceptions can be created without arguments (for example
        # `BrokenPipeError()`); `exc.args[0]` would raise IndexError here.
        reason = type(exc).__name__
    self._on_error(reason)
def connection_made(
self, transport)
Used to signal `asyncio.Protocol` of a successful connection.
def connection_made(self, transport):
    """Record the transport handed over by asyncio once connected.

    For an `asyncio.SubprocessTransport` the stored transport is replaced
    by the pipe transport for file descriptor 0.
    """
    self._transport = transport
    if not isinstance(transport, asyncio.SubprocessTransport):
        return
    self._transport = transport.get_pipe_transport(0)
def data_received(
self, data)
Used to signal `asyncio.Protocol` of incoming data.
def data_received(self, data):
    """Handle a chunk of incoming data reported by `asyncio.Protocol`.

    The chunk goes to the `_on_data` callback when one is set; otherwise
    it is appended to `_queued_data`.
    """
    handler = self._on_data
    if not handler:
        self._queued_data.append(data)
        return
    handler(data)
def eof_received(
self)
Called when the other end calls write_eof() or equivalent.
If this returns a false value (including None), the transport will close itself. If it returns a true value, closing the transport is up to the protocol.
def eof_received(self):
    """Handle the peer signalling end-of-file (write_eof() or equivalent).

    A false return value (None included) lets the transport close itself;
    a true return value leaves closing the transport to the protocol.
    This implementation returns None.
    """
    return None
def pause_writing(
self)
Called when the transport's buffer goes over the high-water mark.
Pause and resume calls are paired -- pause_writing() is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increases the buffer size even more), and eventually resume_writing() is called once when the buffer size reaches the low-water mark.
Note that if the buffer size equals the high-water mark, pause_writing() is not called -- it must go strictly over. Conversely, resume_writing() is called when the buffer size is equal or lower than the low-water mark. These end conditions are important to ensure that things go as expected when either mark is zero.
NOTE: This is the only Protocol callback that is not called through EventLoop.call_soon() -- if it were, it would have no effect when it's most needed (when the app keeps writing without yielding until pause_writing() is called).
def pause_writing(self):
    """Handle the transport's buffer rising above the high-water mark.

    Calls come in pairs: pause_writing() fires once when the buffer
    grows strictly past the high-water mark (further writes that grow
    it more do not trigger it again), and resume_writing() fires once
    when the buffer has shrunk to the low-water mark.

    A buffer size exactly equal to the high-water mark does not
    trigger pause_writing(); resume_writing() on the other hand fires
    when the size is equal to or below the low-water mark. These edge
    rules keep things working when either mark is zero.

    NOTE: This is the only Protocol callback not dispatched through
    EventLoop.call_soon() -- otherwise it would be useless exactly
    when needed most (an app that keeps writing without yielding
    until pause_writing() is called).

    This implementation does nothing.
    """
    return None
def pipe_connection_lost(
self, fd, exc)
Used to signal `asyncio.SubprocessProtocol` of a lost connection.
def pipe_connection_lost(self, fd, exc):
    """Used to signal `asyncio.SubprocessProtocol` of a lost connection.

    Reports to `_on_error`: 'EOF' when `exc` is falsy, otherwise the first
    argument of `exc`, or the exception type name when `exc` has no
    arguments. `fd` is not used.
    """
    if not exc:
        reason = 'EOF'
    elif exc.args:
        reason = exc.args[0]
    else:
        # Exceptions can be created without arguments (for example
        # `BrokenPipeError()`); `exc.args[0]` would raise IndexError here.
        reason = type(exc).__name__
    self._on_error(reason)
def pipe_data_received(
self, fd, data)
Used to signal `asyncio.SubprocessProtocol` of incoming data.
def pipe_data_received(self, fd, data):
    """Handle data arriving on one of the child's pipes.

    Data on fd 2 goes to `_on_stderr`. Anything else goes to the
    `_on_data` callback when one is set, or is appended to `_queued_data`.
    """
    if fd == 2:  # stderr fd number
        self._on_stderr(data)
        return
    consumer = self._on_data
    if consumer:
        consumer(data)
    else:
        self._queued_data.append(data)
def process_exited(
self)
Used to signal `asyncio.SubprocessProtocol` when the child exits.
def process_exited(self):
    """Handle `asyncio.SubprocessProtocol` reporting that the child exited.

    The exit is reported to `_on_error` as 'EOF'.
    """
    self._on_error('EOF')
def resume_writing(
self)
Called when the transport's buffer drains below the low-water mark.
See pause_writing() for details.
def resume_writing(self):
    """Handle the transport's buffer draining below the low-water mark.

    See pause_writing() for details. This implementation does nothing.
    """
    return None
def run(
self, data_cb)
Run the event loop.
def run(self, data_cb):
    """Run the event loop.

    `data_cb` is installed as the `_on_data` callback for the duration of
    the run. When called from the main thread, SIGINT and SIGTERM handling
    is set up before entering the loop and torn down afterwards, and the
    SIGINT handler is reset to `default_int_handler`.

    If an error was stored on the instance it is raised immediately
    instead of running the loop.
    """
    if self._error:
        err = self._error
        if isinstance(self._error, KeyboardInterrupt):
            # KeyboardInterrupt is not destructive(it may be used in
            # the REPL).
            # After throwing KeyboardInterrupt, cleanup the _error field
            # so the loop may be started again
            self._error = None
        raise err
    on_main_thread = threading.current_thread() == main_thread
    self._on_data = data_cb
    try:
        if on_main_thread:
            self._setup_signals([signal.SIGINT, signal.SIGTERM])
        try:
            debug('Entering event loop')
            self._run()
            debug('Exited event loop')
        finally:
            # Undo signal handling even when `_run` raises, so the loop's
            # handlers do not stay installed after the loop is gone.
            if on_main_thread:
                self._teardown_signals()
                signal.signal(signal.SIGINT, default_int_handler)
    finally:
        # Never leave a stale data callback behind.
        self._on_data = None
def send(
self, data)
Queue `data` for sending to Nvim.
def send(self, data):
    """Queue `data` for sending to Nvim.

    Logs the payload and forwards it to the backend-specific `_send`.
    """
    debug("Sending '%s'", data)
    self._send(data)
def stop(
self)
Stop the event loop.
def stop(self):
    """Stop the event loop.

    Calls the backend-specific `_stop` and logs once it has returned.
    """
    self._stop()
    debug('Stopped event loop')
def threadsafe_call(
self, fn)
Call a function in the event loop thread.
This is the only safe way to interact with a session from other threads.
def threadsafe_call(self, fn):
    """Schedule `fn` to be called in the event loop thread.

    Other threads must go through this method to interact with a
    session; it is the only safe way to do so. The scheduling itself is
    done by the backend-specific `_threadsafe_call`.
    """
    self._threadsafe_call(fn)
Sub-modules
neovim.msgpack_rpc.event_loop.asyncio
Event loop implementation that uses the `asyncio` standard module.
The asyncio
module was added to python standard library on 3.4, and it
provides a pure python implementation of an event loop library. It is used
as a fallback in case pyuv is not available(on python implementations other
than CPy...
neovim.msgpack_rpc.event_loop.base
Common code for event loop implementations.