Overview

QEMU Monitor Protocol (QMP) development library & tooling.

This package provides a fairly low-level class for communicating asynchronously with QMP protocol servers, as implemented by QEMU, the QEMU Guest Agent, and the QEMU Storage Daemon.

QMPClient provides the main functionality of this package. All errors raised by this library derive from QMPError; see qmp.error for additional detail. See qmp.events for an in-depth tutorial on managing QMP events.

Classes

QMPClient

class qemu.qmp.QMPClient(name: str | None = None)

Bases: AsyncProtocol[Message], Events

Implements a QMP client connection.

QMPClient can either connect to a QMP server or listen for one to connect, but it always acts as the QMP client.

Parameters:

name – Optional nickname for the connection, used to differentiate instances when logging.

Basic script-style usage looks like this:

import asyncio
from qemu.qmp import QMPClient

async def main():
    qmp = QMPClient('my_virtual_machine_name')
    await qmp.connect(('127.0.0.1', 1234))
    ...
    res = await qmp.execute('query-block')
    ...
    await qmp.disconnect()

asyncio.run(main())

A more advanced example that starts to take advantage of asyncio might look like this:

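import asyncio
from qemu.qmp import QMPClient

class Client:
    def __init__(self, name: str):
        self.qmp = QMPClient(name)

    async def watch_events(self):
        # Print each event name until the task is cancelled.
        try:
            async for event in self.qmp.events:
                print(f"Event: {event['event']}")
        except asyncio.CancelledError:
            return

    async def run(self, address='/tmp/qemu.socket'):
        await self.qmp.connect(address)
        watcher = asyncio.create_task(self.watch_events())
        # Wait for the runstate to change (e.g. the server hanging up).
        await self.qmp.runstate_changed()
        await self.qmp.disconnect()
        # Stop the event watcher now that the session is over.
        watcher.cancel()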

See qmp.events for more detail on event handling patterns.

logger: logging.Logger = <Logger qemu.qmp.qmp_client (WARNING)>

Logger object used for debugging messages.

await_greeting: bool

Whether or not to await a greeting after establishing a connection. Defaults to True; QGA servers expect this to be False.

negotiate: bool

Whether or not to perform capabilities negotiation upon connection. Implies await_greeting. Defaults to True; QGA servers expect this to be False.
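As a minimal sketch of the two attributes above, the helper below switches off both steps for a guest agent connection. The name `configure_for_guest_agent` is hypothetical, `client` stands for a QMPClient instance, and the sketch assumes both attributes are set before connecting.

```python
def configure_for_guest_agent(client):
    """Disable the greeting and negotiation steps on ``client``.

    ``client`` is assumed to be a QMPClient that has not connected yet.
    """
    # Per the attribute docs above, QGA servers expect this to be False.
    client.await_greeting = False
    # negotiate implies await_greeting, so it is turned off as well.
    client.negotiate = False
    return client
```

The helper returns the same object so it can be chained with construction, e.g. `qga = configure_for_guest_agent(QMPClient('qga'))`.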

property greeting: Greeting | None

The Greeting from the QMP server, if any.

Defaults to None, and will be set after a greeting is received during the connection process. It is reset at the start of each connection attempt.

async execute_msg(msg: Message) → object

Execute a QMP command on the server and return its value.

Parameters:

msg – The QMP Message to execute.

Returns:

The command execution return value from the server. The type of object returned depends on the command that was issued, though most in QEMU return a dict.

Raises:
  • ValueError – If the QMP Message has neither the ‘execute’ nor the ‘exec-oob’ field set.

  • ExecuteError – When the server returns an error response.

  • ExecInterruptedError – If the connection was disrupted before receiving a reply from the server.

classmethod make_execute_msg(cmd: str, arguments: Mapping[str, object] | None = None, oob: bool = False) → Message

Create an executable message to be sent by execute_msg later.

Parameters:
  • cmd – QMP command name.

  • arguments – Arguments (if any). Must be JSON-serializable.

  • oob – If True, execute “out of band”.

Returns:

A QMP Message that can be executed with execute_msg().
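To make the shape of such a message concrete, here is a standard-library-only sketch that builds a plain dict with the fields named above ('execute' or 'exec-oob', plus optional arguments). It is an illustration, not the library's implementation: `build_execute_message` is a hypothetical name, and leaving out 'arguments' when none are given is an assumption.

```python
import json

def build_execute_message(cmd, arguments=None, oob=False):
    """Return a plain dict shaped like a QMP command message."""
    # Out-of-band commands use the 'exec-oob' key instead of 'execute'.
    message = {'exec-oob' if oob else 'execute': cmd}
    # 'arguments' is only included when the caller supplied some.
    if arguments is not None:
        message['arguments'] = dict(arguments)
    # Serializing up front surfaces non-JSON-serializable arguments early.
    json.dumps(message)
    return message
```

For example, `build_execute_message('getfd', {'fdname': 'fd1'})` yields a dict with an 'execute' key and an 'arguments' key, while passing `oob=True` swaps 'execute' for 'exec-oob'.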

async execute(cmd: str, arguments: Mapping[str, object] | None = None, oob: bool = False) → object

Execute a QMP command on the server and return its value.

Parameters:
  • cmd – QMP command name.

  • arguments – Arguments (if any). Must be JSON-serializable.

  • oob – If True, execute “out of band”.

Returns:

The command execution return value from the server. The type of object returned depends on the command that was issued, though most in QEMU return a dict.

Raises:
  • ExecuteError – When the server returns an error response.

  • ExecInterruptedError – If the connection was disrupted before receiving a reply from the server.

send_fd_scm(fd: int) → None

Send a file descriptor to the remote via SCM_RIGHTS.

This method does not close the file descriptor.

Parameters:

fd – The file descriptor to send to QEMU.

This is an advanced feature of QEMU where file descriptors can be passed from client to server. This is usually used as a security measure to isolate the QEMU process from being able to open its own files. See the QMP commands getfd and add-fd for more information.

See socket.socket.sendmsg for more information on the Python implementation for sending file descriptors over a UNIX socket.
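The mechanism itself can be sketched with the standard library alone. The two helpers below (hypothetical names `send_fd` and `recv_fd`) pass one descriptor across a connected UNIX socket using sendmsg/recvmsg and SCM_RIGHTS; they illustrate the technique and are not the library's implementation.

```python
import array
import socket

def send_fd(sock, fd):
    """Send ``fd`` over a connected UNIX socket; the caller keeps ownership."""
    ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array('i', [fd]))]
    # Send one byte of ordinary data alongside the ancillary message.
    sock.sendmsg([b' '], ancillary)

def recv_fd(sock):
    """Receive one file descriptor sent with send_fd()."""
    fds = array.array('i')
    _data, ancdata, _flags, _addr = sock.recvmsg(1, socket.CMSG_LEN(fds.itemsize))
    for level, kind, payload in ancdata:
        if level == socket.SOL_SOCKET and kind == socket.SCM_RIGHTS:
            # Ignore any trailing partial integer in the payload.
            fds.frombytes(payload[:len(payload) - (len(payload) % fds.itemsize)])
    return fds[0]
```

The receiver gets a new descriptor number that refers to the same open file, and the sender's descriptor stays open, which matches the note above that the file descriptor is not closed.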

async accept() → None

Accept an incoming connection and begin processing message queues.

Used after a previous call to start_server() to accept an incoming connection. If this call fails, runstate is guaranteed to be set back to IDLE.

Raises:

async connect(address: str | Tuple[str, int] | socket, ssl: SSLContext | None = None) → None

Connect to the server and begin processing message queues.

If this call fails, runstate is guaranteed to be set back to IDLE.

Parameters:
  • address – Address to connect to; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

Raises:
  • StateError – When the Runstate is not IDLE.

  • ConnectError

    When a connection or session cannot be established.

    This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError or EOFError. If a protocol-level failure occurs while establishing a new session, the wrapped error may also be a QMPError.

async disconnect() → None

Disconnect and wait for all tasks to fully stop.

If there was an exception that caused the reader/writers to terminate prematurely, it will be raised here.

Raises:

Exception – When the reader or writer terminates unexpectedly. You can expect to see EOFError if the server hangs up, or OSError for connection-related issues. If there was a QMP protocol-level problem, ProtocolError will be seen.
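One way to act on these cases is to sort the exception by type at the point where disconnect() is awaited. The sketch below is only an illustration: `disconnect_and_report` and its labels are hypothetical, and `client` is any object with an async disconnect() method, such as a QMPClient.

```python
async def disconnect_and_report(client):
    """Disconnect ``client`` and return a (label, exception) pair.

    The exception is None after a clean shutdown.
    """
    try:
        await client.disconnect()
    except EOFError as err:
        # The server hung up.
        return ('server hung up', err)
    except OSError as err:
        # Connection-related failure, for example a reset.
        return ('connection problem', err)
    except Exception as err:
        # Anything else, such as a protocol-level error.
        return ('other failure', err)
    return ('clean', None)
```

EOFError is tested before OSError only for readability; the two are unrelated types, so either order gives the same result.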

listen(*listeners: EventListener) → Iterator[None]

Context manager: Temporarily listen with an EventListener.

Accepts one or more EventListener objects and registers them, activating them for the duration of the context block.

EventListener objects will have any pending events in their FIFO queue cleared upon exiting the context block, when they are deactivated.

Parameters:

*listeners – One or more EventListeners to activate.

Raises:

ListenerError – If the given listener(s) are already active.

listener(names: str | Iterable[str] | None = (), event_filter: Callable[[Message], bool] | None = None) → Iterator[EventListener]

Context manager: Temporarily listen with a new EventListener.

Creates an EventListener object and registers it, activating it for the duration of the context block.

Parameters:
  • names – One or more names of events to listen for. When not provided, listen for ALL events.

  • event_filter – An optional event filtering function. When names are also provided, this acts as a secondary filter.

Returns:

The newly created and active EventListener.
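A small sketch of how this context manager might be used to wait for a single named event follows. The helper name `wait_for_event` and its timeout parameter are hypothetical; `client` is assumed to be a connected QMPClient.

```python
import asyncio

async def wait_for_event(client, name, timeout=5.0):
    """Return the next event called ``name``, or raise asyncio.TimeoutError."""
    # The listener is only registered, and only queues events, inside this block.
    with client.listener(name) as listener:
        return await asyncio.wait_for(listener.get(), timeout)
```

Because the listener is created on entry, events that arrived before the block was entered are not seen by it. If the event may be triggered by a command, issue that command inside the block.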

property name: str | None

The nickname for this connection, if any.

This name is used for differentiating instances in debug output.

register_listener(listener: EventListener) → None

Register and activate an EventListener.

Parameters:

listener – The listener to activate.

Raises:

ListenerError – If the given listener is already registered.

remove_listener(listener: EventListener) → None

Unregister and deactivate an EventListener.

The removed listener will have its pending events cleared via clear(). The listener can be re-registered later when desired.

Parameters:

listener – The listener to deactivate.

Raises:

ListenerError – If the given listener is not registered.

property runstate: Runstate

The current Runstate of the connection.

async runstate_changed() → Runstate

Wait for the runstate to change, then return that Runstate.
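Since a single call returns after any transition, waiting for one particular state takes a loop. A minimal sketch, assuming `client` exposes the runstate property and runstate_changed() coroutine described here (`wait_for_runstate` is a hypothetical name):

```python
async def wait_for_runstate(client, target):
    """Block until ``client.runstate`` equals ``target``, then return it."""
    while client.runstate != target:
        # runstate_changed() resumes after the next transition.
        await client.runstate_changed()
    return client.runstate
```

With a real client this could be called as `await wait_for_runstate(qmp, Runstate.IDLE)`. The function returns immediately if the client is already in the target state.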

async start_server(address: str | Tuple[str, int], ssl: SSLContext | None = None) → None

Start listening for an incoming connection, but do not wait for a peer.

This method starts listening for an incoming connection, but does not block waiting for a peer. This call will return immediately after binding and listening on a socket. A later call to accept() must be made in order to finalize the incoming connection.

Parameters:
  • address – Address to listen on; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

Raises:
  • StateError – When the Runstate is not IDLE.

  • ConnectError

    When the server could not start listening on this address.

    This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError.
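Splitting listening from accepting is useful when the peer has to be started after the socket exists. The sketch below shows that ordering; `serve_one_peer` and the `launch_peer` callback are hypothetical, and `client` is assumed to provide the start_server() and accept() coroutines documented in this class.

```python
async def serve_one_peer(client, address, launch_peer):
    """Listen on ``address``, start the peer, then accept its connection."""
    # Returns as soon as the socket is bound and listening.
    await client.start_server(address)
    # The socket now exists, so the peer can be started without racing it.
    await launch_peer(address)
    # Completes the session once the peer has connected.
    await client.accept()
```

Here `launch_peer` would be an async callable supplied by the caller, for example one that spawns a QEMU process configured to connect to `address`.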

async start_server_and_accept(address: str | Tuple[str, int], ssl: SSLContext | None = None) → None

Accept a connection and begin processing message queues.

If this call fails, runstate is guaranteed to be set back to IDLE. This method is precisely equivalent to calling start_server() followed by accept().

Parameters:
  • address – Address to listen on; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

Raises:
  • StateError – When the Runstate is not IDLE.

  • ConnectError

    When a connection or session cannot be established.

    This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError or EOFError. If a protocol-level failure occurs while establishing a new session, the wrapped error may also be a QMPError.

events: EventListener

Default, all-events EventListener. See qmp.events for more info.

Message

class qemu.qmp.Message(value: bytes | Mapping[str, object] = b'{}', *, eager: bool = True)

Bases: MutableMapping[str, object]

Represents a single QMP protocol message.

QMP uses JSON objects as its basic communicative unit; so this Python object is a MutableMapping. It may be instantiated from either another mapping (like a dict), or from raw bytes that still need to be deserialized.

Once instantiated, it may be treated like any other MutableMapping:

>>> msg = Message(b'{"hello": "world"}')
>>> assert msg['hello'] == 'world'
>>> msg['id'] = 'foobar'
>>> print(msg)
{
  "hello": "world",
  "id": "foobar"
}

It can be converted to bytes:

>>> msg = Message({"hello": "world"})
>>> print(bytes(msg))
b'{"hello":"world"}'

Or back into a garden-variety dict:

>>> dict(msg)
{'hello': 'world'}

Or pretty-printed:

>>> print(str(msg))
{
  "hello": "world"
}

Parameters:
  • value – Initial value, if any.

  • eager – When True, attempt to serialize or deserialize the initial value immediately, so that conversion exceptions are raised during the call to __init__().

EventListener

class qemu.qmp.EventListener(names: str | Iterable[str] | None = None, event_filter: Callable[[Message], bool] | None = None)

Selectively listens for events with runtime configurable filtering.

This class is designed to be directly usable for the most common cases, but it can be extended to provide more rigorous control.

Parameters:
  • names – One or more names of events to listen for. When not provided, listen for ALL events.

  • event_filter – An optional event filtering function. When names are also provided, this acts as a secondary filter.

When names and event_filter are both provided, the names are checked first and the filter function is called second, so the filter function can assume that the event has a known format.
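The two-stage check can be sketched in plain Python. This is an illustration of the ordering described above, not the library's code; `accepts` is a hypothetical name and events are modeled as plain dicts with an 'event' key.

```python
def accepts(event, names=(), event_filter=None):
    """Return True if ``event`` passes the name check and then the filter."""
    # An empty name set means "listen for all events".
    if names and event.get('event') not in names:
        return False
    # The filter only ever sees events that passed the name check.
    if event_filter is not None:
        return bool(event_filter(event))
    return True
```

Because the name check runs first, a filter written for one event type can index into that event's data without guarding against other event types.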

names: Set[str]

Primary event filter, based on one or more event names.

event_filter: Callable[[Message], bool] | None

Optional, secondary event filter.

property history: Tuple[Message, ...]

A read-only history of all events seen so far.

This represents every event, including those not yet witnessed via get() or async for. It persists between clear() calls and is immutable.

accept(event: Message) → bool

Determine if this listener accepts this event.

This method determines which events will appear in the stream. The default implementation simply checks the event against the list of names and the event_filter to decide if this EventListener accepts a given event. It can be overridden/extended to provide custom listener behavior.

User code is not expected to need to invoke this method.

Parameters:

event – The event under consideration.

Returns:

True, if this listener accepts this event.

async put(event: Message) → None

Conditionally put a new event into the FIFO queue.

This method is not designed to be invoked from user code, and it should not need to be overridden. It is a public interface so that QMPClient has an interface by which it can inform registered listeners of new events.

The event will be put into the queue if accept() returns True.

Parameters:

event – The new event to put into the FIFO queue.

async get() → Message

Wait for the very next event in this stream.

If one is already available, return that one.

empty() → bool

Return True if there are no pending events.

clear() → List[Message]

Clear this listener of all pending events.

Called when an EventListener is being unregistered, this clears the pending FIFO queue synchronously. It can also be used to manually clear any pending events, if desired.

Returns:

The cleared events, if any.

Warning

Take care when discarding events. Cleared events will be silently tossed on the floor. All events that were ever accepted by this listener are visible in history.

Runstate

class qemu.qmp.Runstate(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: Enum

Protocol session runstate.

IDLE = 0

Fully quiesced and disconnected.

CONNECTING = 1

In the process of connecting or establishing a session.

RUNNING = 2

Fully connected and active session.

DISCONNECTING = 3

In the process of disconnecting. Runstate may be returned to IDLE by calling disconnect().

Exceptions

exception qemu.qmp.QMPError

Bases: Exception

Abstract error class for all errors originating from this package.

exception qemu.qmp.StateError(error_message: str, state: Runstate, required: Runstate)

Bases: QMPError

An API command (connect, execute, etc) was issued at an inappropriate time.

This error is raised when a command like connect() is called when the client is already connected.

Parameters:
  • error_message – Human-readable string describing the state violation.

  • state – The actual Runstate seen at the time of the violation.

  • required – The Runstate required to process this command.

exception qemu.qmp.ConnectError(error_message: str, exc: Exception)

Bases: QMPError

Raised when the initial connection process has failed.

This Exception always wraps a “root cause” exception that can be interrogated for additional information.

For example, when connecting to a non-existent socket:

await qmp.connect('not_found.sock')
# ConnectError: Failed to establish connection:
#               [Errno 2] No such file or directory

Parameters:
  • error_message – Human-readable string describing the error.

  • exc – The root-cause exception.

error_message: str

Human-readable error string.

exc: Exception

Wrapped root cause exception.
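Interrogating the wrapped exception might look like the sketch below. The helper name `describe_connect_failure` and its return strings are hypothetical; it only relies on the `exc` and `error_message` attributes documented above.

```python
def describe_connect_failure(err):
    """Summarize a ConnectError-like exception from its wrapped root cause."""
    cause = err.exc
    if isinstance(cause, FileNotFoundError):
        return 'socket path does not exist'
    if isinstance(cause, ConnectionRefusedError):
        return 'nothing is listening at that address'
    if isinstance(cause, EOFError):
        return 'peer closed the connection during setup'
    # Fall back to the exception's own human-readable message.
    return err.error_message
```

A caller would typically use it inside `except ConnectError as err:` around a connect() call.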

exception qemu.qmp.ExecuteError(error_response: ErrorResponse, sent: Message, received: Message)

Bases: QMPError

Exception raised by QMPClient.execute() on RPC failure.

This exception is raised when the server received, interpreted, and replied to a command successfully; but the command itself returned a failure status.

For example:

await qmp.execute('block-dirty-bitmap-add',
                  {'node': 'foo', 'name': 'my_bitmap'})
# qemu.qmp.qmp_client.ExecuteError:
#     Cannot find device='foo' nor node-name='foo'

Parameters:
  • error_response – The RPC error response object.

  • sent – The sent RPC message that caused the failure.

  • received – The raw RPC error reply received.

sent: Message

The sent Message that caused the failure.

received: Message

The received Message that indicated failure.

error: ErrorResponse

The parsed error response.

property error_class: str

The QMP error class.
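These attributes can be combined into a log-friendly summary. The sketch below uses a hypothetical helper, `summarize_execute_error`, and assumes that `str(err)` yields the human-readable description and that `sent` behaves like a mapping (Message is a MutableMapping).

```python
def summarize_execute_error(err):
    """Build a one-line summary from an ExecuteError-like exception."""
    # 'sent' is the command message; it carries either 'execute' or 'exec-oob'.
    command = err.sent.get('execute') or err.sent.get('exec-oob')
    return f"{command} failed with {err.error_class}: {err}"
```

It would normally be called from an `except ExecuteError as err:` handler around execute() or execute_msg().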

exception qemu.qmp.ExecInterruptedError

Bases: QMPError

Exception raised by execute() (et al) when an RPC is interrupted.

This error is raised when an execute() statement could not be completed. This can occur because the connection itself was terminated before a reply was received. The true cause of the interruption will be available via disconnect().

The QMP protocol does not make it possible to know if a command succeeded or failed after such an event; the client will need to query the server to determine the state of the server on a case-by-case basis.

For example, ECONNRESET might look like this:

try:
    await qmp.execute('query-block')
    # ExecInterruptedError: Disconnected
except ExecInterruptedError:
    await qmp.disconnect()
    # ConnectionResetError: [Errno 104] Connection reset by peer