Package sharklocal

sharklocal — Local control library for Shark robot vacuums.

Designed for use with Home Assistant integrations. Supports both REST and MQTT transports with YAML-configurable action mappings per vacuum model.

Basic usage:

from sharklocal import VacuumClient

async with VacuumClient(
    "192.168.1.100",
    rest_mappings="sharkiq_v1",
    mqtt_mappings="sharkiq_v1",
) as vacuum:
    status = await vacuum.get_status()
    print(status.mode, status.battery_level)
    await vacuum.start_cleaning()

Direct transport access:

from sharklocal import RESTVacuumClient, load_rest_mapping

mapping = load_rest_mapping("sharkiq_v1")
client = RESTVacuumClient("192.168.1.100", mapping)
status = await client.call("get_status")
await client.close()

Sub-modules

sharklocal.client

Unified VacuumClient with automatic transport selection.

sharklocal.exceptions

Custom exceptions for the sharklocal library.

sharklocal.mappings

Mapping loader utilities for REST and MQTT transport configurations.

sharklocal.models

Data models for vacuum state and device information.

sharklocal.mqtt_client

Async MQTT client for local vacuum control.

sharklocal.protobuf

Pure-Python protobuf decoder for SharkIQ MQTT messages …

sharklocal.rest_client

Async REST client for local vacuum control.

Functions

def list_mqtt_mappings(search_paths: Optional[List[Union[str, Path]]] = None) ‑> List[str]
Expand source code
def list_mqtt_mappings(
    search_paths: Optional[List[Union[str, Path]]] = None,
) -> List[str]:
    """List the names of every MQTT mapping that can be found.

    The built-in MQTT mapping directory is scanned together with any
    directories given in *search_paths*; entries that are not existing
    directories are ignored.

    Args:
        search_paths: Optional extra directories to scan for ``*.yaml`` files.

    Returns:
        Sorted, de-duplicated mapping names (YAML filename stems).
    """
    extra_dirs = [Path(entry) for entry in (search_paths or [])]
    return sorted(
        {
            yaml_file.stem
            for folder in [_BUILTIN_MQTT_DIR, *extra_dirs]
            if folder.is_dir()
            for yaml_file in folder.glob("*.yaml")
        }
    )

Return names of all available MQTT mappings.

def list_rest_mappings(search_paths: Optional[List[Union[str, Path]]] = None) ‑> List[str]
Expand source code
def list_rest_mappings(
    search_paths: Optional[List[Union[str, Path]]] = None,
) -> List[str]:
    """List the names of every REST mapping that can be found.

    The built-in REST mapping directory is scanned together with any
    directories given in *search_paths*; entries that are not existing
    directories are ignored.

    Args:
        search_paths: Optional extra directories to scan for ``*.yaml`` files.

    Returns:
        Sorted, de-duplicated mapping names (YAML filename stems).
    """
    extra_dirs = [Path(entry) for entry in (search_paths or [])]
    return sorted(
        {
            yaml_file.stem
            for folder in [_BUILTIN_REST_DIR, *extra_dirs]
            if folder.is_dir()
            for yaml_file in folder.glob("*.yaml")
        }
    )

Return names of all available REST mappings.

def load_mqtt_mapping(name: str, search_paths: Optional[List[Union[str, Path]]] = None) ‑> MQTTMappingConfig
Expand source code
def load_mqtt_mapping(
    name: str,
    search_paths: Optional[List[Union[str, Path]]] = None,
) -> MQTTMappingConfig:
    """Load an MQTT mapping by name.

    Searches the built-in mappings directory first, then any additional
    paths provided in *search_paths*. The first ``<name>.yaml`` file found
    wins.

    Args:
        name: Mapping filename stem (e.g. ``"sharkiq_v1"``).
        search_paths: Optional list of additional directories to search.

    Returns:
        The mapping built by ``MQTTMappingConfig.from_dict`` from the parsed
        YAML document.

    Raises:
        MappingNotFoundError: If no matching YAML file is found.
    """
    for directory in [_BUILTIN_MQTT_DIR, *[Path(p) for p in (search_paths or [])]]:
        candidate = directory / f"{name}.yaml"
        if candidate.is_file():
            # Read as UTF-8 explicitly: the platform default encoding
            # (e.g. cp1252 on Windows) can mis-decode a UTF-8 mapping file.
            with open(candidate, encoding="utf-8") as f:
                data = yaml.safe_load(f)
            return MQTTMappingConfig.from_dict(data)
    raise MappingNotFoundError(
        f"MQTT mapping '{name}' not found. "
        f"Available built-in mappings: {list_mqtt_mappings()}"
    )

Load an MQTT mapping by name.

Searches the built-in mappings directory first, then any additional paths provided in search_paths.

Args

name
Mapping filename stem (e.g. "sharkiq_v1").
search_paths
Optional list of additional directories to search.

Raises

MappingNotFoundError
If no matching YAML file is found.
def load_rest_mapping(name: str, search_paths: Optional[List[Union[str, Path]]] = None) ‑> RESTMappingConfig
Expand source code
def load_rest_mapping(
    name: str,
    search_paths: Optional[List[Union[str, Path]]] = None,
) -> RESTMappingConfig:
    """Load a REST mapping by name.

    Searches the built-in mappings directory first, then any additional
    paths provided in *search_paths*. The first ``<name>.yaml`` file found
    wins.

    Args:
        name: Mapping filename stem (e.g. ``"sharkiq_v1"``).
        search_paths: Optional list of additional directories to search.

    Returns:
        The mapping built by ``RESTMappingConfig.from_dict`` from the parsed
        YAML document.

    Raises:
        MappingNotFoundError: If no matching YAML file is found.
    """
    for directory in [_BUILTIN_REST_DIR, *[Path(p) for p in (search_paths or [])]]:
        candidate = directory / f"{name}.yaml"
        if candidate.is_file():
            # Read as UTF-8 explicitly: the platform default encoding
            # (e.g. cp1252 on Windows) can mis-decode a UTF-8 mapping file.
            with open(candidate, encoding="utf-8") as f:
                data = yaml.safe_load(f)
            return RESTMappingConfig.from_dict(data)
    raise MappingNotFoundError(
        f"REST mapping '{name}' not found. "
        f"Available built-in mappings: {list_rest_mappings()}"
    )

Load a REST mapping by name.

Searches the built-in mappings directory first, then any additional paths provided in search_paths.

Args

name
Mapping filename stem (e.g. "sharkiq_v1").
search_paths
Optional list of additional directories to search.

Raises

MappingNotFoundError
If no matching YAML file is found.
def register_decoder(name: str) ‑> Callable
Expand source code
def register_decoder(name: str) -> Callable:
    """Build a decorator that files a status decoder under *name*.

    The decorated function is stored in the module's decoder registry
    (replacing any earlier entry with the same name) and returned unchanged.

    Usage::

        @register_decoder("my_model_v1")
        def _decode_my_model(payload: bytes, modes: Dict[int, str]) -> VacuumStatus:
            ...
    """

    def _register(decoder_fn: Callable) -> Callable:
        _STATUS_DECODERS[name] = decoder_fn
        return decoder_fn

    return _register

Decorator to register a named MQTT status decoder function.

Usage:

@register_decoder("my_model_v1")
def _decode_my_model(payload: bytes, modes: Dict[int, str]) -> VacuumStatus:
    ...

Classes

class ActionNotSupportedError (*args, **kwargs)
Expand source code
class ActionNotSupportedError(SharklocalError):
    """Raised when an action is not defined in the configured transport mapping.

    The transport clients' ``call()`` methods raise this when
    ``supports(action)`` is false, before sending anything to the vacuum.
    """

Raised when an action is not defined in the configured transport mapping.

Ancestors

  • SharklocalError
  • builtins.Exception
  • builtins.BaseException

class CommandError (*args, **kwargs)
Expand source code
class CommandError(SharklocalError):
    """Raised when a command fails to execute or receives an error response.

    Covers HTTP error responses from the REST API, MQTT status requests that
    time out or end without a message, and mapping actions whose type is not
    recognised.
    """

Raised when a command fails to execute or receives an error response.

