Asyncio Protocol
Generic Asynchronous Message-based Protocol Support
This module provides a generic framework for sending and receiving messages over an asyncio stream. AsyncProtocol is an abstract class that implements the core mechanisms of a simple send/receive protocol, and is designed to be extended.
In this package, it is used as the implementation for the QMPClient class.
- class qemu.qmp.protocol.Runstate(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
Enum
Protocol session runstate.
- IDLE = 0
Fully quiesced and disconnected.
- CONNECTING = 1
In the process of connecting or establishing a session.
- RUNNING = 2
Fully connected and active session.
- DISCONNECTING = 3
In the process of disconnecting. Runstate may be returned to IDLE by calling disconnect().
- exception qemu.qmp.protocol.ConnectError(error_message: str, exc: Exception)[source]
Bases:
QMPError
Raised when the initial connection process has failed.
This Exception always wraps a “root cause” exception that can be interrogated for additional information.
For example, when connecting to a non-existent socket:
await qmp.connect('not_found.sock')
# ConnectError: Failed to establish connection:
# [Errno 2] No such file or directory
- Parameters:
error_message – Human-readable string describing the error.
exc – The root-cause exception.
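The wrapping behaviour described above can be sketched without the library. This is a minimal stand-in, not the qemu.qmp class: the attribute names error_message and exc are assumed to mirror the constructor parameters, and fake_connect() is a hypothetical helper that only simulates a missing socket path.

```python
class ConnectError(Exception):
    """Stand-in for the documented exception (not the library class)."""

    def __init__(self, error_message: str, exc: Exception):
        super().__init__(error_message)
        # Assumed attribute names, mirroring the constructor parameters.
        self.error_message = error_message
        self.exc = exc

    def __str__(self) -> str:
        return f"{self.error_message}: {self.exc!s}"


def fake_connect(path: str) -> None:
    """Hypothetical helper: simulate connecting to a missing socket."""
    try:
        # The path is ignored; we only reproduce the errno a real
        # connection attempt to a non-existent socket would report.
        raise FileNotFoundError(2, "No such file or directory")
    except OSError as err:
        raise ConnectError("Failed to establish connection", err) from err


def root_cause_name(path: str) -> str:
    """Return the type name of the wrapped root-cause exception."""
    try:
        fake_connect(path)
    except ConnectError as err:
        return type(err.exc).__name__
    return "no error"
```

A caller can branch on the type of the wrapped exception, for example to retry on OSError but give up on a protocol-level failure.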
- exception qemu.qmp.protocol.StateError(error_message: str, state: Runstate, required: Runstate)[source]
Bases:
QMPError
An API command (connect, execute, etc.) was issued at an inappropriate time.
This error is raised when a command like connect() is called when the client is already connected.
- qemu.qmp.protocol.require(required_state: Runstate) Callable[[F], F] [source]
Decorator: protect a method so it can only be run in a certain Runstate.
- Parameters:
required_state – The Runstate required to invoke this method.
- Raises:
StateError – When the required Runstate is not met.
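One way such a state-guarding decorator could be written is sketched below. It is an illustration only: Runstate and StateError are local stand-ins for the documented names, Session is a hypothetical class, and the guarded object is assumed to expose a runstate attribute. The library's own decorator may differ, for example in its error messages.

```python
import enum
import functools
from typing import Any, Callable, TypeVar, cast

F = TypeVar("F", bound=Callable[..., Any])


class Runstate(enum.Enum):
    """Stand-in mirroring the documented runstate values."""
    IDLE = 0
    CONNECTING = 1
    RUNNING = 2
    DISCONNECTING = 3


class StateError(Exception):
    """Stand-in for the documented exception."""

    def __init__(self, error_message: str, state: Runstate, required: Runstate):
        super().__init__(error_message)
        self.state = state
        self.required = required


def require(required_state: Runstate) -> Callable[[F], F]:
    """Allow the decorated method to run only in `required_state`."""
    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            # Assumption: the object exposes its state as `self.runstate`.
            if self.runstate != required_state:
                raise StateError(
                    f"{func.__name__} requires {required_state.name}, "
                    f"but the runstate is {self.runstate.name}",
                    self.runstate,
                    required_state,
                )
            return func(self, *args, **kwargs)
        return cast(F, wrapper)
    return decorator


class Session:
    """Hypothetical object used only to exercise the decorator."""

    def __init__(self) -> None:
        self.runstate = Runstate.IDLE

    @require(Runstate.IDLE)
    def connect(self) -> str:
        self.runstate = Runstate.RUNNING
        return "connected"


def second_connect_error() -> StateError:
    """Call connect() twice and return the StateError from the second call."""
    session = Session()
    session.connect()
    try:
        session.connect()
    except StateError as err:
        return err
    raise RuntimeError("expected a StateError")
```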
- class qemu.qmp.protocol.AsyncProtocol(name: str | None = None)[source]
Bases:
Generic[T]
AsyncProtocol implements a generic async message-based protocol.
This protocol assumes the basic unit of information transfer between client and server is a “message”, the details of which are left up to the implementation. It assumes the sending and receiving of these messages is full-duplex and not necessarily correlated; i.e. it supports asynchronous inbound messages.
It is designed to be extended by a specific protocol which provides the implementations for how to read and send messages. These must be defined in _do_recv() and _do_send(), respectively.
Other callbacks have a default implementation, but are intended to be either extended or overridden:
_establish_session:
The base implementation starts the reader/writer tasks. A protocol implementation can override this call: actions to be taken before the reader/writer tasks start go before the super() call, and actions that must occur afterwards go after it.
_on_message:
Actions to be performed when a message is received.
_cb_outbound:
Logging/Filtering hook for all outbound messages.
_cb_inbound:
Logging/Filtering hook for all inbound messages. This hook runs before _on_message().
- Parameters:
name – Name used for logging messages, if any. By default, messages will log to ‘qemu.qmp.protocol’, but each individual connection can be given its own logger by giving it a name; messages will then log to ‘qemu.qmp.protocol.${name}’.
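The ordering of the inbound hooks described above (raw read, then filter, then handler) can be sketched with a toy class. ToyProtocol and UppercaseLines are hypothetical names that mimic only the hook names; they do not subclass or reproduce AsyncProtocol, and no real stream is involved.

```python
import asyncio
from typing import Generic, Iterable, List, TypeVar

T = TypeVar("T")


class ToyProtocol(Generic[T]):
    """Hypothetical stand-in that mimics only the inbound hook ordering."""

    def __init__(self) -> None:
        self.handled: List[T] = []

    async def _do_recv(self) -> T:
        # Abstract in spirit: a concrete protocol must supply this.
        raise NotImplementedError

    def _cb_inbound(self, msg: T) -> T:
        # Default filter: pass the message through unchanged.
        return msg

    async def _on_message(self, msg: T) -> None:
        # Endpoint for inbound messages; here we just record them.
        self.handled.append(msg)

    async def _recv(self) -> T:
        # Raw read first, then the inbound filter.
        return self._cb_inbound(await self._do_recv())

    async def pump_once(self) -> None:
        # One iteration of what a reader loop would do.
        await self._on_message(await self._recv())


class UppercaseLines(ToyProtocol[str]):
    """Toy line protocol reading from an in-memory list."""

    def __init__(self, lines: Iterable[str]) -> None:
        super().__init__()
        self._lines = list(lines)

    async def _do_recv(self) -> str:
        if not self._lines:
            raise EOFError
        return self._lines.pop(0)

    def _cb_inbound(self, msg: str) -> str:
        # Extend the default filter rather than replacing it.
        return super()._cb_inbound(msg).strip().upper()


def run_once(lines: Iterable[str]) -> List[str]:
    """Pump a single message through the hooks and return what was handled."""
    proto = UppercaseLines(lines)
    asyncio.run(proto.pump_once())
    return proto.handled
```

The filter runs before the handler, so _on_message() only ever sees the already-filtered message.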
- logger = <Logger qemu.qmp.protocol (WARNING)>
Logger object for debugging messages from this connection.
- _limit = 65536
- _dc_task: Future[None] | None
Disconnect task. The disconnect implementation runs in a task so that asynchronous disconnects (initiated by the reader/writer) are allowed to wait for the reader/writers to exit.
- property name: str | None
The nickname for this connection, if any.
This name is used for differentiating instances in debug output.
- async start_server_and_accept(address: str | Tuple[str, int], ssl: SSLContext | None = None) None [source]
Accept a connection and begin processing message queues.
If this call fails, runstate is guaranteed to be set back to IDLE. This method is precisely equivalent to calling start_server() followed by accept().
- Parameters:
address – Address to listen on; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
- Raises:
StateError – When the Runstate is not IDLE.
ConnectError – When a connection or session cannot be established.
This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError or EOFError. If a protocol-level failure occurs while establishing a new session, the wrapped error may also be a QMPError.
- async start_server(address: str | Tuple[str, int], ssl: SSLContext | None = None) None [source]
Start listening for an incoming connection, but do not wait for a peer.
This method starts listening for an incoming connection, but does not block waiting for a peer. This call will return immediately after binding and listening on a socket. A later call to accept() must be made in order to finalize the incoming connection.
- Parameters:
address – Address to listen on; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
- Raises:
StateError – When the Runstate is not IDLE.
ConnectError – When the server could not start listening on this address.
This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError.
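The "listen now, accept later" split can be illustrated with plain asyncio streams. The sketch below is not how this module is implemented: listen_then_accept() and on_peer() are hypothetical names, the loopback address and ephemeral port are arbitrary choices, and the peer is created in the same process purely for demonstration.

```python
import asyncio
from typing import Tuple


async def listen_then_accept() -> bytes:
    """Bind and listen first; wait for the peer in a separate step."""
    loop = asyncio.get_running_loop()
    accepted: "asyncio.Future[Tuple[asyncio.StreamReader, asyncio.StreamWriter]]" = (
        loop.create_future()
    )

    async def on_peer(reader: asyncio.StreamReader,
                      writer: asyncio.StreamWriter) -> None:
        # Do the minimum here: hand the streams back to the waiting caller.
        if not accepted.done():
            accepted.set_result((reader, writer))

    # Step 1: returns as soon as the socket is bound and listening.
    server = await asyncio.start_server(on_peer, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    # A peer connects at some later point (here: in-process, right away).
    _, peer_writer = await asyncio.open_connection("127.0.0.1", port)
    peer_writer.write(b"hello\n")
    await peer_writer.drain()

    # Step 2: "accept" by waiting for the callback to deliver the streams.
    reader, writer = await accepted
    server.close()  # Only a single connection is wanted.
    line = await reader.readline()

    # Tidy up both ends of the connection.
    peer_writer.close()
    writer.close()
    await peer_writer.wait_closed()
    await writer.wait_closed()
    return line
```

Keeping the connection callback this small means that any later session setup runs in the caller's own stack rather than inside the server callback.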
- async accept() None [source]
Accept an incoming connection and begin processing message queues.
Used after a previous call to start_server() to accept an incoming connection. If this call fails, runstate is guaranteed to be set back to IDLE.
- Raises:
StateError – When the Runstate is not CONNECTING.
QMPError – When start_server() was not called first.
ConnectError – When a connection or session cannot be established.
This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError or EOFError. If a protocol-level failure occurs while establishing a new session, the wrapped error may also be a QMPError.
- async connect(address: str | Tuple[str, int] | socket, ssl: SSLContext | None = None) None [source]
Connect to the server and begin processing message queues.
If this call fails, runstate is guaranteed to be set back to IDLE.
- Parameters:
address – Address to connect to; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
- Raises:
StateError – When the Runstate is not IDLE.
ConnectError – When a connection or session cannot be established.
This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError or EOFError. If a protocol-level failure occurs while establishing a new session, the wrapped error may also be a QMPError.
- async disconnect() None [source]
Disconnect and wait for all tasks to fully stop.
If there was an exception that caused the reader/writers to terminate prematurely, it will be raised here.
- Raises:
Exception – When the reader or writer terminates unexpectedly. You can expect to see EOFError if the server hangs up, or OSError for connection-related issues. If there was a QMP protocol-level problem, ProtocolError will be seen.
- _session_guard(emsg: str) AsyncGenerator[None, None] [source]
Async guard function used to roll back to IDLE on any error.
On any Exception, the state machine will be reset back to IDLE. Most Exceptions will be wrapped with ConnectError, but BaseException events will be left alone (this includes asyncio.CancelledError, even prior to Python 3.8).
- Parameters:
emsg – Human-readable string describing what connection phase failed.
- Raises:
BaseException – When BaseException occurs in the guarded block.
ConnectError – When any other error is encountered in the guarded block.
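A guard with this shape can be sketched as an async context manager. The version below is a simplified illustration under stated assumptions: Runstate and ConnectError are local stand-ins, Guarded is a hypothetical class, and the rollback is reduced to a plain attribute assignment, whereas a real implementation would also have to tear down any partially started tasks.

```python
import asyncio
import enum
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional, Tuple


class Runstate(enum.Enum):
    IDLE = 0
    CONNECTING = 1
    RUNNING = 2
    DISCONNECTING = 3


class ConnectError(Exception):
    """Stand-in for the documented exception."""

    def __init__(self, error_message: str, exc: Exception):
        super().__init__(error_message)
        self.exc = exc


class Guarded:
    """Hypothetical object whose connect() is protected by the guard."""

    def __init__(self) -> None:
        self.runstate = Runstate.IDLE

    @asynccontextmanager
    async def session_guard(self, emsg: str) -> AsyncGenerator[None, None]:
        try:
            yield
        except BaseException as err:
            # Roll back on *any* failure, including cancellation.
            self.runstate = Runstate.IDLE
            if isinstance(err, Exception):
                # Ordinary errors are wrapped with a descriptive message.
                raise ConnectError(emsg, err) from err
            # BaseException-only events propagate untouched.
            raise

    async def connect(self, fail_with: Optional[Exception] = None) -> None:
        async with self.session_guard("Failed to establish connection"):
            self.runstate = Runstate.CONNECTING
            if fail_with is not None:
                raise fail_with
            self.runstate = Runstate.RUNNING


async def demo() -> Tuple[Optional[str], Runstate, Runstate]:
    guarded = Guarded()
    wrapped = None
    try:
        await guarded.connect(fail_with=OSError("boom"))
    except ConnectError as err:
        wrapped = type(err.exc).__name__
    state_after_failure = guarded.runstate
    await guarded.connect()  # A retry from IDLE succeeds.
    return wrapped, state_after_failure, guarded.runstate
```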
- property _runstate_event: Event
- _set_state(state: Runstate) None [source]
Change the Runstate of the protocol connection.
Signals the runstate_changed event.
- async _incoming(reader: StreamReader, writer: StreamWriter) None [source]
Accept an incoming connection and signal the upper_half.
This method does the minimum necessary to accept a single incoming connection. It signals back to the upper_half ASAP so that any errors during session initialization can occur naturally in the caller’s stack.
- Parameters:
reader – Incoming asyncio.StreamReader
writer – Incoming asyncio.StreamWriter
- async _do_start_server(address: str | Tuple[str, int], ssl: SSLContext | None = None) None [source]
Start listening for an incoming connection, but do not wait for a peer.
This method starts listening for an incoming connection, but does not block waiting for a peer. This call will return immediately after binding and listening to a socket. A later call to accept() must be made in order to finalize the incoming connection.
- Parameters:
address – Address to listen on; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
- Raises:
OSError – For stream-related errors.
- async _do_accept() None [source]
Wait for and accept an incoming connection.
Requires that we have not yet accepted an incoming connection from the upper_half, but it’s OK if the server is no longer running because the bottom_half has already accepted the connection.
- async _do_connect(address: str | Tuple[str, int] | socket, ssl: SSLContext | None = None) None [source]
Acting as the transport client, initiate a connection to a server.
- Parameters:
address – Address to connect to; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
- Raises:
OSError – For stream-related errors.
- async _establish_session() None [source]
Establish a new session.
Starts the reader/writer tasks; subclasses may perform their own negotiations here. The Runstate will be RUNNING upon successful conclusion.
- _schedule_disconnect() None [source]
Initiate a disconnect; idempotent.
This method is used both in the upper-half as a direct consequence of disconnect(), and in the bottom-half in the case of unhandled exceptions in the reader/writer tasks.
It can be invoked no matter what the runstate is.
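Idempotent scheduling of this kind is commonly done by remembering a single task. The sketch below assumes that approach; Disconnector and its method names are hypothetical, and the teardown work is replaced by a counter so the effect is observable.

```python
import asyncio
from typing import Optional


class Disconnector:
    """Hypothetical object showing idempotent disconnect scheduling."""

    def __init__(self) -> None:
        self._dc_task: Optional["asyncio.Task[None]"] = None
        self.runs = 0

    async def _bh_disconnect(self) -> None:
        # Placeholder for the real teardown work.
        await asyncio.sleep(0)
        self.runs += 1

    def schedule_disconnect(self) -> None:
        # Only the first call creates the task; later calls do nothing.
        if self._dc_task is None:
            self._dc_task = asyncio.create_task(self._bh_disconnect())

    async def wait_disconnect(self) -> None:
        if self._dc_task is None:
            raise RuntimeError("no disconnect was scheduled")
        try:
            await self._dc_task
        finally:
            self._dc_task = None


async def demo() -> int:
    dc = Disconnector()
    dc.schedule_disconnect()
    dc.schedule_disconnect()  # Second call is a no-op.
    await dc.wait_disconnect()
    return dc.runs
```

Because the teardown runs in its own task, both a caller and a failing reader or writer can request it without racing each other.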
- async _wait_disconnect() None [source]
Waits for a previously scheduled disconnect to finish.
This method will gather any bottom-half exceptions and re-raise the one that occurred first, presuming it to be the root cause of any subsequent Exceptions. It is intended to be used in the upper half of the call chain.
- Raises:
Exception – Arbitrary exception re-raised on behalf of the reader/writer.
- async _bh_disconnect() None [source]
Disconnect and cancel all outstanding tasks.
It is designed to be called from its task context, _dc_task. By running in its own task, it is free to wait on any pending actions that may still need to occur in either the reader or writer tasks.
- async _bh_loop_forever(async_fn: Callable[[], Awaitable[None]], name: str) None [source]
Run one of the bottom-half methods in a loop forever.
If the bottom half ever raises any exception, schedule a disconnect that will terminate the entire loop.
- Parameters:
async_fn – The bottom-half method to run in a loop.
name – The name of this task, used for logging.
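The loop-until-failure behaviour can be sketched as follows. LoopOwner and its methods are hypothetical, the disconnect is reduced to setting a flag, and whether the real loop re-raises after scheduling the disconnect is an assumption of this sketch.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


class LoopOwner:
    """Hypothetical owner of a bottom-half loop."""

    def __init__(self) -> None:
        self.disconnect_scheduled = False
        self.log: List[str] = []

    def schedule_disconnect(self) -> None:
        # Stand-in for the real disconnect scheduling.
        self.disconnect_scheduled = True

    async def loop_forever(self,
                           async_fn: Callable[[], Awaitable[None]],
                           name: str) -> None:
        try:
            while True:
                await async_fn()
        except asyncio.CancelledError:
            # Cancellation is the normal way for the loop to stop.
            self.log.append(f"{name}: cancelled")
            raise
        except Exception as err:
            # Any other failure ends the loop and triggers a disconnect.
            self.log.append(f"{name}: failed with {type(err).__name__}")
            self.schedule_disconnect()
            raise


async def demo() -> Tuple[bool, List[str]]:
    owner = LoopOwner()
    pending = [b"one", b"two"]

    async def recv_one() -> None:
        await asyncio.sleep(0)
        if not pending:
            raise EOFError("peer hung up")
        pending.pop(0)

    try:
        await owner.loop_forever(recv_one, "reader")
    except EOFError:
        pass
    return owner.disconnect_scheduled, owner.log
```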
- async _bh_send_message() None [source]
Wait for an outgoing message, then send it.
Designed to be run in _bh_loop_forever().
- async _bh_recv_message() None [source]
Wait for an incoming message and call _on_message to route it.
Designed to be run in _bh_loop_forever().
- _cb_outbound(msg: T) T [source]
Callback: outbound message hook.
This is intended for subclasses to be able to add arbitrary hooks to filter or manipulate outgoing messages. The base implementation does nothing but log the message without any manipulation of the message.
- Parameters:
msg – raw outbound message
- Returns:
final outbound message
- _cb_inbound(msg: T) T [source]
Callback: inbound message hook.
This is intended for subclasses to be able to add arbitrary hooks to filter or manipulate incoming messages. The base implementation does nothing but log the message without any manipulation of the message.
This method does not “handle” incoming messages; it is a filter. The actual “endpoint” for incoming messages is _on_message().
- Parameters:
msg – raw inbound message
- Returns:
processed inbound message
- async _readline() bytes [source]
Wait for a newline from the incoming reader.
This method is provided as a convenience for upper-layer protocols, as many are line-based.
This method may return a sequence of bytes without a trailing newline if EOF occurs, but some bytes were received. In this case, the next call will raise EOFError. It is assumed that the layer 5 protocol will decide if there is anything meaningful to be done with a partial message.
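The partial-line behaviour can be reproduced with a bare asyncio.StreamReader. In this sketch, readline() is a hypothetical free function standing in for the method, and the reader is fed by hand instead of being attached to a socket.

```python
import asyncio
from typing import Tuple


async def readline(reader: asyncio.StreamReader) -> bytes:
    """Return one line; raise EOFError once nothing more can be read."""
    msg_bytes = await reader.readline()
    if not msg_bytes and reader.at_eof():
        raise EOFError
    return msg_bytes


async def demo() -> Tuple[bytes, bytes, str]:
    # Create the reader inside the running loop and feed it manually.
    reader = asyncio.StreamReader()
    reader.feed_data(b"complete\npartial")
    reader.feed_eof()

    first = await readline(reader)   # Full line, newline included.
    second = await readline(reader)  # Leftover bytes, no newline.
    try:
        await readline(reader)
        third = "no error"
    except EOFError:
        third = "EOFError"
    return first, second, third
```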
- async _do_recv() T [source]
Abstract: Read from the stream and return a message.
Very low-level; intended to only be called by _recv().
- async _recv() T [source]
Read an arbitrary protocol message.
Warning
This method is intended primarily for _bh_recv_message() to use in an asynchronous task loop. Using it outside of this loop will “steal” messages from the normal routing mechanism. It is safe to use prior to _establish_session(), but should not be used otherwise.
This method uses _do_recv() to retrieve the raw message, and then transforms it using _cb_inbound().
- Returns:
A single (filtered, processed) protocol message.
- _do_send(msg: T) None [source]
Abstract: Write a message to the stream.
Very low-level; intended to only be called by _send().
- async _send(msg: T) None [source]
Send an arbitrary protocol message.
This method will transform any outgoing messages according to _cb_outbound().
Warning
Like _recv(), this method is intended to be called by the writer task loop that processes outgoing messages. Calling it directly may circumvent logic implemented by the caller meant to correlate outgoing and incoming messages.
- Raises:
OSError – For problems with the underlying stream.
- async _on_message(msg: T) None [source]
Called to handle the receipt of a new message.
Caution
This is executed from within the reader loop, so be advised that waiting on either the reader or writer task will lead to deadlock. Additionally, any unhandled exceptions will directly cause the loop to halt, so logic may be best-kept to a minimum if at all possible.
- Parameters:
msg – The incoming message, already logged/filtered.