neovim.msgpack_rpc.msgpack_stream module
Msgpack handling in the event loop pipeline.
"""Msgpack handling in the event loop pipeline."""
import logging
from msgpack import Packer, Unpacker
from ..compat import unicode_errors_default
logger = logging.getLogger(__name__)
debug, info, warn = (logger.debug, logger.info, logger.warning,)
class MsgpackStream(object):
"""Two-way msgpack stream that wraps a event loop byte stream.
This wraps the event loop interface for reading/writing bytes and
exposes an interface for reading/writing msgpack documents.
"""
def __init__(self, event_loop):
"""Wrap `event_loop` on a msgpack-aware interface."""
self._event_loop = event_loop
self._packer = Packer(unicode_errors=unicode_errors_default)
self._unpacker = Unpacker()
self._message_cb = None
def threadsafe_call(self, fn):
"""Wrapper around `BaseEventLoop.threadsafe_call`."""
self._event_loop.threadsafe_call(fn)
def send(self, msg):
"""Queue `msg` for sending to Nvim."""
debug('sent %s', msg)
self._event_loop.send(self._packer.pack(msg))
def run(self, message_cb):
"""Run the event loop to receive messages from Nvim.
While the event loop is running, `message_cb` will be called whenever
a message has been successfully parsed from the input stream.
"""
self._message_cb = message_cb
self._event_loop.run(self._on_data)
self._message_cb = None
def stop(self):
"""Stop the event loop."""
self._event_loop.stop()
def _on_data(self, data):
self._unpacker.feed(data)
while True:
try:
debug('waiting for message...')
msg = next(self._unpacker)
debug('received message: %s', msg)
self._message_cb(msg)
except StopIteration:
debug('unpacker needs more data...')
break
Module variables
var logger
var unicode_errors_default
Classes
class MsgpackStream
Two-way msgpack stream that wraps a event loop byte stream.
This wraps the event loop interface for reading/writing bytes and exposes an interface for reading/writing msgpack documents.
class MsgpackStream(object):
"""Two-way msgpack stream that wraps a event loop byte stream.
This wraps the event loop interface for reading/writing bytes and
exposes an interface for reading/writing msgpack documents.
"""
def __init__(self, event_loop):
"""Wrap `event_loop` on a msgpack-aware interface."""
self._event_loop = event_loop
self._packer = Packer(unicode_errors=unicode_errors_default)
self._unpacker = Unpacker()
self._message_cb = None
def threadsafe_call(self, fn):
"""Wrapper around `BaseEventLoop.threadsafe_call`."""
self._event_loop.threadsafe_call(fn)
def send(self, msg):
"""Queue `msg` for sending to Nvim."""
debug('sent %s', msg)
self._event_loop.send(self._packer.pack(msg))
def run(self, message_cb):
"""Run the event loop to receive messages from Nvim.
While the event loop is running, `message_cb` will be called whenever
a message has been successfully parsed from the input stream.
"""
self._message_cb = message_cb
self._event_loop.run(self._on_data)
self._message_cb = None
def stop(self):
"""Stop the event loop."""
self._event_loop.stop()
def _on_data(self, data):
self._unpacker.feed(data)
while True:
try:
debug('waiting for message...')
msg = next(self._unpacker)
debug('received message: %s', msg)
self._message_cb(msg)
except StopIteration:
debug('unpacker needs more data...')
break
Ancestors (in MRO)
- MsgpackStream
- builtins.object
Static methods
def __init__(
self, event_loop)
Wrap event_loop
on a msgpack-aware interface.
def __init__(self, event_loop):
"""Wrap `event_loop` on a msgpack-aware interface."""
self._event_loop = event_loop
self._packer = Packer(unicode_errors=unicode_errors_default)
self._unpacker = Unpacker()
self._message_cb = None
def run(
self, message_cb)
Run the event loop to receive messages from Nvim.
While the event loop is running, message_cb
will be called whenever
a message has been successfully parsed from the input stream.
def run(self, message_cb):
"""Run the event loop to receive messages from Nvim.
While the event loop is running, `message_cb` will be called whenever
a message has been successfully parsed from the input stream.
"""
self._message_cb = message_cb
self._event_loop.run(self._on_data)
self._message_cb = None
def send(
self, msg)
Queue msg
for sending to Nvim.
def send(self, msg):
"""Queue `msg` for sending to Nvim."""
debug('sent %s', msg)
self._event_loop.send(self._packer.pack(msg))
def stop(
self)
Stop the event loop.
def stop(self):
"""Stop the event loop."""
self._event_loop.stop()
def threadsafe_call(
self, fn)
Wrapper around BaseEventLoop.threadsafe_call
.
def threadsafe_call(self, fn):
"""Wrapper around `BaseEventLoop.threadsafe_call`."""
self._event_loop.threadsafe_call(fn)