Ancestors

  • SharklocalError
  • builtins.Exception
  • builtins.BaseException

class ConnectError (*args, **kwargs)
Expand source code
class ConnectError(SharklocalError):
    """Raised when a connection to the vacuum cannot be established.

    The MQTT client also raises it when the optional ``aiomqtt`` package is
    not installed, and wraps otherwise-unhandled errors from an MQTT
    operation in it.
    """

Raised when a connection to the vacuum cannot be established.

Ancestors

  • SharklocalError
  • builtins.Exception
  • builtins.BaseException

class DecoderError (*args, **kwargs)
Expand source code
class DecoderError(SharklocalError):
    """Raised when a response payload cannot be decoded.

    The MQTT client raises it when the mapping names a status decoder that
    has not been registered.
    """

Raised when a response payload cannot be decoded.

Ancestors

  • SharklocalError
  • builtins.Exception
  • builtins.BaseException

class DeviceInfo (firmware: Optional[str] = None,
mac_address: Optional[str] = None,
ip_address: Optional[str] = None,
ssid: Optional[str] = None,
rssi: Optional[int] = None,
raw: Dict[str, Any] = <factory>)
Expand source code
@dataclass
class DeviceInfo:
    """Device identity and connectivity information.

    Every field is optional. The REST response parsers fill in only the
    fields their endpoint provides: ``firmware`` from the robot-id response,
    and ``mac_address`` / ``ip_address`` / ``ssid`` / ``rssi`` from the
    Wi-Fi status response.
    """

    # Top-level ``firmware`` value of the robot-id response (None if absent/empty).
    firmware: Optional[str] = None
    # ``mac_address`` value of the Wi-Fi status response.
    mac_address: Optional[str] = None
    # ``ip_address`` value of the Wi-Fi status response.
    ip_address: Optional[str] = None
    # ``ssid`` value of the Wi-Fi status response.
    ssid: Optional[str] = None
    # ``rssi`` value of the Wi-Fi status response.
    rssi: Optional[int] = None
    # Response dict this object was built from; a fresh empty dict by default.
    raw: Dict[str, Any] = field(default_factory=dict)

Device identity and connectivity information.

Instance variables

var firmware : str | None

Firmware version reported by the vacuum, or None if it was not provided.

var ip_address : str | None

IP address from the vacuum's Wi-Fi status response, or None if it was not provided.

var mac_address : str | None

MAC address from the vacuum's Wi-Fi status response, or None if it was not provided.

var raw : Dict[str, Any]

Response data this object was built from; defaults to an empty dict.

var rssi : int | None

RSSI value from the vacuum's Wi-Fi status response, or None if it was not provided.

var ssid : str | None

SSID from the vacuum's Wi-Fi status response, or None if it was not provided.

class MQTTVacuumClient (host: str, mapping: MQTTMappingConfig)
Expand source code
class MQTTVacuumClient:
    """Async MQTT client for local vacuum control.

    Each operation opens its own ``aiomqtt.Client`` connection to *host* on
    the mapping's port; the instance itself only stores the host and mapping.

    Requires ``aiomqtt`` (``pip install aiomqtt``).

    Args:
        host: Hostname or IP address handed to ``aiomqtt.Client``.
        mapping: MQTT mapping supplying the port, topics, payload encoding,
            status decoder name, mode table and action definitions.
    """

    def __init__(self, host: str, mapping: MQTTMappingConfig) -> None:
        self.host = host
        self.mapping = mapping

    def supports(self, action: str) -> bool:
        """Return ``True`` if the mapping defines *action*."""
        return action in self.mapping.actions

    def _decode_incoming(self, raw_payload: bytes) -> bytes:
        """Decode a received MQTT payload per the mapping's encoding setting.

        Payloads are base64-decoded when the mapping's ``encoding`` is
        ``"base64"`` and returned untouched otherwise.

        Raises:
            DecoderError: If the payload is declared base64 but is not valid
                base64 data.
        """
        if self.mapping.encoding == "base64":
            try:
                return base64.b64decode(raw_payload)
            except ValueError as exc:
                # binascii.Error is a ValueError subclass. Surfacing it as
                # DecoderError lets monitor() skip the bad message instead of
                # aborting with a misleading "lost connection" error.
                raise DecoderError(f"Invalid base64 MQTT payload: {exc}") from exc
        return raw_payload

    def _decode_status(self, raw_payload: bytes) -> VacuumStatus:
        """Decode a raw MQTT payload into a normalized ``VacuumStatus``.

        Looks up the decoder registered under the mapping's
        ``status_decoder`` name and calls it with the transport-decoded
        payload and the mapping's mode table.

        Raises:
            DecoderError: If no decoder is registered under that name, or the
                payload cannot be transport-decoded.
        """
        decoder = _STATUS_DECODERS.get(self.mapping.status_decoder)
        if decoder is None:
            raise DecoderError(
                f"No decoder registered for '{self.mapping.status_decoder}'. "
                f"Available decoders: {list(_STATUS_DECODERS)}"
            )
        payload = self._decode_incoming(raw_payload)
        return decoder(payload, self.mapping.modes)

    async def call(self, action: str) -> Any:
        """Execute a named action from the mapping.

        Args:
            action: Action name as defined in the mapping (e.g. ``"start_cleaning"``).

        Returns:
            ``True`` for ``command`` actions, or a ``VacuumStatus`` for
            ``status_request`` actions.

        Raises:
            ActionNotSupportedError: If *action* is not in the mapping.
            ConnectError: If ``aiomqtt`` is not installed, or any other
                unexpected error occurs while talking to the broker.
            CommandError: If a status response is not received within the
                timeout, or the action's type is not recognised.
            DecoderError: If a status response cannot be decoded.
        """
        if not self.supports(action):
            raise ActionNotSupportedError(
                f"MQTT mapping '{self.mapping.id}' does not support '{action}'"
            )

        spec = self.mapping.actions[action]

        # aiomqtt is an optional dependency, so it is imported lazily.
        try:
            import aiomqtt
        except ImportError as exc:
            raise ConnectError(
                "aiomqtt is required for MQTT support. "
                "Install with: pip install aiomqtt"
            ) from exc

        try:
            if spec.type == "command":
                async with aiomqtt.Client(self.host, port=self.mapping.port) as client:
                    await client.publish(self.mapping.command_topic, payload=spec.payload)
                return True

            if spec.type == "status_request":
                return await self._request_status(spec.payload, spec.timeout)

        except (ActionNotSupportedError, CommandError, ConnectError, DecoderError):
            # Library errors already carry the right meaning; pass them through.
            raise
        except Exception as exc:
            raise ConnectError(
                f"MQTT error connecting to {self.host}:{self.mapping.port}: {exc}"
            ) from exc

        raise CommandError(f"Unrecognised MQTT action type '{spec.type}'")

    async def _request_status(self, command_payload: str, timeout: float) -> VacuumStatus:
        """Publish a status-request command and return the decoded first response.

        Subscribes to the status topic before publishing so the reply cannot
        be missed, then decodes the first message that arrives.

        Raises:
            ConnectError: If ``aiomqtt`` is not installed.
            CommandError: If no message arrives within *timeout* seconds, or
                the message stream ends without yielding one.
            DecoderError: If the received payload cannot be decoded.
        """
        try:
            import aiomqtt
        except ImportError as exc:
            raise ConnectError("aiomqtt is required for MQTT support") from exc

        async with aiomqtt.Client(self.host, port=self.mapping.port) as client:
            await client.subscribe(self.mapping.status_topic)
            await client.publish(self.mapping.command_topic, payload=command_payload)
            try:
                async with asyncio.timeout(timeout):
                    async for message in client.messages:
                        return self._decode_status(bytes(message.payload))
            except TimeoutError as exc:
                raise CommandError(
                    f"Timed out after {timeout}s waiting for MQTT status response"
                ) from exc

        raise CommandError("No status message received from vacuum")

    async def monitor(
        self,
        callback: Callable[[VacuumStatus], None],
        *,
        stop_event: Optional[asyncio.Event] = None,
    ) -> None:
        """Subscribe to the vacuum's status topic and invoke *callback* per update.

        Runs indefinitely until *stop_event* is set or the task is cancelled.
        Both synchronous and ``async`` callbacks are supported: whatever the
        callback returns is awaited if it is awaitable. Messages that cannot
        be decoded are skipped.

        *stop_event* is only checked when a message arrives, so after it is
        set the method returns on the next incoming message (which is not
        delivered to *callback*).

        Args:
            callback: Called with each decoded ``VacuumStatus``.
            stop_event: Optional ``asyncio.Event``; when set, monitoring stops
                cleanly after the current message.

        Raises:
            ConnectError: If ``aiomqtt`` is not installed, or any unexpected
                error occurs while monitoring. This includes non-library
                exceptions raised by *callback*.
        """
        try:
            import aiomqtt
        except ImportError as exc:
            raise ConnectError("aiomqtt is required for MQTT support") from exc
        import inspect

        try:
            async with aiomqtt.Client(self.host, port=self.mapping.port) as client:
                await client.subscribe(self.mapping.status_topic)
                async for message in client.messages:
                    if stop_event and stop_event.is_set():
                        return
                    try:
                        status = self._decode_status(bytes(message.payload))
                    except (DecoderError, CommandError):
                        continue  # Skip malformed messages silently
                    # Await whatever awaitable the callback hands back. This
                    # also covers lambdas and callable objects wrapping a
                    # coroutine, which iscoroutinefunction() does not detect.
                    result = callback(status)
                    if inspect.isawaitable(result):
                        await result

        except (ActionNotSupportedError, CommandError, ConnectError, DecoderError):
            raise
        except Exception as exc:
            raise ConnectError(
                f"MQTT monitor lost connection to {self.host}: {exc}"
            ) from exc

