Top

neovim.msgpack_rpc.event_loop module

Event loop abstraction subpackage.

Tries to use pyuv as a backend, falling back to the asyncio implementation.

"""Event loop abstraction subpackage.

Tries to use pyuv as a backend, falling back to the asyncio implementation.
"""
try:
    # libuv is fully implemented in C, use it when available
    from .uv import UvEventLoop
    EventLoop = UvEventLoop
except ImportError:
    # asyncio (trollius on Python 2) is pure Python and should be more
    # portable across Python implementations
    from .asyncio import AsyncioEventLoop
    EventLoop = AsyncioEventLoop


# The trailing comma is required: without it the parentheses are plain
# grouping and `__all__` would be the string 'EventLoop', which a star-import
# iterates character by character ('E', 'v', ...) and fails on.
__all__ = ('EventLoop',)

Classes

class EventLoop

BaseEventLoop subclass that uses asyncio as a backend.

class AsyncioEventLoop(BaseEventLoop, asyncio.Protocol,
                       asyncio.SubprocessProtocol):

    """Event loop implementation backed by `asyncio`.

    The instance acts as its own protocol object: it implements the
    `asyncio.Protocol` and `asyncio.SubprocessProtocol` callbacks and hands
    itself to the loop through the `self._fact` factory.
    """

    def connection_made(self, transport):
        """Remember the transport once `asyncio` reports a connection."""
        self._transport = transport
        if not isinstance(transport, asyncio.SubprocessTransport):
            return
        # For a child process, writes go through pipe 0 of the subprocess
        # transport (the child's stdin according to the asyncio docs).
        self._transport = transport.get_pipe_transport(0)

    def connection_lost(self, exc):
        """Forward a lost `asyncio.Protocol` connection to `_on_error`."""
        reason = exc.args[0] if exc else 'EOF'
        self._on_error(reason)

    def data_received(self, data):
        """Dispatch incoming `asyncio.Protocol` data, or queue it."""
        if not self._on_data:
            # No data callback installed yet: keep the chunk for `_run`.
            self._queued_data.append(data)
        else:
            self._on_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Forward a lost `asyncio.SubprocessProtocol` pipe to `_on_error`."""
        if exc:
            self._on_error(exc.args[0])
        else:
            self._on_error('EOF')

    def pipe_data_received(self, fd, data):
        """Route `asyncio.SubprocessProtocol` data by file descriptor."""
        if fd == 2:  # stderr fd number
            self._on_stderr(data)
            return
        if self._on_data:
            self._on_data(data)
            return
        # No data callback installed yet: keep the chunk for `_run`.
        self._queued_data.append(data)

    def process_exited(self):
        """Report `'EOF'` to `_on_error` when the child process exits."""
        self._on_error('EOF')

    def _init(self):
        """Create the loop, the pending-data queue and the protocol factory."""
        self._loop = loop_cls()
        self._queued_data = deque()

        def protocol_factory():
            # The event loop object itself serves as the protocol.
            return self

        self._fact = protocol_factory

    def _connect_tcp(self, address, port):
        """Block until a TCP connection to `address`:`port` is set up."""
        pending = self._loop.create_connection(self._fact, address, port)
        self._loop.run_until_complete(pending)

    def _connect_socket(self, path):
        """Block until connected to the named pipe / unix socket at `path`."""
        # Windows uses named pipes, everything else a unix domain socket.
        if os.name != 'nt':
            pending = self._loop.create_unix_connection(self._fact, path)
        else:
            pending = self._loop.create_pipe_connection(self._fact, path)
        self._loop.run_until_complete(pending)

    def _connect_stdio(self):
        """Attach stdin as the read pipe, then stdout as the write pipe."""
        reader = self._loop.connect_read_pipe(self._fact, sys.stdin)
        self._loop.run_until_complete(reader)
        writer = self._loop.connect_write_pipe(self._fact, sys.stdout)
        self._loop.run_until_complete(writer)

    def _connect_child(self, argv):
        """Spawn `argv` as a subprocess and block until it is connected."""
        # NOTE(review): `asyncio.get_child_watcher` is deprecated since
        # Python 3.12 -- confirm which Python versions must be supported.
        watcher = asyncio.get_child_watcher()
        self._child_watcher = watcher
        watcher.attach_loop(self._loop)
        spawned = self._loop.subprocess_exec(self._fact, *argv)
        self._loop.run_until_complete(spawned)

    def _start_reading(self):
        """Do nothing; this backend has no separate read-start step."""
        pass

    def _send(self, data):
        """Write `data` to the current transport."""
        self._transport.write(data)

    def _run(self):
        """Deliver data queued before the callback existed, then loop."""
        backlog = self._queued_data
        while backlog:
            chunk = backlog.popleft()
            self._on_data(chunk)
        self._loop.run_forever()

    def _stop(self):
        """Ask the underlying loop to stop."""
        self._loop.stop()

    def _threadsafe_call(self, fn):
        """Schedule `fn` on the loop through `call_soon_threadsafe`."""
        self._loop.call_soon_threadsafe(fn)

    def _setup_signals(self, signals):
        """Install `_on_signal` as the handler for each signal in `signals`."""
        # add_signal_handler is not supported in win32, so nothing is
        # registered there and the loop below runs over an empty list.
        self._signals = [] if os.name == 'nt' else list(signals)
        for signum in self._signals:
            self._loop.add_signal_handler(signum, self._on_signal, signum)

    def _teardown_signals(self):
        """Remove every handler recorded by `_setup_signals`."""
        for registered in self._signals:
            self._loop.remove_signal_handler(registered)

Ancestors (in MRO)

Static methods

def __init__(self, transport_type, *args)

Initialize and connect the event loop instance.

The only arguments are the transport type and transport-specific configuration, like this:

>>> BaseEventLoop('tcp', '127.0.0.1', 7450)
Traceback (most recent call last):
    ...
AttributeError: 'BaseEventLoop' object has no attribute '_init'
>>> BaseEventLoop('socket', '/tmp/nvim-socket')
Traceback (most recent call last):
    ...
AttributeError: 'BaseEventLoop' object has no attribute '_init'
>>> BaseEventLoop('stdio')
Traceback (most recent call last):
    ...
AttributeError: 'BaseEventLoop' object has no attribute '_init'
>>> BaseEventLoop('child', ['nvim', '--embed', '-u', 'NONE'])
Traceback (most recent call last):
    ...
AttributeError: 'BaseEventLoop' object has no attribute '_init'

This calls the implementation-specific initialization _init, one of the _connect_* methods (based on transport_type) and _start_reading().

def __init__(self, transport_type, *args):
    """Initialize and connect the event loop instance.

    The only arguments are the transport type and transport-specific
    configuration, like this:

    >>> BaseEventLoop('tcp', '127.0.0.1', 7450)
    Traceback (most recent call last):
        ...
    AttributeError: 'BaseEventLoop' object has no attribute '_init'
    >>> BaseEventLoop('socket', '/tmp/nvim-socket')
    Traceback (most recent call last):
        ...
    AttributeError: 'BaseEventLoop' object has no attribute '_init'
    >>> BaseEventLoop('stdio')
    Traceback (most recent call last):
        ...
    AttributeError: 'BaseEventLoop' object has no attribute '_init'
    >>> BaseEventLoop('child', ['nvim', '--embed', '-u', 'NONE'])
    Traceback (most recent call last):
        ...
    AttributeError: 'BaseEventLoop' object has no attribute '_init'

    This calls the implementation-specific initialization
    `_init`, one of the `_connect_*` methods (based on `transport_type`)
    and `_start_reading()`.
    """
    self._transport_type = transport_type
    # Map signal numbers to their names. The `SIG_*` constants (SIG_DFL,
    # SIG_IGN, SIG_BLOCK, SIG_UNBLOCK, SIG_SETMASK) are handler/mask values,
    # not signals, and their small integer values collide with real signal
    # numbers, so they are skipped to keep the mapping unambiguous.
    self._signames = {
        number: name for name, number in signal.__dict__.items()
        if name.startswith('SIG') and not name.startswith('SIG_')
    }
    self._on_data = None
    self._error = None
    self._init()
    # Dispatch to `_connect_tcp`, `_connect_socket`, ... by transport name.
    getattr(self, '_connect_{}'.format(transport_type))(*args)
    self._start_reading()

def connect_child(self, argv)

Connect a new Nvim instance. Delegated to _connect_child.

def connect_child(self, argv):
    """Connect a new Nvim instance. Delegated to `_connect_child`.

    `argv` is handed to `_connect_child` unchanged.
    """
    info('Spawning a new nvim instance')
    self._connect_child(argv)

def connect_socket(self, path)

Connect to socket at path. Delegated to _connect_socket.

def connect_socket(self, path):
    """Connect to socket at `path`. Delegated to `_connect_socket`.

    `path` is reported through `info` and then handed to `_connect_socket`
    unchanged.
    """
    info('Connecting to %s', path)
    self._connect_socket(path)

def connect_stdio(self)

Connect using stdin/stdout. Delegated to _connect_stdio.

def connect_stdio(self):
    """Connect using stdin/stdout. Delegated to `_connect_stdio`.

    Takes no configuration; a status message is emitted through `info`
    before delegating.
    """
    info('Preparing stdin/stdout for streaming data')
    self._connect_stdio()

def connect_tcp(self, address, port)

Connect to tcp/ip address:port. Delegated to _connect_tcp.

def connect_tcp(self, address, port):
    """Connect to tcp/ip `address`:`port`. Delegated to `_connect_tcp`.

    The status message formats `port` with `%d`, so it is expected to be
    an integer.
    """
    info('Connecting to TCP address: %s:%d', address, port)
    self._connect_tcp(address, port)

def connection_lost(self, exc)

Used to signal asyncio.Protocol of a lost connection.

def connection_lost(self, exc):
    """Forward a lost `asyncio.Protocol` connection to `_on_error`.

    The handler receives the first argument of `exc`, or `'EOF'` when no
    exception was supplied.
    """
    reason = exc.args[0] if exc else 'EOF'
    self._on_error(reason)

def connection_made(self, transport)

Used to signal asyncio.Protocol of a successful connection.

def connection_made(self, transport):
    """Remember the transport once `asyncio` reports a connection.

    For a subprocess transport, pipe transport 0 is stored instead of the
    subprocess transport itself.
    """
    self._transport = transport
    if not isinstance(transport, asyncio.SubprocessTransport):
        return
    # Pipe 0 is the child's stdin according to the asyncio documentation.
    self._transport = transport.get_pipe_transport(0)

def data_received(self, data)

Used to signal asyncio.Protocol of incoming data.

def data_received(self, data):
    """Dispatch incoming `asyncio.Protocol` data, or queue it.

    With a data callback installed the chunk goes straight to it;
    otherwise it is appended to `self._queued_data`.
    """
    if not self._on_data:
        self._queued_data.append(data)
    else:
        self._on_data(data)

def eof_received(self)

Called when the other end calls write_eof() or equivalent.

If this returns a false value (including None), the transport will close itself. If it returns a true value, closing the transport is up to the protocol.

def eof_received(self):
    """Called when the other end calls write_eof() or equivalent.

    If this returns a false value (including None), the transport
    will close itself.  If it returns a true value, closing the
    transport is up to the protocol.

    This implementation has no body beyond the docstring, so it returns
    None.
    """

def pause_writing(self)

Called when the transport's buffer goes over the high-water mark.

Pause and resume calls are paired -- pause_writing() is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increase the buffer size even more), and eventually resume_writing() is called once when the buffer size reaches the low-water mark.

Note that if the buffer size equals the high-water mark, pause_writing() is not called -- it must go strictly over. Conversely, resume_writing() is called when the buffer size is equal or lower than the low-water mark. These end conditions are important to ensure that things go as expected when either mark is zero.

NOTE: This is the only Protocol callback that is not called through EventLoop.call_soon() -- if it were, it would have no effect when it's most needed (when the app keeps writing without yielding until pause_writing() is called).

def pause_writing(self):
    """Called when the transport's buffer goes over the high-water mark.

    Pause and resume calls are paired -- pause_writing() is called
    once when the buffer goes strictly over the high-water mark
    (even if subsequent writes increase the buffer size even
    more), and eventually resume_writing() is called once when the
    buffer size reaches the low-water mark.

    Note that if the buffer size equals the high-water mark,
    pause_writing() is not called -- it must go strictly over.
    Conversely, resume_writing() is called when the buffer size is
    equal or lower than the low-water mark.  These end conditions
    are important to ensure that things go as expected when either
    mark is zero.

    NOTE: This is the only Protocol callback that is not called
    through EventLoop.call_soon() -- if it were, it would have no
    effect when it's most needed (when the app keeps writing
    without yielding until pause_writing() is called).
    """

def pipe_connection_lost(self, fd, exc)

Used to signal asyncio.SubprocessProtocol of a lost connection.

def pipe_connection_lost(self, fd, exc):
    """Forward a lost `asyncio.SubprocessProtocol` pipe to `_on_error`.

    `fd` is not inspected. The handler receives the first argument of
    `exc`, or `'EOF'` when no exception was supplied.
    """
    if exc:
        self._on_error(exc.args[0])
    else:
        self._on_error('EOF')

def pipe_data_received(self, fd, data)

Used to signal asyncio.SubprocessProtocol of incoming data.

def pipe_data_received(self, fd, data):
    """Route `asyncio.SubprocessProtocol` data by file descriptor.

    Data on fd 2 goes to `_on_stderr`. Anything else goes to the data
    callback when one is installed, and is queued in `self._queued_data`
    otherwise.
    """
    if fd == 2:  # stderr fd number
        self._on_stderr(data)
        return
    if self._on_data:
        self._on_data(data)
        return
    self._queued_data.append(data)

def process_exited(self)

Used to signal asyncio.SubprocessProtocol when the child exits.

def process_exited(self):
    """Used to signal `asyncio.SubprocessProtocol` when the child exits.

    The exit is reported to `_on_error` as `'EOF'`.
    """
    self._on_error('EOF')

def resume_writing(self)

Called when the transport's buffer drains below the low-water mark.

See pause_writing() for details.

def resume_writing(self):
    """Called when the transport's buffer drains below the low-water mark.

    See pause_writing() for details.
    """

def run(self, data_cb)

Run the event loop.

def run(self, data_cb):
    """Run the event loop, delivering incoming data to `data_cb`.

    An error stored in `self._error` is raised before the loop is entered.
    When called from the main thread, SIGINT/SIGTERM handlers are set up
    for the duration of the loop and torn down afterwards.
    """
    pending_error = self._error
    if pending_error:
        if isinstance(pending_error, KeyboardInterrupt):
            # KeyboardInterrupt is not destructive (it may be used in
            # the REPL), so the stored error is cleared before raising
            # to let the loop be started again.
            self._error = None
        raise pending_error
    self._on_data = data_cb
    on_main_thread = threading.current_thread() == main_thread
    if on_main_thread:
        self._setup_signals([signal.SIGINT, signal.SIGTERM])
    debug('Entering event loop')
    self._run()
    debug('Exited event loop')
    if on_main_thread:
        self._teardown_signals()
        # Put Python's default SIGINT handler back in place.
        signal.signal(signal.SIGINT, default_int_handler)
    self._on_data = None

def send(self, data)

Queue data for sending to Nvim.

def send(self, data):
    """Queue `data` for sending to Nvim.

    Delegated to `_send` after reporting `data` through `debug`.
    """
    debug("Sending '%s'", data)
    self._send(data)

def stop(self)

Stop the event loop.

def stop(self):
    """Stop the event loop. Delegated to `_stop`."""
    self._stop()
    debug('Stopped event loop')

def threadsafe_call(self, fn)

Call a function in the event loop thread.

This is the only safe way to interact with a session from other threads.

def threadsafe_call(self, fn):
    """Call a function in the event loop thread.

    This is the only safe way to interact with a session from other
    threads. Delegated to `_threadsafe_call`.
    """
    self._threadsafe_call(fn)

Sub-modules

neovim.msgpack_rpc.event_loop.asyncio

Event loop implementation that uses the asyncio standard module.

The asyncio module was added to the Python standard library in 3.4, and it provides a pure Python implementation of an event loop library. It is used as a fallback in case pyuv is not available (on Python implementations other than CPy...

neovim.msgpack_rpc.event_loop.base

Common code for event loop implementations.