Asyncio Protocol

Generic Asynchronous Message-based Protocol Support

This module provides a generic framework for sending and receiving messages over an asyncio stream. AsyncProtocol is an abstract class that implements the core mechanisms of a simple send/receive protocol, and is designed to be extended.

In this package, it is used as the implementation for the QMPClient class.

class qemu.qmp.protocol.Runstate(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Protocol session runstate.

IDLE = 0

Fully quiesced and disconnected.

CONNECTING = 1

In the process of connecting or establishing a session.

RUNNING = 2

Fully connected and active session.

DISCONNECTING = 3

In the process of disconnecting. Runstate may be returned to IDLE by calling disconnect().

exception qemu.qmp.protocol.ConnectError(error_message: str, exc: Exception)[source]

Bases: QMPError

Raised when the initial connection process has failed.

This Exception always wraps a “root cause” exception that can be interrogated for additional information.

For example, when connecting to a non-existent socket:

await qmp.connect('not_found.sock')
# ConnectError: Failed to establish connection:
#               [Errno 2] No such file or directory
Parameters:
  • error_message – Human-readable string describing the error.

  • exc – The root-cause exception.

error_message: str

Human-readable error string

exc: Exception

Wrapped root cause exception
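As a rough illustration of interrogating the root cause, the sketch below uses local stand-in classes so that it runs without the package installed. The `describe` helper is hypothetical, and the `__str__` format is an assumption modeled on the example output above, not a statement about the real implementation.

```python
class QMPError(Exception):
    """Local stand-in for the package's base error type."""


class ConnectError(QMPError):
    """Stand-in mirroring the documented (error_message, exc) signature."""

    def __init__(self, error_message: str, exc: Exception):
        super().__init__(error_message)
        self.error_message = error_message  # human-readable summary
        self.exc = exc                      # wrapped root cause

    def __str__(self) -> str:
        # Assumption: format modeled on the example output shown above.
        return f"{self.error_message}: {self.exc}"


def describe(err: ConnectError) -> str:
    """Classify a failure by inspecting the wrapped root cause."""
    if isinstance(err.exc, FileNotFoundError):
        return "socket path does not exist"
    if isinstance(err.exc, OSError):
        return "transport-level failure"
    return "other failure"


try:
    try:
        open("/nonexistent/not_found.sock")  # provoke a real OSError
    except OSError as root:
        raise ConnectError("Failed to establish connection", root) from root
except ConnectError as err:
    summary = describe(err)
```

Checking `exc` with `isinstance` lets a caller tell a missing socket path apart from other transport failures without parsing the message string.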

exception qemu.qmp.protocol.StateError(error_message: str, state: Runstate, required: Runstate)[source]

Bases: QMPError

An API command (connect, execute, etc.) was issued at an inappropriate time.

For example, this error is raised when connect() is called while the client is already connected.

Parameters:
  • error_message – Human-readable string describing the state violation.

  • state – The actual Runstate seen at the time of the violation.

  • required – The Runstate required to process this command.

qemu.qmp.protocol.require(required_state: Runstate) Callable[[F], F][source]

Decorator: protect a method so it can only be run in a certain Runstate.

Parameters:

required_state – The Runstate required to invoke this method.

Raises:

StateError – When the required Runstate is not met.
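One way such a guard decorator could be written is sketched below. The `Runstate` and `StateError` classes are local stand-ins, `Client` is a hypothetical object exposing a `runstate` attribute, and the error message wording is invented for illustration; the real decorator may differ in detail.

```python
import enum
import functools


class Runstate(enum.Enum):
    IDLE = 0
    CONNECTING = 1
    RUNNING = 2
    DISCONNECTING = 3


class StateError(Exception):
    """Stand-in mirroring the documented (error_message, state, required)."""

    def __init__(self, error_message, state, required):
        super().__init__(error_message)
        self.state = state
        self.required = required


def require(required_state):
    """Only allow the decorated method to run in `required_state`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if self.runstate != required_state:
                raise StateError(
                    f"{func.__name__} requires {required_state.name}, "
                    f"but runstate is {self.runstate.name}",
                    self.runstate, required_state)
            return func(self, *args, **kwargs)
        return wrapper
    return decorator


class Client:
    """Hypothetical object with a `runstate` attribute."""

    def __init__(self):
        self.runstate = Runstate.IDLE

    @require(Runstate.IDLE)
    def connect(self):
        self.runstate = Runstate.RUNNING
        return "connected"
```

Because the check happens in a plain wrapper before the wrapped function is called, the same pattern also works for coroutine functions: the error is raised at call time, before any coroutine object is created.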

class qemu.qmp.protocol.AsyncProtocol(name: str | None = None)[source]

Bases: Generic[T]

AsyncProtocol implements a generic async message-based protocol.

This protocol assumes the basic unit of information transfer between client and server is a “message”, the details of which are left up to the implementation. It assumes the sending and receiving of these messages is full-duplex and not necessarily correlated; i.e. it supports asynchronous inbound messages.

It is designed to be extended by a specific protocol which provides the implementations for how to read and send messages. These must be defined in _do_recv() and _do_send(), respectively.

Other callbacks have a default implementation, but are intended to be either extended or overridden:

  • _establish_session:

    The base implementation starts the reader/writer tasks. A protocol implementation can override this method: actions that must happen before the reader/writer tasks start go before the super() call, and actions that must happen afterwards go after it.

  • _on_message:

    Actions to be performed when a message is received.

  • _cb_outbound:

    Logging/Filtering hook for all outbound messages.

  • _cb_inbound:

    Logging/Filtering hook for all inbound messages. This hook runs before _on_message().
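The ordering around the super() call in _establish_session can be sketched with a toy base class. `BaseProtocol` and `GreetingProtocol` are hypothetical stand-ins that only record what they do; they are not the real AsyncProtocol.

```python
import asyncio


class BaseProtocol:
    """Toy stand-in for the base class; records what it does in `trace`."""

    def __init__(self):
        self.trace = []

    async def _establish_session(self):
        # The real base implementation starts the reader/writer tasks here.
        self.trace.append("reader/writer tasks started")


class GreetingProtocol(BaseProtocol):
    """Hypothetical subclass that does work on both sides of the base call."""

    async def _establish_session(self):
        self.trace.append("pre-session setup")       # before tasks exist
        await super()._establish_session()
        self.trace.append("post-session handshake")  # after tasks started


proto = GreetingProtocol()
asyncio.run(proto._establish_session())
```

After the run, `proto.trace` lists the three steps in the order they executed, with the base step in the middle.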

Parameters:

name – Name used for logging messages, if any. By default, messages will log to ‘qemu.qmp.protocol’, but each individual connection can be given its own logger by giving it a name; messages will then log to ‘qemu.qmp.protocol.${name}’.

logger = <Logger qemu.qmp.protocol (WARNING)>

Logger object for debugging messages from this connection.

_limit = 65536

_dc_task: Future[None] | None

Disconnect task. The disconnect implementation runs in a task so that asynchronous disconnects (initiated by the reader/writer) are allowed to wait for the reader/writers to exit.

property name: str | None

The nickname for this connection, if any.

This name is used for differentiating instances in debug output.

property runstate: Runstate

The current Runstate of the connection.

async runstate_changed() Runstate[source]

Wait for the runstate to change, then return that Runstate.
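A minimal sketch of how a "wait for the next state change" call can be built on asyncio.Event follows. `StateTracker` is a hypothetical stand-in, and the set-then-clear pattern is one plausible approach rather than a description of the actual implementation.

```python
import asyncio
import enum


class Runstate(enum.Enum):
    IDLE = 0
    CONNECTING = 1
    RUNNING = 2
    DISCONNECTING = 3


class StateTracker:
    """Hypothetical holder of a runstate plus a change notification."""

    def __init__(self):
        self._runstate = Runstate.IDLE
        self._event = asyncio.Event()

    def _set_state(self, state):
        if state == self._runstate:
            return
        self._runstate = state
        # Wake everyone currently waiting, then re-arm for the next change.
        self._event.set()
        self._event.clear()

    async def runstate_changed(self):
        await self._event.wait()
        return self._runstate


async def demo():
    tracker = StateTracker()
    waiter = asyncio.create_task(tracker.runstate_changed())
    await asyncio.sleep(0)  # let the waiter start waiting
    tracker._set_state(Runstate.CONNECTING)
    return await waiter


result = asyncio.run(demo())
```

The tracker is created inside the coroutine so that the Event belongs to the running loop on older Python versions as well.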

async start_server_and_accept(address: str | Tuple[str, int], ssl: SSLContext | None = None) None[source]

Accept a connection and begin processing message queues.

If this call fails, runstate is guaranteed to be set back to IDLE. This method is precisely equivalent to calling start_server() followed by accept().

Parameters:
  • address – Address to listen on; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

Raises:
  • StateError – When the Runstate is not IDLE.

  • ConnectError

    When a connection or session cannot be established.

    This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError or EOFError. If a protocol-level failure occurs while establishing a new session, the wrapped error may also be a QMPError.

async start_server(address: str | Tuple[str, int], ssl: SSLContext | None = None) None[source]

Start listening for an incoming connection, but do not wait for a peer.

This method starts listening for an incoming connection, but does not block waiting for a peer. This call will return immediately after binding and listening on a socket. A later call to accept() must be made in order to finalize the incoming connection.

Parameters:
  • address – Address to listen on; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

Raises:
  • StateError – When the Runstate is not IDLE.

  • ConnectError

    When the server could not start listening on this address.

    This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError.

async accept() None[source]

Accept an incoming connection and begin processing message queues.

Used after a previous call to start_server() to accept an incoming connection. If this call fails, runstate is guaranteed to be set back to IDLE.

Raises:

async connect(address: str | Tuple[str, int] | socket, ssl: SSLContext | None = None) None[source]

Connect to the server and begin processing message queues.

If this call fails, runstate is guaranteed to be set back to IDLE.

Parameters:
  • address – Address to connect to; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

Raises:
  • StateError – When the Runstate is not IDLE.

  • ConnectError

    When a connection or session cannot be established.

    This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError or EOFError. If a protocol-level failure occurs while establishing a new session, the wrapped error may also be a QMPError.

async disconnect() None[source]

Disconnect and wait for all tasks to fully stop.

If there was an exception that caused the reader/writers to terminate prematurely, it will be raised here.

Raises:

Exception – When the reader or writer terminates unexpectedly. You can expect to see EOFError if the server hangs up, or OSError for connection-related issues. If there was a QMP protocol-level problem, ProtocolError will be seen.

_session_guard(emsg: str) AsyncGenerator[None, None][source]

Async guard function used to roll back to IDLE on any error.

On any Exception, the state machine will be reset back to IDLE. Most Exceptions will be wrapped with ConnectError, but BaseException events will be left alone (this includes asyncio.CancelledError, even prior to Python 3.8).

Parameters:

emsg – Human-readable string describing what connection phase failed.
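The rollback-and-wrap behavior can be approximated with contextlib.asynccontextmanager. In this sketch `Session`, `ConnectError` and `failing_connect` are local stand-ins, and rolling back on BaseException as well as Exception is an assumption made here for simplicity.

```python
import asyncio
import contextlib
import enum


class Runstate(enum.Enum):
    IDLE = 0
    CONNECTING = 1
    RUNNING = 2
    DISCONNECTING = 3


class ConnectError(Exception):
    def __init__(self, error_message, exc):
        super().__init__(error_message)
        self.exc = exc


class Session:
    """Hypothetical object whose connection phases run under a guard."""

    def __init__(self):
        self.runstate = Runstate.IDLE

    @contextlib.asynccontextmanager
    async def _session_guard(self, emsg):
        try:
            yield
        except BaseException as err:
            self.runstate = Runstate.IDLE  # roll back on any failure
            if isinstance(err, Exception):
                raise ConnectError(emsg, err) from err
            raise  # e.g. CancelledError: propagate unwrapped


async def failing_connect(session):
    async with session._session_guard("Failed to establish connection"):
        session.runstate = Runstate.CONNECTING
        raise FileNotFoundError(2, "No such file or directory")
```

A caller awaiting `failing_connect` would see a ConnectError whose `exc` is the FileNotFoundError, with the session back in IDLE.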

Raises:

property _runstate_event: Event

_set_state(state: Runstate) None[source]

Change the Runstate of the protocol connection.

Signals the runstate_changed event.

async _stop_server() None[source]

Stop listening for / accepting new incoming connections.

async _incoming(reader: StreamReader, writer: StreamWriter) None[source]

Accept an incoming connection and signal the upper_half.

This method does the minimum necessary to accept a single incoming connection. It signals back to the upper_half ASAP so that any errors during session initialization can occur naturally in the caller’s stack.

Parameters:

async _do_start_server(address: str | Tuple[str, int], ssl: SSLContext | None = None) None[source]

Start listening for an incoming connection, but do not wait for a peer.

This method starts listening for an incoming connection, but does not block waiting for a peer. This call will return immediately after binding and listening on a socket. A later call to accept() must be made in order to finalize the incoming connection.

Parameters:
  • address – Address to listen on; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

Raises:

OSError – For stream-related errors.

async _do_accept() None[source]

Wait for and accept an incoming connection.

Requires that we have not yet accepted an incoming connection from the upper_half, but it’s OK if the server is no longer running because the bottom_half has already accepted the connection.
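The split between a bottom-half callback that records the connection and an upper-half call that waits for it can be sketched without real sockets. `Listener` is a toy stand-in, the reader and writer are placeholder strings, and the use of an asyncio.Event for signalling is an assumption for illustration.

```python
import asyncio


class Listener:
    """Toy stand-in for the listen/accept split; no real sockets involved."""

    def __init__(self):
        self._accepted = None  # asyncio.Event, created by start_server()
        self._streams = None

    async def start_server(self):
        # Returns right away; nothing waits for a peer here.
        self._accepted = asyncio.Event()

    async def _incoming(self, reader, writer):
        # Bottom half: stash the streams and wake the upper half at once.
        self._streams = (reader, writer)
        self._accepted.set()

    async def accept(self):
        # Upper half: block until _incoming() has seen a peer.
        await self._accepted.wait()
        return self._streams


async def demo():
    listener = Listener()
    await listener.start_server()
    # Simulate a peer connecting slightly later (placeholder stream objects).
    peer = asyncio.create_task(listener._incoming("reader", "writer"))
    streams = await listener.accept()
    await peer
    return streams


accepted = asyncio.run(demo())
```

Because the event stays set once a peer has arrived, `accept()` still succeeds if it is called after the connection was already recorded.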

async _do_connect(address: str | Tuple[str, int] | socket, ssl: SSLContext | None = None) None[source]

Acting as the transport client, initiate a connection to a server.

Parameters:
  • address – Address to connect to; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

Raises:

OSError – For stream-related errors.

async _establish_session() None[source]

Establish a new session.

Starts the reader/writer tasks; subclasses may perform their own negotiations here. The Runstate will be RUNNING upon successful conclusion.

_schedule_disconnect() None[source]

Initiate a disconnect; idempotent.

This method is used both in the upper-half as a direct consequence of disconnect(), and in the bottom-half in the case of unhandled exceptions in the reader/writer tasks.

It can be invoked no matter what the runstate is.
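Idempotence here can be achieved by creating the disconnect task only once. The sketch below is a toy stand-in: `Connection` and its `disconnect_runs` counter are hypothetical and exist only to make the behavior observable.

```python
import asyncio


class Connection:
    """Toy stand-in showing an idempotent disconnect scheduler."""

    def __init__(self):
        self._dc_task = None
        self.disconnect_runs = 0

    async def _bh_disconnect(self):
        self.disconnect_runs += 1

    def _schedule_disconnect(self):
        # Only the first call creates the task; later calls are no-ops.
        if self._dc_task is None:
            self._dc_task = asyncio.create_task(self._bh_disconnect())


async def demo():
    conn = Connection()
    for _ in range(3):  # e.g. reader, writer, and user all ask
        conn._schedule_disconnect()
    await conn._dc_task
    return conn.disconnect_runs


runs = asyncio.run(demo())
```

However many callers request a disconnect, the teardown coroutine runs a single time.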

async _wait_disconnect() None[source]

Waits for a previously scheduled disconnect to finish.

This method will gather any bottom-half exceptions and re-raise the one that occurred first, presuming it to be the root cause of any subsequent Exceptions. It is intended to be used in the upper half of the call chain.

Raises:

Exception – Arbitrary exception re-raised on behalf of the reader/writer.

_cleanup() None[source]

Fully reset this object to a clean state and return to IDLE.

async _bh_disconnect() None[source]

Disconnect and cancel all outstanding tasks.

It is designed to be called from its task context, _dc_task. By running in its own task, it is free to wait on any pending actions that may still need to occur in either the reader or writer tasks.

async _bh_flush_writer() None[source]

async _bh_close_stream(error_pathway: bool = False) None[source]

async _bh_loop_forever(async_fn: Callable[[], Awaitable[None]], name: str) None[source]

Run one of the bottom-half methods in a loop forever.

If the bottom half ever raises any exception, schedule a disconnect that will terminate the entire loop.

Parameters:
  • async_fn – The bottom-half method to run in a loop.

  • name – The name of this task, used for logging.
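The looping behavior might look roughly like the following. `loop_forever` is a free-function stand-in, and `schedule_disconnect` is a hypothetical callback taking the task name; the real method's handling of cancellation and logging is not modeled here.

```python
import asyncio


async def loop_forever(async_fn, name, schedule_disconnect):
    """Await `async_fn` repeatedly until it raises."""
    try:
        while True:
            await async_fn()
    except Exception:
        # Any failure in the loop body triggers a disconnect request.
        schedule_disconnect(name)
        raise


async def demo():
    queue = asyncio.Queue()
    handled, requests = [], []
    for item in ("a", "b", None):  # None stands in for a hang-up
        queue.put_nowait(item)

    async def recv_one():
        msg = await queue.get()
        if msg is None:
            raise EOFError("peer hung up")
        handled.append(msg)

    try:
        await loop_forever(recv_one, "Reader", requests.append)
    except EOFError:
        pass
    return handled, requests


result = asyncio.run(demo())
```

Re-raising after scheduling the disconnect keeps the original exception available to whoever later awaits the task.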

async _bh_send_message() None[source]

Wait for an outgoing message, then send it.

Designed to be run in _bh_loop_forever().

async _bh_recv_message() None[source]

Wait for an incoming message and call _on_message to route it.

Designed to be run in _bh_loop_forever().

_cb_outbound(msg: T) T[source]

Callback: outbound message hook.

This is intended for subclasses to be able to add arbitrary hooks to filter or manipulate outgoing messages. The base implementation only logs the message and returns it unmodified.

Parameters:

msg – raw outbound message

Returns:

final outbound message

_cb_inbound(msg: T) T[source]

Callback: inbound message hook.

This is intended for subclasses to be able to add arbitrary hooks to filter or manipulate incoming messages. The base implementation only logs the message and returns it unmodified.

This method does not “handle” incoming messages; it is a filter. The actual “endpoint” for incoming messages is _on_message().

Parameters:

msg – raw inbound message

Returns:

processed inbound message

async _readline() bytes[source]

Wait for a newline from the incoming reader.

This method is provided as a convenience for upper-layer protocols, as many are line-based.

This method may return a sequence of bytes without a trailing newline if EOF occurs after some bytes were received. In this case, the next call will raise EOFError. It is assumed that the layer 5 protocol will decide if there is anything meaningful to be done with a partial message.

Raises:
  • OSError – For stream-related errors.

  • EOFError – If the reader stream is at EOF and there are no bytes to return.

Returns:

bytes, including the newline.
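The partial-line behavior at EOF can be reproduced with a bare asyncio.StreamReader. The `readline` helper below is a sketch of the described semantics, not the method itself, and instantiating StreamReader directly is done only to keep the example free of real sockets.

```python
import asyncio


async def readline(reader: asyncio.StreamReader) -> bytes:
    """Return one line; raise EOFError only when nothing at all is left."""
    msg = await reader.readline()
    if not msg and reader.at_eof():
        raise EOFError
    return msg


async def demo():
    reader = asyncio.StreamReader()
    reader.feed_data(b'{"a": 1}\n{"partial"')
    reader.feed_eof()

    first = await readline(reader)   # complete line, newline included
    second = await readline(reader)  # partial data, no trailing newline
    try:
        await readline(reader)
        third = "no error"
    except EOFError:
        third = "EOFError"
    return first, second, third


lines = asyncio.run(demo())
```

The second call hands back the incomplete tail as-is, so the caller decides whether a message without its newline is usable.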

async _do_recv() T[source]

Abstract: Read from the stream and return a message.

Very low-level; intended to only be called by _recv().

async _recv() T[source]

Read an arbitrary protocol message.

Warning

This method is intended primarily for _bh_recv_message() to use in an asynchronous task loop. Using it outside of this loop will “steal” messages from the normal routing mechanism. It is safe to use prior to _establish_session(), but should not be used otherwise.

This method uses _do_recv() to retrieve the raw message, and then transforms it using _cb_inbound().

Returns:

A single (filtered, processed) protocol message.
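The relationship between _do_recv(), _cb_inbound() and _recv() can be sketched with a toy class whose "stream" is just a list. `ToyProtocol` and `RedactingProtocol` are hypothetical, and the `password` field is invented purely to give the filter something to do.

```python
import asyncio


class ToyProtocol:
    """Toy stand-in: messages are dicts and the 'stream' is a list."""

    def __init__(self, incoming):
        self._incoming = list(incoming)
        self.log = []

    async def _do_recv(self):
        # Low-level read; a real subclass would parse bytes from a stream.
        return self._incoming.pop(0)

    def _cb_inbound(self, msg):
        # Filter hook: record the message, then pass it along.
        self.log.append(("in", msg))
        return msg

    async def _recv(self):
        return self._cb_inbound(await self._do_recv())


class RedactingProtocol(ToyProtocol):
    """Hypothetical subclass that strips a field before it is logged."""

    def _cb_inbound(self, msg):
        msg = {k: v for k, v in msg.items() if k != "password"}
        return super()._cb_inbound(msg)


proto = RedactingProtocol([{"event": "LOGIN", "password": "hunter2"}])
received = asyncio.run(proto._recv())
```

Since the subclass filters before delegating to the base hook, both the log entry and the value returned by `_recv()` are already redacted.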

_do_send(msg: T) None[source]

Abstract: Write a message to the stream.

Very low-level; intended to only be called by _send().

async _send(msg: T) None[source]

Send an arbitrary protocol message.

This method will transform any outgoing messages according to _cb_outbound().

Warning

Like _recv(), this method is intended to be called by the writer task loop that processes outgoing messages. Calling it directly may circumvent logic implemented by the caller meant to correlate outgoing and incoming messages.

Raises:

OSError – For problems with the underlying stream.

async _on_message(msg: T) None[source]

Called to handle the receipt of a new message.

Caution

This is executed from within the reader loop, so waiting on either the reader or writer task will lead to deadlock. Additionally, any unhandled exception will directly cause the loop to halt, so logic is best kept to a minimum if at all possible.

Parameters:

msg – The incoming message, already logged/filtered.
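One common way to keep such a handler minimal is to enqueue the message and do the real work elsewhere. `QueueingHandler` below is a hypothetical stand-in, and the event dictionaries are invented sample data.

```python
import asyncio


class QueueingHandler:
    """Hypothetical handler: `_on_message` only enqueues, never blocks."""

    def __init__(self):
        self.events = asyncio.Queue()

    async def _on_message(self, msg):
        # put_nowait() on an unbounded queue returns immediately,
        # so the reader loop is never stalled by slow consumers.
        self.events.put_nowait(msg)


async def demo():
    handler = QueueingHandler()

    async def consumer():
        # Slow or failure-prone work belongs here, outside the reader loop.
        seen = []
        for _ in range(2):
            seen.append(await handler.events.get())
        return seen

    task = asyncio.create_task(consumer())
    await handler._on_message({"event": "STOP"})
    await handler._on_message({"event": "RESUME"})
    return await task


processed = asyncio.run(demo())
```

With this split, an exception in the consumer affects only the consumer task and cannot halt the reader loop.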