Async MQTT client for local vacuum control.

Requires aiomqtt (pip install aiomqtt).

Methods

async def call(self, action: str) ‑> Any
Expand source code
async def call(self, action: str) -> Any:
    """Run one of the mapping's named actions over MQTT.

    Args:
        action: Action name as defined in the mapping (e.g. ``"start_cleaning"``).

    Returns:
        ``True`` once a ``command`` action has been published, or the decoded
        ``VacuumStatus`` for a ``status_request`` action.

    Raises:
        ActionNotSupportedError: If *action* is not in the mapping.
        ConnectError: If ``aiomqtt`` is missing or the MQTT broker cannot be
            reached.
        CommandError: If a status response is not received within the
            timeout, or the action's type is not recognised.
    """
    if not self.supports(action):
        raise ActionNotSupportedError(
            f"MQTT mapping '{self.mapping.id}' does not support '{action}'"
        )

    action_spec = self.mapping.actions[action]

    # Optional dependency: resolve it only when an MQTT call is made.
    try:
        import aiomqtt
    except ImportError as missing:
        raise ConnectError(
            "aiomqtt is required for MQTT support. "
            "Install with: pip install aiomqtt"
        ) from missing

    library_errors = (ActionNotSupportedError, CommandError, ConnectError, DecoderError)
    try:
        if action_spec.type == "command":
            async with aiomqtt.Client(self.host, port=self.mapping.port) as mqtt:
                await mqtt.publish(
                    self.mapping.command_topic, payload=action_spec.payload
                )
            return True

        if action_spec.type == "status_request":
            return await self._request_status(action_spec.payload, action_spec.timeout)
    except Exception as failure:
        # Errors that are already library exceptions pass through untouched;
        # everything else is reported as a connection problem.
        if isinstance(failure, library_errors):
            raise
        raise ConnectError(
            f"MQTT error connecting to {self.host}:{self.mapping.port}: {failure}"
        ) from failure

    raise CommandError(f"Unrecognised MQTT action type '{action_spec.type}'")

Execute a named action from the mapping.

Args

action
Action name as defined in the mapping (e.g. "start_cleaning").

Returns

True for command actions, or a VacuumStatus for status_request actions.

Raises

ActionNotSupportedError
If action is not in the mapping.
ConnectError
If the MQTT broker cannot be reached.
CommandError
If a status response is not received within the timeout.
async def monitor(self,
callback: Callable[[VacuumStatus], None],
*,
stop_event: Optional[asyncio.Event] = None) ‑> None
Expand source code
async def monitor(
    self,
    callback: Callable[[VacuumStatus], None],
    *,
    stop_event: Optional[asyncio.Event] = None,
) -> None:
    """Subscribe to the vacuum's status topic and invoke *callback* per update.

    Runs indefinitely until *stop_event* is set or the task is cancelled.
    Both synchronous and ``async`` callbacks are supported: whatever the
    callback returns is awaited if it is awaitable. Messages that cannot be
    decoded are skipped.

    *stop_event* is only checked when a message arrives, so after it is set
    the method returns on the next incoming message (which is not delivered
    to *callback*).

    Args:
        callback: Called with each decoded ``VacuumStatus``.
        stop_event: Optional ``asyncio.Event``; when set, monitoring stops
            cleanly after the current message.

    Raises:
        ConnectError: If ``aiomqtt`` is not installed, or any unexpected
            error occurs while monitoring. This includes non-library
            exceptions raised by *callback*.
    """
    try:
        import aiomqtt
    except ImportError as exc:
        raise ConnectError("aiomqtt is required for MQTT support") from exc
    import inspect

    try:
        async with aiomqtt.Client(self.host, port=self.mapping.port) as client:
            await client.subscribe(self.mapping.status_topic)
            async for message in client.messages:
                if stop_event and stop_event.is_set():
                    return
                try:
                    status = self._decode_status(bytes(message.payload))
                except (DecoderError, CommandError):
                    continue  # Skip malformed messages silently
                # Await whatever awaitable the callback hands back. This also
                # covers lambdas and callable objects wrapping a coroutine,
                # which iscoroutinefunction() does not detect.
                result = callback(status)
                if inspect.isawaitable(result):
                    await result

    except (ActionNotSupportedError, CommandError, ConnectError, DecoderError):
        raise
    except Exception as exc:
        raise ConnectError(
            f"MQTT monitor lost connection to {self.host}: {exc}"
        ) from exc

Subscribe to the vacuum's status topic and invoke callback per update.

Runs indefinitely until stop_event is set or the task is cancelled. Both synchronous and async callbacks are supported.

Args

callback
Called with each decoded VacuumStatus.
stop_event
Optional asyncio.Event; when set, monitoring stops cleanly after the current message.
def supports(self, action: str) ‑> bool
Expand source code
def supports(self, action: str) -> bool:
    """Tell whether *action* is one of the actions defined by the mapping."""
    defined_actions = self.mapping.actions
    return action in defined_actions

Return True if the mapping defines action.

class MappingNotFoundError (*args, **kwargs)
Expand source code
class MappingNotFoundError(SharklocalError):
    """Raised when a named mapping file cannot be located.

    ``load_mqtt_mapping`` and ``load_rest_mapping`` raise it when no
    ``<name>.yaml`` file exists in the built-in directory or in any of the
    supplied search paths.
    """

Raised when a named mapping file cannot be located.

Ancestors

  • SharklocalError
  • builtins.Exception
  • builtins.BaseException

class ProbeResult (rest_mapping: Optional[str] = None, mqtt_mapping: Optional[str] = None)
Expand source code
@dataclass
class ProbeResult:
    """Outcome of a ``VacuumClient.probe`` call.

    Holds the ``id`` of the mapping that worked for each transport; a
    transport with no working mapping is left as ``None``.
    """

    rest_mapping: Optional[str] = None
    """``id`` of the REST mapping that responded successfully, or ``None``."""

    mqtt_mapping: Optional[str] = None
    """``id`` of the MQTT mapping that responded successfully, or ``None``."""

    @property
    def has_rest(self) -> bool:
        """``True`` when a working REST mapping was recorded."""
        rest_id = self.rest_mapping
        return rest_id is not None

    @property
    def has_mqtt(self) -> bool:
        """``True`` when a working MQTT mapping was recorded."""
        mqtt_id = self.mqtt_mapping
        return mqtt_id is not None

    @property
    def is_connected(self) -> bool:
        """``True`` when either transport has a working mapping."""
        if self.has_rest:
            return True
        return self.has_mqtt

Result of a VacuumClient.probe() call.

Instance variables

prop has_mqtt : bool
Expand source code
@property
def has_mqtt(self) -> bool:
    """``True`` when a working MQTT mapping was recorded."""
    mqtt_id = self.mqtt_mapping
    return mqtt_id is not None

True if a working MQTT mapping was found.

prop has_rest : bool
Expand source code
@property
def has_rest(self) -> bool:
    """``True`` when a working REST mapping was recorded."""
    rest_id = self.rest_mapping
    return rest_id is not None

True if a working REST mapping was found.

prop is_connected : bool
Expand source code
@property
def is_connected(self) -> bool:
    """``True`` when either transport has a working mapping."""
    rest_ok = self.has_rest
    return rest_ok if rest_ok else self.has_mqtt

True if at least one transport responded successfully.

var mqtt_mapping : str | None

id of the MQTT mapping that responded successfully, or None.

var rest_mapping : str | None

id of the REST mapping that responded successfully, or None.

class RESTVacuumClient (host: str, mapping: RESTMappingConfig)
Expand source code
class RESTVacuumClient:
    """Async HTTP/HTTPS client for local vacuum control via the REST API.

    Transport (``http`` vs ``https``) and SSL verification are driven by the
    mapping configuration, so different models can use different settings
    without code changes.

    The underlying ``aiohttp.ClientSession`` is created lazily on the first
    request and reused until :meth:`close` is called.

    Args:
        host: Hostname or IP address of the vacuum.
        mapping: REST mapping supplying transport, port, SSL settings, the
            mode table and action definitions.
    """

    def __init__(self, host: str, mapping: RESTMappingConfig) -> None:
        self.host = host
        self.mapping = mapping
        self._session: Optional[aiohttp.ClientSession] = None

    @property
    def base_url(self) -> str:
        """``transport://host:port`` prefix that every action path is appended to."""
        return f"{self.mapping.transport}://{self.host}:{self.mapping.port}"

    def supports(self, action: str) -> bool:
        """Return ``True`` if the mapping defines *action*."""
        return action in self.mapping.actions

    def _make_connector(self) -> aiohttp.TCPConnector:
        """Build a ``TCPConnector`` with SSL settings from the mapping.

        Certificate and hostname checks are disabled only for a non-``http``
        transport whose mapping sets ``verify_ssl`` to false; every other
        combination uses aiohttp's default connector.
        """
        if self.mapping.transport != "http" and not self.mapping.verify_ssl:
            ctx = ssl.create_default_context()
            # check_hostname must be cleared before verify_mode can be CERT_NONE.
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            return aiohttp.TCPConnector(ssl=ctx)
        return aiohttp.TCPConnector()

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating a new one if none is open."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(connector=self._make_connector())
        return self._session

    async def close(self) -> None:
        """Close the underlying HTTP session."""
        if self._session and not self._session.closed:
            await self._session.close()
            self._session = None

    async def call(self, action: str) -> Any:
        """Execute a named action and return a normalized result.

        Args:
            action: Action name as defined in the mapping (e.g. ``"get_status"``).

        Returns:
            A normalized model object for query actions (those with a
            ``response_map``). For command actions, the parsed JSON body if
            the vacuum sent one, otherwise ``True``.

        Raises:
            ActionNotSupportedError: If *action* is not in the mapping.
            ConnectError: If the vacuum host cannot be reached, the
                connection drops, or the request times out.
            CommandError: If the vacuum returns an HTTP error response, or a
                query response is not a JSON object.
        """
        if not self.supports(action):
            raise ActionNotSupportedError(
                f"REST mapping '{self.mapping.id}' does not support '{action}'"
            )

        spec = self.mapping.actions[action]
        url = f"{self.base_url}{spec.path}"
        session = await self._get_session()

        try:
            async with session.request(
                method=spec.method,
                url=url,
                json=spec.body,
                headers=spec.headers,
            ) as resp:
                resp.raise_for_status()

                if spec.response_map:
                    data = await resp.json(content_type=None)
                    return self._parse_response(spec.response_map, data)

                # Command endpoints may return minimal or empty bodies. The
                # request already succeeded, so an unparsable body is
                # deliberately treated as plain success.
                try:
                    body = await resp.json(content_type=None)
                except Exception:
                    return True
                # aiohttp yields None for an empty body; report success
                # rather than a falsy None.
                return True if body is None else body

        except (aiohttp.ClientConnectionError, TimeoutError) as exc:
            # ClientConnectionError is the base of ClientConnectorError and
            # also covers dropped connections; aiohttp signals request
            # timeouts with TimeoutError.
            raise ConnectError(
                f"Cannot connect to vacuum at {self.host}:{self.mapping.port}"
            ) from exc
        except aiohttp.ClientResponseError as exc:
            raise CommandError(
                f"REST request to {spec.path} failed with HTTP {exc.status}"
            ) from exc

    # ------------------------------------------------------------------
    # Response parsers
    # ------------------------------------------------------------------

    def _parse_response(self, response_map: str, data: Dict[str, Any]) -> Any:
        """Dispatch to the appropriate parser for *response_map*.

        Unknown *response_map* names return *data* unchanged.

        Raises:
            CommandError: If a parser exists for *response_map* but *data* is
                not a JSON object (for example an empty response body).
        """
        parsers = {
            "status": self._parse_status,
            "events": self._parse_events,
            "robot_id": self._parse_robot_id,
            "wifi_status": self._parse_wifi_status,
        }
        parser = parsers.get(response_map)
        if parser is None:
            return data
        if not isinstance(data, dict):
            # Every parser reads keys with .get(); fail with a library error
            # instead of an AttributeError on None or a list.
            raise CommandError(
                f"Expected a JSON object for '{response_map}' response, "
                f"got {type(data).__name__}"
            )
        return parser(data)

    def _parse_status(self, data: Dict[str, Any]) -> VacuumStatus:
        """Convert a status response into a ``VacuumStatus``."""
        raw_mode = str(data.get("mode", "")).lower()
        charging_raw = str(data.get("charging", "")).lower()

        # The API returns "connected" when charging and "unconnected" when not.
        charging = charging_raw == "connected"

        # "ready" is context-dependent: combined with "unconnected" it means the
        # vacuum is stopped and away from the dock (e.g. paused mid-run), not docked.
        if raw_mode == "ready" and not charging:
            mode = VacuumMode.IDLE
        else:
            mode_str = self.mapping.mode_map.get(raw_mode, "unknown")
            try:
                mode = VacuumMode(mode_str)
            except ValueError:
                # The mapping named a mode that VacuumMode does not define.
                mode = VacuumMode.UNKNOWN

        return VacuumStatus(
            mode=mode,
            battery_level=data.get("battery_level"),
            charging=charging,
            raw=data,
        )

    def _parse_events(self, data: Dict[str, Any]) -> List[VacuumEvent]:
        """Convert the ``robot_events`` array into ``VacuumEvent`` objects."""
        # "or []" also covers an explicit null, which .get's default would not.
        events = data.get("robot_events") or []
        return [
            VacuumEvent(
                id=evt.get("id", 0),
                type=evt.get("type", ""),
                type_id=evt.get("type_id", 0),
                timestamp=evt.get("timestamp", {}),
                current_status=evt.get("current_status", ""),
                source_type=evt.get("source_type", ""),
                raw=evt,
            )
            for evt in events
        ]

    def _parse_robot_id(self, data: Dict[str, Any]) -> DeviceInfo:
        """Convert a robot-id response into a ``DeviceInfo`` (firmware only)."""
        # Per the API docs, use the top-level 'firmware' value for diagnostics.
        # Per-device entries in the 'devices' array can be ignored.
        firmware = data.get("firmware") or None
        return DeviceInfo(firmware=firmware, raw=data)

    def _parse_wifi_status(self, data: Dict[str, Any]) -> DeviceInfo:
        """Convert a Wi-Fi status response into a ``DeviceInfo``."""
        return DeviceInfo(
            mac_address=data.get("mac_address"),
            ip_address=data.get("ip_address"),
            ssid=data.get("ssid"),
            rssi=data.get("rssi"),
            raw=data,
        )

Async HTTP/HTTPS client for local vacuum control via the REST API.

Transport (http vs https) and SSL verification are driven by the mapping configuration, so different models can use different settings without code changes.

Instance variables

prop base_url : str
Expand source code
@property
def base_url(self) -> str:
    """``transport://host:port`` prefix that every action path is appended to."""
    cfg = self.mapping
    return "{}://{}:{}".format(cfg.transport, self.host, cfg.port)

Methods

async def call(self, action: str) ‑> Any
Expand source code
async def call(self, action: str) -> Any:
    """Execute a named action and return a normalized result.

    Args:
        action: Action name as defined in the mapping (e.g. ``"get_status"``).

    Returns:
        A normalized model object for query actions (those with a
        ``response_map``). For command actions, the parsed JSON body if the
        vacuum sent one, otherwise ``True``.

    Raises:
        ActionNotSupportedError: If *action* is not in the mapping.
        ConnectError: If the vacuum host cannot be reached, the connection
            drops, or the request times out.
        CommandError: If the vacuum returns an HTTP error response.
    """
    if not self.supports(action):
        raise ActionNotSupportedError(
            f"REST mapping '{self.mapping.id}' does not support '{action}'"
        )

    spec = self.mapping.actions[action]
    url = f"{self.base_url}{spec.path}"
    session = await self._get_session()

    try:
        async with session.request(
            method=spec.method,
            url=url,
            json=spec.body,
            headers=spec.headers,
        ) as resp:
            resp.raise_for_status()

            if spec.response_map:
                data = await resp.json(content_type=None)
                return self._parse_response(spec.response_map, data)

            # Command endpoints may return minimal or empty bodies. The
            # request already succeeded, so an unparsable body is
            # deliberately treated as plain success.
            try:
                body = await resp.json(content_type=None)
            except Exception:
                return True
            # aiohttp yields None for an empty body; report success rather
            # than a falsy None.
            return True if body is None else body

    except (aiohttp.ClientConnectionError, TimeoutError) as exc:
        # ClientConnectionError is the base of ClientConnectorError and also
        # covers dropped connections; aiohttp signals request timeouts with
        # TimeoutError.
        raise ConnectError(
            f"Cannot connect to vacuum at {self.host}:{self.mapping.port}"
        ) from exc
    except aiohttp.ClientResponseError as exc:
        raise CommandError(
            f"REST request to {spec.path} failed with HTTP {exc.status}"
        ) from exc

Execute a named action and return a normalized result.

Args

action
Action name as defined in the mapping (e.g. "get_status").

Returns

A normalized model object for query actions, or True for fire-and-forget command actions.

Raises

ActionNotSupportedError
If action is not in the mapping.
ConnectError
If the vacuum host cannot be reached.
CommandError
If the vacuum returns an HTTP error response.
async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Shut down the HTTP session if one is open, then forget it.

    Does nothing when no session exists or it is already closed.
    """
    session = self._session
    if not session or session.closed:
        return
    await session.close()
    self._session = None

Close the underlying HTTP session.

def supports(self, action: str) ‑> bool
Expand source code
def supports(self, action: str) -> bool:
    """Tell whether *action* is one of the actions defined by the mapping."""
    defined_actions = self.mapping.actions
    return action in defined_actions

Return True if the mapping defines action.

class SharklocalError (*args, **kwargs)
Expand source code
class SharklocalError(Exception):
    """Base exception for all sharklocal errors.

    The library's specific exceptions (``ActionNotSupportedError``,
    ``CommandError``, ``ConnectError``, ``DecoderError`` and
    ``MappingNotFoundError``) all derive from it, so catching this class
    handles any of them.
    """

Base exception for all sharklocal errors.

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

  • ActionNotSupportedError
  • CommandError
  • ConnectError
  • DecoderError
  • MappingNotFoundError

class VacuumClient (host: str,
*,
rest_mappings: Optional[Union[str, List[str]]] = None,
mqtt_mappings: Optional[Union[str, List[str]]] = None,
mapping_search_paths: Optional[List[Union[str, Path]]] = None)
Expand source code
class VacuumClient:
    """Unified vacuum client with automatic transport selection.

    REST is preferred when both transports support an action. MQTT is used as a
    fallback only when REST is unreachable (``ConnectError``). If a transport is
    not configured, or does not define an action, it is skipped automatically.

    Pass a list of mapping names to enable auto-detection: call :meth:`probe`
    during setup to test each mapping against the device and pin the
    highest-priority working one for all subsequent calls. With a single
    mapping per transport the client works immediately without probing.

    Example — single mapping (no probe required)::

        async with VacuumClient(
            "192.168.1.100",
            rest_mappings="sharkiq_v1",
            mqtt_mappings="sharkiq_v1",
        ) as vacuum:
            status = await vacuum.get_status()
            await vacuum.start_cleaning()

    Example — multiple candidates with probe::

        async with VacuumClient(
            "192.168.1.100",
            rest_mappings=["sharkiq_v1", "other_model_v1"],
            mqtt_mappings=["sharkiq_v1"],
        ) as vacuum:
            result = await vacuum.probe()
            print(result.rest_mapping)   # e.g. "sharkiq_v1"
            status = await vacuum.get_status()

    Args:
        host: IP address or hostname of the vacuum.
        rest_mappings: One REST mapping name, or a list of candidates to probe.
        mqtt_mappings: One MQTT mapping name, or a list of candidates to probe.
        mapping_search_paths: Additional directories to search for mapping files
            before falling back to built-in mappings.
    """

    def __init__(
        self,
        host: str,
        *,
        rest_mappings: Optional[Union[str, List[str]]] = None,
        mqtt_mappings: Optional[Union[str, List[str]]] = None,
        mapping_search_paths: Optional[List[Union[str, Path]]] = None,
    ) -> None:
        self.host = host
        paths = mapping_search_paths or []

        # Active (pinned) transport clients. Set immediately when a single
        # mapping is provided; set by probe() when multiple are configured.
        self._rest: Optional[RESTVacuumClient] = None
        self._mqtt: Optional[MQTTVacuumClient] = None
        self._status_callback: Optional[Callable[[VacuumStatus], None]] = None
        self._monitor_stop: Optional[asyncio.Event] = None
        self._monitor_task: Optional[asyncio.Task] = None

        # Primary transport in use. "REST" when REST is pinned and reachable,
        # "MQTT" when only MQTT is available, "NONE" until probe() confirms a
        # working transport (or a single mapping is auto-pinned).
        self.via: str = "NONE"

        def _to_list(val: Optional[Union[str, List[str]]]) -> List[str]:
            # Normalize "one name", "list of names" and "nothing" to a list.
            if val is None:
                return []
            return [val] if isinstance(val, str) else list(val)

        # One client object is built per configured mapping name, up front.
        self._rest_candidates: List[RESTVacuumClient] = [
            RESTVacuumClient(host, load_rest_mapping(name, paths))
            for name in _to_list(rest_mappings)
        ]
        self._mqtt_candidates: List[MQTTVacuumClient] = [
            MQTTVacuumClient(host, load_mqtt_mapping(name, paths))
            for name in _to_list(mqtt_mappings)
        ]

        # Auto-pin when only one candidate is loaded — no probe() required.
        if len(self._rest_candidates) == 1:
            self._rest = self._rest_candidates[0]
            self.via = "REST"
        if len(self._mqtt_candidates) == 1:
            self._mqtt = self._mqtt_candidates[0]
            if self.via == "NONE":
                self.via = "MQTT"

    # ------------------------------------------------------------------
    # Context manager
    # ------------------------------------------------------------------

    async def __aenter__(self) -> VacuumClient:
        return self

    async def __aexit__(self, *_: Any) -> None:
        await self.close()

    async def close(self) -> None:
        """Stop monitoring and close every REST candidate client."""
        await self.stop_monitoring()
        # NOTE(review): only REST clients are closed here; confirm that MQTT
        # clients hold no connection that outlives the monitor task.
        for client in self._rest_candidates:
            await client.close()

    # ------------------------------------------------------------------
    # High-level action API
    # ------------------------------------------------------------------

    async def start_cleaning(self) -> bool:
        """Start a full cleaning run."""
        return await self._execute("start_cleaning")

    async def stop(self) -> bool:
        """Pause the current cleaning task."""
        return await self._execute("stop")

    async def go_home(self) -> bool:
        """Send the vacuum back to its dock."""
        return await self._execute("go_home")

    async def explore(self) -> bool:
        """Begin a mapping/exploration run."""
        return await self._execute("explore")

    async def get_status(self) -> VacuumStatus:
        """Return the current vacuum status."""
        return await self._execute("get_status")

    async def get_events(self) -> List[VacuumEvent]:
        """Return the event log since last startup."""
        return await self._execute("get_events")

    async def get_device_info(self) -> DeviceInfo:
        """Return firmware and device identity information.

        Dispatches the mapping action named ``get_robot_id``.
        """
        return await self._execute("get_robot_id")

    async def get_wifi_status(self) -> DeviceInfo:
        """Return Wi-Fi connection details including MAC address."""
        return await self._execute("get_wifi_status")

    # ------------------------------------------------------------------
    # Mapping probe
    # ------------------------------------------------------------------

    @staticmethod
    async def _probe_best(candidates: List[Any]) -> Optional[Any]:
        """Return the highest-priority candidate that answers ``get_status``.

        Every candidate is probed in the order given. Candidates raising
        ``SharklocalError`` are skipped. When priorities are equal the
        candidate listed first is kept. Returns ``None`` if none respond.
        """
        best: Optional[Any] = None
        for client in candidates:
            try:
                await client.call("get_status")
            except SharklocalError:
                continue
            # Strict ">" keeps the earlier candidate on a priority tie.
            if best is None or client.mapping.priority > best.mapping.priority:
                best = client
        return best

    async def probe(self) -> ProbeResult:
        """Test all configured mappings and pin the best working one per transport.

        Each REST mapping is tested, in the order supplied, by calling
        ``get_status``; mappings that raise ``SharklocalError`` are skipped.
        Among the mappings that respond, the one with the highest
        ``priority`` is pinned as the active REST transport (on a tie the one
        supplied first wins). The same process is then repeated for MQTT.

        A transport's existing pin is replaced only when at least one of its
        mappings responds; otherwise the earlier pin is left in place.
        ``via`` is set from this probe's results alone: ``"REST"`` if a REST
        mapping responded, else ``"MQTT"`` if an MQTT mapping responded, else
        ``"NONE"``.

        Call this during integration setup when multiple mapping candidates are
        configured. With a single mapping per transport the client works without
        calling probe.

        Returns:
            :class:`ProbeResult` with the ``id`` of each selected mapping, or
            ``None`` for a transport where no mapping succeeded.
        """
        rest_id: Optional[str] = None
        mqtt_id: Optional[str] = None

        best_rest = await self._probe_best(self._rest_candidates)
        if best_rest is not None:
            self._rest = best_rest
            rest_id = best_rest.mapping.id

        best_mqtt = await self._probe_best(self._mqtt_candidates)
        if best_mqtt is not None:
            self._mqtt = best_mqtt
            mqtt_id = best_mqtt.mapping.id

        if rest_id is not None:
            self.via = "REST"
        elif mqtt_id is not None:
            self.via = "MQTT"
        else:
            self.via = "NONE"

        return ProbeResult(rest_mapping=rest_id, mqtt_mapping=mqtt_id)

    # ------------------------------------------------------------------
    # Real-time monitoring (MQTT only)
    # ------------------------------------------------------------------

    def on_status_update(self, callback: Callable[[VacuumStatus], None]) -> None:
        """Register a callback to receive real-time status updates via MQTT.

        The callback receives a ``VacuumStatus`` on every status message.
        Both synchronous and ``async`` callables are accepted.

        Must be called before :meth:`start_monitoring`.

        Raises:
            SharklocalError: If no MQTT mapping is configured.
        """
        if not self._mqtt_candidates:
            raise SharklocalError(
                "An MQTT mapping is required for real-time monitoring"
            )
        self._status_callback = callback

    async def start_monitoring(self) -> None:
        """Begin monitoring the vacuum status in the background via MQTT.

        Call :meth:`on_status_update` first to register a callback. When
        multiple MQTT mappings are configured, call :meth:`probe` first to
        determine the active mapping. Safe to call repeatedly; a second call
        while already running is a no-op.

        Raises:
            SharklocalError: If no MQTT mapping is configured, no MQTT client
                is pinned yet, or no callback has been registered.
        """
        if not self._mqtt_candidates:
            raise SharklocalError("An MQTT mapping is required for monitoring")
        if self._mqtt is None:
            raise SharklocalError(
                "Call probe() first to determine the active MQTT mapping "
                "when multiple MQTT mappings are configured"
            )
        if self._status_callback is None:
            raise SharklocalError(
                "Register a callback with on_status_update() before starting monitoring"
            )
        if self._monitor_task and not self._monitor_task.done():
            return  # Already running

        self._monitor_stop = asyncio.Event()
        self._monitor_task = asyncio.ensure_future(
            self._mqtt.monitor(self._status_callback, stop_event=self._monitor_stop)
        )

    async def stop_monitoring(self) -> None:
        """Stop background status monitoring if it is running."""
        if self._monitor_stop:
            self._monitor_stop.set()
        if self._monitor_task and not self._monitor_task.done():
            self._monitor_task.cancel()
            try:
                await self._monitor_task
            except (asyncio.CancelledError, Exception):
                # Best-effort shutdown: whatever the monitor task raises
                # while being cancelled is deliberately discarded.
                pass
        self._monitor_task = None
        self._monitor_stop = None

    # ------------------------------------------------------------------
    # Transport inspection
    # ------------------------------------------------------------------

    def _rest_options(self) -> List[RESTVacuumClient]:
        """Return the pinned REST client alone, or all REST candidates if none is pinned."""
        return [self._rest] if self._rest is not None else self._rest_candidates

    def _mqtt_options(self) -> List[MQTTVacuumClient]:
        """Return the pinned MQTT client alone, or all MQTT candidates if none is pinned."""
        return [self._mqtt] if self._mqtt is not None else self._mqtt_candidates

    def supported_actions(self) -> List[str]:
        """Return all action names supported by any configured transport.

        Only the pinned client of a transport is consulted once one is
        pinned; otherwise every candidate of that transport contributes.
        """
        actions: set[str] = set()
        for client in (*self._rest_options(), *self._mqtt_options()):
            actions.update(client.mapping.actions)
        return sorted(actions)

    def transports_for(self, action: str) -> List[str]:
        """Return which transports support *action*, in evaluation priority order.

        REST is listed before MQTT. When multiple candidates exist for a
        transport, any candidate supporting the action counts.
        """
        result = []
        rest_opts = self._rest_options()
        mqtt_opts = self._mqtt_options()
        if any(c.supports(action) for c in rest_opts):
            result.append("rest")
        if any(c.supports(action) for c in mqtt_opts):
            result.append("mqtt")
        return result

    @property
    def active_rest_mapping(self) -> Optional[str]:
        """The ``id`` of the currently pinned REST mapping, or ``None``."""
        return self._rest.mapping.id if self._rest is not None else None

    @property
    def active_mqtt_mapping(self) -> Optional[str]:
        """The ``id`` of the currently pinned MQTT mapping, or ``None``."""
        return self._mqtt.mapping.id if self._mqtt is not None else None

    # ------------------------------------------------------------------
    # Internal transport evaluation
    # ------------------------------------------------------------------

    async def _execute(self, action: str) -> Any:
        """Execute *action* using the best available transport.

        When a transport has been pinned (single mapping configured, or after
        :meth:`probe` ran), only that client is tried for its transport.
        When no client is pinned (multiple candidates, probe not yet called),
        all candidates for that transport are tried in the supplied order.

        Evaluation order:

        1. REST candidates — tried left to right; ``ConnectError`` moves to the
           next candidate. Any other exception propagates immediately.
        2. MQTT candidates — tried when all REST candidates raise ``ConnectError``
           or no REST candidate supports the action.

        The ``ConnectError`` from the last exhausted candidate is re-raised when
        no transport can complete the action.

        Raises:
            ActionNotSupportedError: If no eligible client defines *action*.
            ConnectError: If every eligible client was unreachable.
        """
        rest_options = [c for c in self._rest_options() if c.supports(action)]
        mqtt_options = [c for c in self._mqtt_options() if c.supports(action)]

        if not rest_options and not mqtt_options:
            raise ActionNotSupportedError(
                f"Action '{action}' is not supported by any configured mapping. "
                f"Supported actions: {self.supported_actions()}"
            )

        last_connect_error: Optional[ConnectError] = None

        # REST clients first, then MQTT; the first successful call wins.
        for client in (*rest_options, *mqtt_options):
            try:
                return await client.call(action)
            except ConnectError as exc:
                last_connect_error = exc

        if last_connect_error:
            raise last_connect_error
        raise ActionNotSupportedError(  # pragma: no cover
            f"No configured transport could execute action '{action}'"  # pragma: no cover
        )  # pragma: no cover

Unified vacuum client with automatic transport selection.

REST is preferred when both transports support an action. MQTT is used as a fallback only when REST is unreachable (ConnectError). If a transport is not configured, or does not define an action, it is skipped automatically.

Pass a list of mapping names to enable auto-detection: call `probe()` during setup to test each mapping against the device and pin the working one for all subsequent calls. With a single mapping per transport the client works immediately without probing.

Example — single mapping (no probe required)::

async with VacuumClient(
    "192.168.1.100",
    rest_mappings="sharkiq_v1",
    mqtt_mappings="sharkiq_v1",
) as vacuum:
    status = await vacuum.get_status()
    await vacuum.start_cleaning()

Example — multiple candidates with probe::

async with VacuumClient(
    "192.168.1.100",
    rest_mappings=["sharkiq_v1", "other_model_v1"],
    mqtt_mappings=["sharkiq_v1"],
) as vacuum:
    result = await vacuum.probe()
    print(result.rest_mapping)   # e.g. "sharkiq_v1"
    status = await vacuum.get_status()

Args

host
IP address or hostname of the vacuum.
rest_mappings
One REST mapping name, or a list of candidates to probe.
mqtt_mappings
One MQTT mapping name, or a list of candidates to probe.
mapping_search_paths
Additional directories to search for mapping files before falling back to built-in mappings.

Instance variables

prop active_mqtt_mapping : Optional[str]
Expand source code
@property
def active_mqtt_mapping(self) -> Optional[str]:
    """Identifier of the pinned MQTT mapping; ``None`` while nothing is pinned."""
    pinned = self._mqtt
    if not pinned:
        return None
    return pinned.mapping.id

The id of the currently pinned MQTT mapping, or None.

prop active_rest_mapping : Optional[str]
Expand source code
@property
def active_rest_mapping(self) -> Optional[str]:
    """Identifier of the pinned REST mapping; ``None`` while nothing is pinned."""
    pinned = self._rest
    if not pinned:
        return None
    return pinned.mapping.id

The id of the currently pinned REST mapping, or None.

Methods

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Halt background monitoring, then close each REST candidate client in turn."""
    await self.stop_monitoring()
    for rest_client in self._rest_candidates:
        await rest_client.close()

Stop monitoring and close all underlying connections.

async def explore(self) ‑> bool
Expand source code
async def explore(self) -> bool:
    """Dispatch the ``explore`` action to begin a mapping/exploration run."""
    outcome = await self._execute("explore")
    return outcome

Begin a mapping/exploration run.

async def get_device_info(self) ‑> DeviceInfo
Expand source code
async def get_device_info(self) -> DeviceInfo:
    """Fetch firmware and device identity details via the ``get_robot_id`` action."""
    device_info = await self._execute("get_robot_id")
    return device_info

Return firmware and device identity information.

async def get_events(self) ‑> List[VacuumEvent]
Expand source code
async def get_events(self) -> List[VacuumEvent]:
    """Fetch the event log recorded since the last startup (``get_events`` action)."""
    events = await self._execute("get_events")
    return events

Return the event log since last startup.

async def get_status(self) ‑> VacuumStatus
Expand source code
async def get_status(self) -> VacuumStatus:
    """Fetch the vacuum's current status via the ``get_status`` action."""
    status = await self._execute("get_status")
    return status

Return the current vacuum status.

async def get_wifi_status(self) ‑> DeviceInfo
Expand source code
async def get_wifi_status(self) -> DeviceInfo:
    """Fetch Wi-Fi connection details, including the MAC address (``get_wifi_status`` action)."""
    wifi_info = await self._execute("get_wifi_status")
    return wifi_info

Return Wi-Fi connection details including MAC address.

async def go_home(self) ‑> bool
Expand source code
async def go_home(self) -> bool:
    """Dispatch the ``go_home`` action to send the vacuum back to its dock."""
    outcome = await self._execute("go_home")
    return outcome

Send the vacuum back to its dock.

def on_status_update(self,
callback: Callable[[VacuumStatus], None]) ‑> None
Expand source code
def on_status_update(self, callback: Callable[[VacuumStatus], None]) -> None:
    """Store *callback* as the receiver of real-time MQTT status updates.

    The callback is handed a ``VacuumStatus`` for each status message and may
    be either a plain function or an ``async`` callable.

    Register the callback before calling :meth:`start_monitoring`. Raises
    ``SharklocalError`` when no MQTT mapping is configured.
    """
    has_mqtt = bool(self._mqtt_candidates)
    if has_mqtt:
        self._status_callback = callback
        return
    raise SharklocalError(
        "An MQTT mapping is required for real-time monitoring"
    )

Register a callback to receive real-time status updates via MQTT.

The callback receives a VacuumStatus on every status message. Both synchronous and async callables are accepted.

Must be called before `start_monitoring()`.

async def probe(self) ‑> ProbeResult
Expand source code
async def probe(self) -> ProbeResult:
    """Test all configured mappings and pin the best working one per transport.

    Each REST mapping is tested, in the order supplied, by calling
    ``get_status``; mappings that raise ``SharklocalError`` are skipped.
    Among the mappings that respond, the one with the highest ``priority``
    is pinned as the active REST transport (on a tie the one supplied first
    wins). The same process is then repeated for MQTT.

    A transport's existing pin is replaced only when at least one of its
    mappings responds; otherwise the earlier pin is left in place. ``via``
    is set from this probe's results alone: ``"REST"`` if a REST mapping
    responded, else ``"MQTT"`` if an MQTT mapping responded, else ``"NONE"``.

    Call this during integration setup when multiple mapping candidates are
    configured. With a single mapping per transport the client works without
    calling probe.

    Returns:
        :class:`ProbeResult` with the ``id`` of each selected mapping, or
        ``None`` for a transport where no mapping succeeded.
    """

    async def _best_responding(candidates):
        # Probe every candidate and keep the highest-priority responder;
        # strict ">" means the earlier candidate wins a priority tie.
        best = None
        for client in candidates:
            try:
                await client.call("get_status")
            except SharklocalError:
                continue
            if best is None or client.mapping.priority > best.mapping.priority:
                best = client
        return best

    rest_id: Optional[str] = None
    mqtt_id: Optional[str] = None

    best_rest = await _best_responding(self._rest_candidates)
    if best_rest is not None:
        self._rest = best_rest
        rest_id = best_rest.mapping.id

    best_mqtt = await _best_responding(self._mqtt_candidates)
    if best_mqtt is not None:
        self._mqtt = best_mqtt
        mqtt_id = best_mqtt.mapping.id

    if rest_id is not None:
        self.via = "REST"
    elif mqtt_id is not None:
        self.via = "MQTT"
    else:
        self.via = "NONE"

    return ProbeResult(rest_mapping=rest_id, mqtt_mapping=mqtt_id)

Test all configured mappings and pin the best working one per transport.

Each REST mapping is tested in the order supplied by calling get_status. Among the mappings that respond successfully, the one with the highest priority is pinned as the active REST transport (on a tie, the one supplied first). The same process is then repeated for MQTT. A previously pinned transport is replaced when probe is called again and at least one of its mappings responds.

Call this during integration setup when multiple mapping candidates are configured. With a single mapping per transport the client works without calling probe.

Returns

A `ProbeResult` with the id of each selected mapping, or None for a transport where no mapping succeeded.

async def start_cleaning(self) ‑> bool
Expand source code
async def start_cleaning(self) -> bool:
    """Dispatch the ``start_cleaning`` action to begin a full cleaning run."""
    outcome = await self._execute("start_cleaning")
    return outcome

Start a full cleaning run.

async def start_monitoring(self) ‑> None
Expand source code
async def start_monitoring(self) -> None:
    """Launch the background MQTT status monitor.

    Preconditions, checked in this order (each failure raises
    ``SharklocalError``): at least one MQTT mapping is configured, an MQTT
    client is pinned (run :meth:`probe` first when several MQTT mappings are
    configured), and a callback was registered via :meth:`on_status_update`.

    Calling this while a monitor task is still running does nothing.
    """
    if not self._mqtt_candidates:
        raise SharklocalError("An MQTT mapping is required for monitoring")

    mqtt_client = self._mqtt
    if mqtt_client is None:
        raise SharklocalError(
            "Call probe() first to determine the active MQTT mapping "
            "when multiple MQTT mappings are configured"
        )

    callback = self._status_callback
    if callback is None:
        raise SharklocalError(
            "Register a callback with on_status_update() before starting monitoring"
        )

    existing = self._monitor_task
    if existing and not existing.done():
        return  # a monitor is already active

    # The same event is stored on the client and handed to the monitor.
    stop_event = asyncio.Event()
    self._monitor_stop = stop_event
    self._monitor_task = asyncio.ensure_future(
        mqtt_client.monitor(callback, stop_event=stop_event)
    )

Begin monitoring the vacuum status in the background via MQTT.

Call `on_status_update()` first to register a callback. When multiple MQTT mappings are configured, call `probe()` first to determine the active mapping. Safe to call repeatedly; a second call while already running is a no-op.

async def stop(self) ‑> bool
Expand source code
async def stop(self) -> bool:
    """Dispatch the ``stop`` action to pause the current cleaning task."""
    outcome = await self._execute("stop")
    return outcome

Pause the current cleaning task.

async def stop_monitoring(self) ‑> None
Expand source code
async def stop_monitoring(self) -> None:
    """Stop background status monitoring if it is running."""
    if self._monitor_stop:
        self._monitor_stop.set()
    if self._monitor_task and not self._monitor_task.done():
        self._monitor_task.cancel()
        try:
            await self._monitor_task
        except (asyncio.CancelledError, Exception):
            pass
    self._monitor_task = None
    self._monitor_stop = None

Stop background status monitoring if it is running.

def supported_actions(self) ‑> List[str]
Expand source code
def supported_actions(self) -> List[str]:
    """List, in sorted order, every action name offered by the configured transports.

    For each transport the pinned client is used when one is set; otherwise
    all of that transport's candidates contribute their actions.
    """
    rest_pool = [self._rest] if self._rest else self._rest_candidates
    mqtt_pool = [self._mqtt] if self._mqtt else self._mqtt_candidates
    names = {
        name
        for client in (*rest_pool, *mqtt_pool)
        for name in client.mapping.actions
    }
    return sorted(names)

Return all action names supported by any configured transport.

def transports_for(self, action: str) ‑> List[str]
Expand source code
def transports_for(self, action: str) -> List[str]:
    """Name the transports able to run *action*, in evaluation priority order.

    ``"rest"`` comes before ``"mqtt"``. A transport qualifies as soon as any
    client considered for it (the pinned one, or else every candidate)
    supports the action.
    """
    pools = (
        ("rest", [self._rest] if self._rest else self._rest_candidates),
        ("mqtt", [self._mqtt] if self._mqtt else self._mqtt_candidates),
    )
    return [
        label
        for label, pool in pools
        if any(client.supports(action) for client in pool)
    ]

Return which transports support action, in evaluation priority order.

REST is listed before MQTT. When multiple candidates exist for a transport, any candidate supporting the action counts.

class VacuumEvent (id: int,
type: str,
type_id: int,
timestamp: Dict[str, int],
current_status: str,
source_type: str,
raw: Dict[str, Any] = <factory>)
Expand source code
@dataclass
class VacuumEvent:
    """One entry taken from the vacuum's event log.

    The first six fields are required; ``raw`` is optional.
    """

    # Required fields (no defaults).
    id: int
    type: str
    type_id: int
    timestamp: Dict[str, int]
    current_status: str
    source_type: str

    # Optional field; each instance gets its own empty dict by default.
    raw: Dict[str, Any] = field(default_factory=dict)

A single event from the vacuum event log.

Instance variables

var current_status : str

Required string field; no default value.

var id : int

Required integer field; no default value.

var raw : Dict[str, Any]

Dictionary with string keys; defaults to a new empty dict for each instance.

var source_type : str

Required string field; no default value.

var timestamp : Dict[str, int]

Required dictionary mapping string keys to integers; no default value.

var type : str

Required string field; no default value.

var type_id : int

Required integer field; no default value.

class VacuumMode (*values)
Expand source code
class VacuumMode(str, Enum):
    """Operating modes, normalized so every transport reports the same set.

    Members are also ``str`` instances, so each compares equal to its
    lowercase string value.
    """

    UNKNOWN = "unknown"
    CLEANING = "cleaning"
    RETURNING_TO_DOCK = "returning_to_dock"
    DOCKING = "docking"
    DOCKED = "docked"
    # Powered on but not cleaning and not on the charging dock.
    IDLE = "idle"
    # Mapping/exploration run in progress.
    EXPLORING = "exploring"

Normalized operating modes across all transports.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var CLEANING

Enum member whose string value is "cleaning".

var DOCKED

Enum member whose string value is "docked".

var DOCKING

Enum member whose string value is "docking".

var EXPLORING

Enum member whose string value is "exploring": a mapping/exploration run is in progress.

var IDLE

Enum member whose string value is "idle": powered on but not cleaning and not on the charging dock.

var RETURNING_TO_DOCK

Enum member whose string value is "returning_to_dock".

var UNKNOWN

Enum member whose string value is "unknown".

class VacuumStatus (mode: VacuumMode,
battery_level: Optional[int] = None,
charging: Optional[bool] = None,
raw: Dict[str, Any] = <factory>)
Expand source code
@dataclass
class VacuumStatus:
    """Transport-independent snapshot of the vacuum's state.

    Only ``mode`` is required; the remaining fields default to ``None`` or,
    for ``raw``, to a fresh empty dict.
    """

    mode: VacuumMode
    battery_level: Optional[int] = None
    charging: Optional[bool] = None
    raw: Dict[str, Any] = field(default_factory=dict)

    @property
    def is_cleaning(self) -> bool:
        """``True`` when ``mode`` equals ``VacuumMode.CLEANING``."""
        cleaning = VacuumMode.CLEANING
        return self.mode == cleaning

    @property
    def is_docked(self) -> bool:
        """``True`` when ``mode`` is ``DOCKED`` or ``DOCKING``."""
        dock_modes = (VacuumMode.DOCKED, VacuumMode.DOCKING)
        return self.mode in dock_modes

Normalized vacuum status, independent of transport protocol.

Instance variables

var battery_level : int | None

Optional integer; defaults to None.

var charging : bool | None

Optional boolean; defaults to None.

prop is_cleaning : bool
Expand source code
@property
def is_cleaning(self) -> bool:
    """``True`` when ``mode`` equals ``VacuumMode.CLEANING``."""
    current_mode = self.mode
    return current_mode == VacuumMode.CLEANING
prop is_docked : bool
Expand source code
@property
def is_docked(self) -> bool:
    """``True`` when ``mode`` is ``DOCKED`` or ``DOCKING``."""
    current_mode = self.mode
    dock_modes = (VacuumMode.DOCKED, VacuumMode.DOCKING)
    return current_mode in dock_modes
var mode : VacuumMode

The VacuumMode of this status; required, no default value.

var raw : Dict[str, Any]

Dictionary with string keys; defaults to a new empty dict for each instance.