Package sharklocal
sharklocal — Local control library for Shark robot vacuums.
Designed for use with Home Assistant integrations. Supports both REST and MQTT transports with YAML-configurable action mappings per vacuum model.
Basic usage::
from sharklocal import VacuumClient
async with VacuumClient(
"192.168.1.100",
rest_mappings="sharkiq_v1",
mqtt_mappings="sharkiq_v1",
) as vacuum:
status = await vacuum.get_status()
print(status.mode, status.battery_level)
await vacuum.start_cleaning()
Direct transport access::
from sharklocal import RESTVacuumClient, load_rest_mapping
mapping = load_rest_mapping("sharkiq_v1")
client = RESTVacuumClient("192.168.1.100", mapping)
status = await client.call("get_status")
await client.close()
Sub-modules
sharklocal.client-
Unified VacuumClient with automatic transport selection.
sharklocal.exceptions-
Custom exceptions for the sharklocal library.
sharklocal.mappings-
Mapping loader utilities for REST and MQTT transport configurations.
sharklocal.models-
Data models for vacuum state and device information.
sharklocal.mqtt_client-
Async MQTT client for local vacuum control.
sharklocal.protobuf-
Pure-Python protobuf decoder for SharkIQ MQTT messages …
sharklocal.rest_client-
Async REST client for local vacuum control.
Functions
-
Expand source code
def list_mqtt_mappings(
    search_paths: Optional[List[Union[str, Path]]] = None,
) -> List[str]:
    """List the names of every MQTT mapping that can be found.

    Scans the built-in MQTT mapping directory and each directory in
    *search_paths* for ``*.yaml`` files. Entries that are not directories are
    ignored.

    Args:
        search_paths: Optional extra directories to scan.

    Returns:
        Sorted, de-duplicated mapping names (the YAML file stems).
    """
    extra_dirs = [Path(entry) for entry in (search_paths or [])]
    stems = {
        yaml_file.stem
        for folder in (_BUILTIN_MQTT_DIR, *extra_dirs)
        if folder.is_dir()
        for yaml_file in folder.glob("*.yaml")
    }
    return sorted(stems)
-
Expand source code
def list_rest_mappings(
    search_paths: Optional[List[Union[str, Path]]] = None,
) -> List[str]:
    """List the names of every REST mapping that can be found.

    Scans the built-in REST mapping directory and each directory in
    *search_paths* for ``*.yaml`` files. Entries that are not directories are
    ignored.

    Args:
        search_paths: Optional extra directories to scan.

    Returns:
        Sorted, de-duplicated mapping names (the YAML file stems).
    """
    extra_dirs = [Path(entry) for entry in (search_paths or [])]
    stems = {
        yaml_file.stem
        for folder in (_BUILTIN_REST_DIR, *extra_dirs)
        if folder.is_dir()
        for yaml_file in folder.glob("*.yaml")
    }
    return sorted(stems)
-
Expand source code
def load_mqtt_mapping(
    name: str,
    search_paths: Optional[List[Union[str, Path]]] = None,
) -> MQTTMappingConfig:
    """Load an MQTT mapping by name.

    Searches the built-in mappings directory first, then any additional
    paths provided in *search_paths*. The first ``<name>.yaml`` found wins.

    Args:
        name: Mapping filename stem (e.g. ``"sharkiq_v1"``).
        search_paths: Optional list of additional directories to search.

    Returns:
        The mapping built by ``MQTTMappingConfig.from_dict`` from the parsed
        YAML document.

    Raises:
        MappingNotFoundError: If no matching YAML file is found.
    """
    for directory in [_BUILTIN_MQTT_DIR, *[Path(p) for p in (search_paths or [])]]:
        candidate = directory / f"{name}.yaml"
        if candidate.is_file():
            # Decode explicitly as UTF-8: the locale default (e.g. cp1252 on
            # Windows) would mis-read or reject non-ASCII mapping files.
            with open(candidate, encoding="utf-8") as f:
                data = yaml.safe_load(f)
            return MQTTMappingConfig.from_dict(data)
    raise MappingNotFoundError(
        f"MQTT mapping '{name}' not found. "
        f"Available built-in mappings: {list_mqtt_mappings()}"
    )
Searches the built-in mappings directory first, then any additional paths provided in search_paths.
Args
name- Mapping filename stem (e.g. "sharkiq_v1").
search_paths- Optional list of additional directories to search.
Raises
MappingNotFoundError- If no matching YAML file is found.
-
Expand source code
def load_rest_mapping(
    name: str,
    search_paths: Optional[List[Union[str, Path]]] = None,
) -> RESTMappingConfig:
    """Load a REST mapping by name.

    Searches the built-in mappings directory first, then any additional
    paths provided in *search_paths*. The first ``<name>.yaml`` found wins.

    Args:
        name: Mapping filename stem (e.g. ``"sharkiq_v1"``).
        search_paths: Optional list of additional directories to search.

    Returns:
        The mapping built by ``RESTMappingConfig.from_dict`` from the parsed
        YAML document.

    Raises:
        MappingNotFoundError: If no matching YAML file is found.
    """
    for directory in [_BUILTIN_REST_DIR, *[Path(p) for p in (search_paths or [])]]:
        candidate = directory / f"{name}.yaml"
        if candidate.is_file():
            # Decode explicitly as UTF-8: the locale default (e.g. cp1252 on
            # Windows) would mis-read or reject non-ASCII mapping files.
            with open(candidate, encoding="utf-8") as f:
                data = yaml.safe_load(f)
            return RESTMappingConfig.from_dict(data)
    raise MappingNotFoundError(
        f"REST mapping '{name}' not found. "
        f"Available built-in mappings: {list_rest_mappings()}"
    )
Searches the built-in mappings directory first, then any additional paths provided in search_paths.
Args
name- Mapping filename stem (e.g. "sharkiq_v1").
search_paths- Optional list of additional directories to search.
Raises
MappingNotFoundError- If no matching YAML file is found.
-
Expand source code
def register_decoder(name: str) -> Callable:
    """Build a decorator that registers an MQTT status decoder under *name*.

    The decorated function is stored in the module's decoder registry and
    returned unchanged, so it stays directly callable. Registering a second
    function under the same name replaces the first.

    Usage::

        @register_decoder("my_model_v1")
        def _decode_my_model(payload: bytes, modes: Dict[int, str]) -> VacuumStatus:
            ...

    Args:
        name: Key that a mapping's ``status_decoder`` setting refers to.

    Returns:
        A decorator that takes the decoder function and returns it as-is.
    """

    def _register(decoder_fn: Callable) -> Callable:
        _STATUS_DECODERS[name] = decoder_fn
        return decoder_fn

    return _register
Usage::
@register_decoder("my_model_v1")
def _decode_my_model(payload: bytes, modes: Dict[int, str]) -> VacuumStatus:
    ...
Classes
-
Expand source code
class ActionNotSupportedError(SharklocalError):
    """The requested action is missing from the configured transport mapping.

    Raised by the REST and MQTT clients' ``call`` methods when ``supports``
    returns ``False`` for the action name.
    """
Ancestors
- SharklocalError
- builtins.Exception
- builtins.BaseException
-
Expand source code
class CommandError(SharklocalError):
    """A command could not be executed or the device answered with an error.

    Raised, for example, on an HTTP error response from the REST API, on a
    timed-out MQTT status request, or for an unrecognised MQTT action type.
    """
Ancestors
- SharklocalError
- builtins.Exception
- builtins.BaseException
-
Expand source code
class ConnectError(SharklocalError):
    """A connection to the vacuum could not be established.

    Also raised by the MQTT client when the optional ``aiomqtt`` dependency
    is not installed.
    """
Ancestors
- SharklocalError
- builtins.Exception
- builtins.BaseException
-
Expand source code
class DecoderError(SharklocalError):
    """A response payload could not be decoded.

    Raised by the MQTT client when no status decoder is registered under the
    name the mapping asks for.
    """
Ancestors
- SharklocalError
- builtins.Exception
- builtins.BaseException
-
Expand source code
@dataclass
class DeviceInfo:
    """Device identity and connectivity information.

    Every field is optional. The REST client fills ``firmware`` from the
    robot-id response and the network fields from the wifi-status response;
    whatever a given response does not provide stays ``None``.
    """

    # Top-level ``firmware`` value of the robot-id response.
    firmware: Optional[str] = None
    # Network details taken from the wifi-status response.
    mac_address: Optional[str] = None
    ip_address: Optional[str] = None
    ssid: Optional[str] = None
    rssi: Optional[int] = None
    # Unmodified response dict the instance was built from.
    raw: Dict[str, Any] = field(default_factory=dict)
Instance variables
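-
firmware — Firmware version reported by the device, or None.
-
ip_address — IP address reported by the device, or None.
-
mac_address — MAC address reported by the device, or None.
-
raw — Raw response dict the object was built from.
-
rssi — Wi-Fi signal strength reported by the device, or None.
-
ssid — Name of the Wi-Fi network the device is connected to, or None.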
-
-
Expand source code
class MQTTVacuumClient:
    """Async MQTT client for local vacuum control.

    Requires ``aiomqtt`` (``pip install aiomqtt``). The dependency is imported
    lazily inside the methods that need it, so a missing install surfaces as
    ``ConnectError`` at call time rather than at import time.

    Each :meth:`call` opens its own broker connection; :meth:`monitor` keeps
    one open for as long as it runs.

    Args:
        host: Host passed to ``aiomqtt.Client``.
        mapping: Mapping configuration supplying the port, topics, payload
            encoding, status decoder name, mode table and action specs.
    """

    def __init__(self, host: str, mapping: MQTTMappingConfig) -> None:
        self.host = host
        self.mapping = mapping

    def supports(self, action: str) -> bool:
        """Return ``True`` if the mapping defines *action*."""
        return action in self.mapping.actions

    def _decode_incoming(self, raw_payload: bytes) -> bytes:
        """Decode a received MQTT payload per the mapping's encoding setting.

        Only ``"base64"`` is transformed; any other encoding value passes the
        payload through untouched.
        """
        if self.mapping.encoding == "base64":
            return base64.b64decode(raw_payload)
        return raw_payload

    def _decode_status(self, raw_payload: bytes) -> VacuumStatus:
        """Decode a raw MQTT payload into a normalized ``VacuumStatus``.

        Raises:
            DecoderError: If no decoder is registered under the mapping's
                ``status_decoder`` name.
        """
        decoder = _STATUS_DECODERS.get(self.mapping.status_decoder)
        if decoder is None:
            raise DecoderError(
                f"No decoder registered for '{self.mapping.status_decoder}'. "
                f"Available decoders: {list(_STATUS_DECODERS)}"
            )
        payload = self._decode_incoming(raw_payload)
        return decoder(payload, self.mapping.modes)

    async def call(self, action: str) -> Any:
        """Execute a named action from the mapping.

        Args:
            action: Action name as defined in the mapping
                (e.g. ``"start_cleaning"``).

        Returns:
            ``True`` for ``command`` actions, or a ``VacuumStatus`` for
            ``status_request`` actions.

        Raises:
            ActionNotSupportedError: If *action* is not in the mapping.
            ConnectError: If ``aiomqtt`` is not installed or the MQTT broker
                cannot be reached.
            CommandError: If a status response is not received within the
                timeout, or the action has an unrecognised type.
        """
        if not self.supports(action):
            raise ActionNotSupportedError(
                f"MQTT mapping '{self.mapping.id}' does not support '{action}'"
            )
        spec = self.mapping.actions[action]
        try:
            import aiomqtt
        except ImportError as exc:
            raise ConnectError(
                "aiomqtt is required for MQTT support. "
                "Install with: pip install aiomqtt"
            ) from exc
        try:
            if spec.type == "command":
                async with aiomqtt.Client(self.host, port=self.mapping.port) as client:
                    await client.publish(self.mapping.command_topic, payload=spec.payload)
                return True
            if spec.type == "status_request":
                return await self._request_status(spec.payload, spec.timeout)
        except (ActionNotSupportedError, CommandError, ConnectError, DecoderError):
            # Library errors already carry the right meaning; don't re-wrap.
            raise
        except Exception as exc:
            raise ConnectError(
                f"MQTT error connecting to {self.host}:{self.mapping.port}: {exc}"
            ) from exc
        raise CommandError(f"Unrecognised MQTT action type '{spec.type}'")

    async def _request_status(self, command_payload: str, timeout: float) -> VacuumStatus:
        """Publish a status-request command and return the decoded first response.

        Subscribes to the status topic before publishing, so the reply cannot
        be missed.

        Raises:
            ConnectError: If ``aiomqtt`` is not installed.
            CommandError: If no message arrives within *timeout* seconds.
        """
        try:
            import aiomqtt
        except ImportError as exc:
            raise ConnectError("aiomqtt is required for MQTT support") from exc
        async with aiomqtt.Client(self.host, port=self.mapping.port) as client:
            await client.subscribe(self.mapping.status_topic)
            await client.publish(self.mapping.command_topic, payload=command_payload)
            try:
                async with asyncio.timeout(timeout):
                    async for message in client.messages:
                        return self._decode_status(bytes(message.payload))
            except TimeoutError as exc:
                raise CommandError(
                    f"Timed out after {timeout}s waiting for MQTT status response"
                ) from exc
        raise CommandError("No status message received from vacuum")

    async def monitor(
        self,
        callback: Callable[[VacuumStatus], None],
        *,
        stop_event: Optional[asyncio.Event] = None,
    ) -> None:
        """Subscribe to the vacuum's status topic and invoke *callback* per update.

        Runs indefinitely until *stop_event* is set or the task is cancelled.
        Both synchronous and ``async`` callbacks are supported: whatever the
        callback returns is awaited if it is awaitable.

        Note that *stop_event* is only checked when a message arrives, and
        that an exception raised by *callback* (other than a library error)
        is re-raised as ``ConnectError``.

        Args:
            callback: Called with each decoded ``VacuumStatus``.
            stop_event: Optional ``asyncio.Event``; when set, monitoring
                stops cleanly after the current message.

        Raises:
            ConnectError: If ``aiomqtt`` is not installed or the connection
                fails.
        """
        import inspect

        try:
            import aiomqtt
        except ImportError as exc:
            raise ConnectError("aiomqtt is required for MQTT support") from exc
        try:
            async with aiomqtt.Client(self.host, port=self.mapping.port) as client:
                await client.subscribe(self.mapping.status_topic)
                async for message in client.messages:
                    if stop_event and stop_event.is_set():
                        return
                    try:
                        status = self._decode_status(bytes(message.payload))
                    except (DecoderError, CommandError):
                        continue  # Skip malformed messages silently
                    # Inspect the result rather than the callable, so async
                    # callable objects and partials are awaited as well.
                    result = callback(status)
                    if inspect.isawaitable(result):
                        await result
        except (ActionNotSupportedError, CommandError, ConnectError, DecoderError):
            raise
        except Exception as exc:
            raise ConnectError(
                f"MQTT monitor lost connection to {self.host}: {exc}"
            ) from exc
Requires aiomqtt (pip install aiomqtt).
Methods
-
Expand source code
async def call(self, action: str) -> Any:
    """Execute a named action from the mapping.

    Args:
        action: Action name as defined in the mapping
            (e.g. ``"start_cleaning"``).

    Returns:
        ``True`` for ``command`` actions, or a ``VacuumStatus`` for
        ``status_request`` actions.

    Raises:
        ActionNotSupportedError: If *action* is not in the mapping.
        ConnectError: If ``aiomqtt`` is not installed or the MQTT broker
            cannot be reached.
        CommandError: If a status response is not received within the
            timeout, or the action has an unrecognised type.
    """
    if not self.supports(action):
        raise ActionNotSupportedError(
            f"MQTT mapping '{self.mapping.id}' does not support '{action}'"
        )
    spec = self.mapping.actions[action]
    # Imported lazily so a missing optional dependency becomes ConnectError.
    try:
        import aiomqtt
    except ImportError as exc:
        raise ConnectError(
            "aiomqtt is required for MQTT support. "
            "Install with: pip install aiomqtt"
        ) from exc
    try:
        if spec.type == "command":
            async with aiomqtt.Client(self.host, port=self.mapping.port) as client:
                await client.publish(self.mapping.command_topic, payload=spec.payload)
            return True
        if spec.type == "status_request":
            return await self._request_status(spec.payload, spec.timeout)
    except (ActionNotSupportedError, CommandError, ConnectError, DecoderError):
        # Library errors already carry the right meaning; don't re-wrap.
        raise
    except Exception as exc:
        raise ConnectError(
            f"MQTT error connecting to {self.host}:{self.mapping.port}: {exc}"
        ) from exc
    # Reached only when spec.type matched neither branch above.
    raise CommandError(f"Unrecognised MQTT action type '{spec.type}'")
Args
action- Action name as defined in the mapping (e.g. "start_cleaning").
Returns
True for command actions, or a VacuumStatus for status_request actions.
Raises
ActionNotSupportedError- If action is not in the mapping.
ConnectError- If the MQTT broker cannot be reached.
CommandError- If a status response is not received within the timeout.
-
Expand source code
async def monitor(
    self,
    callback: Callable[[VacuumStatus], None],
    *,
    stop_event: Optional[asyncio.Event] = None,
) -> None:
    """Subscribe to the vacuum's status topic and invoke *callback* per update.

    Runs indefinitely until *stop_event* is set or the task is cancelled.
    Both synchronous and ``async`` callbacks are supported: whatever the
    callback returns is awaited if it is awaitable.

    Note that *stop_event* is only checked when a message arrives, and that
    an exception raised by *callback* (other than a library error) is
    re-raised as ``ConnectError``.

    Args:
        callback: Called with each decoded ``VacuumStatus``.
        stop_event: Optional ``asyncio.Event``; when set, monitoring stops
            cleanly after the current message.

    Raises:
        ConnectError: If ``aiomqtt`` is not installed or the connection fails.
    """
    import inspect

    try:
        import aiomqtt
    except ImportError as exc:
        raise ConnectError("aiomqtt is required for MQTT support") from exc
    try:
        async with aiomqtt.Client(self.host, port=self.mapping.port) as client:
            await client.subscribe(self.mapping.status_topic)
            async for message in client.messages:
                if stop_event and stop_event.is_set():
                    return
                try:
                    status = self._decode_status(bytes(message.payload))
                except (DecoderError, CommandError):
                    continue  # Skip malformed messages silently
                # Inspect the result rather than the callable, so async
                # callable objects and partials are awaited as well.
                result = callback(status)
                if inspect.isawaitable(result):
                    await result
    except (ActionNotSupportedError, CommandError, ConnectError, DecoderError):
        raise
    except Exception as exc:
        raise ConnectError(
            f"MQTT monitor lost connection to {self.host}: {exc}"
        ) from exc
Runs indefinitely until stop_event is set or the task is cancelled. Both synchronous and async callbacks are supported.
Args
callback- Called with each decoded VacuumStatus.
stop_event- Optional asyncio.Event; when set, monitoring stops cleanly after the current message.
-
Expand source code
def supports(self, action: str) -> bool:
    """Return ``True`` if the mapping defines *action*."""
    # Membership test against the mapping's action table.
    return action in self.mapping.actions
-
-
Expand source code
class MappingNotFoundError(SharklocalError):
    """No mapping file with the requested name could be located.

    Raised by the mapping loaders after the built-in directory and every
    extra search path have been checked.
    """
Ancestors
- SharklocalError
- builtins.Exception
- builtins.BaseException
-
Expand source code
@dataclass
class ProbeResult:
    """Result of a :meth:`VacuumClient.probe` call."""

    rest_mapping: Optional[str] = None
    """``id`` of the REST mapping that responded successfully, or ``None``."""

    mqtt_mapping: Optional[str] = None
    """``id`` of the MQTT mapping that responded successfully, or ``None``."""

    @property
    def has_rest(self) -> bool:
        """``True`` if a working REST mapping was found."""
        pinned = self.rest_mapping
        return pinned is not None

    @property
    def has_mqtt(self) -> bool:
        """``True`` if a working MQTT mapping was found."""
        pinned = self.mqtt_mapping
        return pinned is not None

    @property
    def is_connected(self) -> bool:
        """``True`` if at least one transport responded successfully."""
        if self.has_rest:
            return True
        return self.has_mqtt
Result of a VacuumClient.probe() call.
Instance variables
-
Expand source code
@property
def has_mqtt(self) -> bool:
    """``True`` if a working MQTT mapping was found."""
    # A mapping id is stored only for a transport that responded.
    return self.mqtt_mapping is not None
Expand source code
@property
def has_rest(self) -> bool:
    """``True`` if a working REST mapping was found."""
    # A mapping id is stored only for a transport that responded.
    return self.rest_mapping is not None
Expand source code
@property
def is_connected(self) -> bool:
    """``True`` if at least one transport responded successfully."""
    return self.has_rest or self.has_mqtt
id of the MQTT mapping that responded successfully, or None.
-
id of the REST mapping that responded successfully, or None.
-
-
Expand source code
class RESTVacuumClient:
    """Async HTTP/HTTPS client for local vacuum control via the REST API.

    Transport (``http`` vs ``https``) and SSL verification are driven by the
    mapping configuration, so different models can use different settings
    without code changes.

    One ``aiohttp.ClientSession`` is created lazily on first use and reused
    until :meth:`close` is called.

    Args:
        host: IP address or hostname of the vacuum.
        mapping: REST mapping supplying transport, port, SSL policy, mode map
            and action specs.
    """

    def __init__(self, host: str, mapping: RESTMappingConfig) -> None:
        self.host = host
        self.mapping = mapping
        self._session: Optional[aiohttp.ClientSession] = None

    @property
    def base_url(self) -> str:
        """Scheme, host and port that every action path is appended to."""
        return f"{self.mapping.transport}://{self.host}:{self.mapping.port}"

    def supports(self, action: str) -> bool:
        """Return ``True`` if the mapping defines *action*."""
        return action in self.mapping.actions

    def _make_connector(self) -> aiohttp.TCPConnector:
        """Build a ``TCPConnector`` with SSL settings from the mapping."""
        if self.mapping.transport == "http":
            return aiohttp.TCPConnector()
        if not self.mapping.verify_ssl:
            # Verification is switched off only when the mapping asks for it.
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            return aiohttp.TCPConnector(ssl=ctx)
        return aiohttp.TCPConnector()

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating a new one if absent or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(connector=self._make_connector())
        return self._session

    async def close(self) -> None:
        """Close the underlying HTTP session."""
        if self._session and not self._session.closed:
            await self._session.close()
        self._session = None

    async def call(self, action: str) -> Any:
        """Execute a named action and return a normalized result.

        Args:
            action: Action name as defined in the mapping
                (e.g. ``"get_status"``).

        Returns:
            A normalized model object for query actions. For command actions
            without a ``response_map``, the decoded JSON body, or ``True``
            when the body cannot be parsed.

        Raises:
            ActionNotSupportedError: If *action* is not in the mapping.
            ConnectError: If the vacuum host cannot be reached, drops the
                connection, or the request times out.
            CommandError: If the vacuum returns an HTTP error response.
        """
        if not self.supports(action):
            raise ActionNotSupportedError(
                f"REST mapping '{self.mapping.id}' does not support '{action}'"
            )
        spec = self.mapping.actions[action]
        url = f"{self.base_url}{spec.path}"
        session = await self._get_session()
        try:
            async with session.request(
                method=spec.method,
                url=url,
                json=spec.body,
                headers=spec.headers,
            ) as resp:
                resp.raise_for_status()
                if spec.response_map:
                    data = await resp.json(content_type=None)
                    return self._parse_response(spec.response_map, data)
                # Command endpoints may return minimal or empty bodies.
                try:
                    return await resp.json(content_type=None)
                except Exception:
                    return True
        except (aiohttp.ClientConnectionError, TimeoutError) as exc:
            # ClientConnectionError is the base of ClientConnectorError and
            # also covers dropped connections; on Python 3.11+ the builtin
            # TimeoutError is asyncio's too. All of them mean "unreachable".
            raise ConnectError(
                f"Cannot connect to vacuum at {self.host}:{self.mapping.port}"
            ) from exc
        except aiohttp.ClientResponseError as exc:
            raise CommandError(
                f"REST request to {spec.path} failed with HTTP {exc.status}"
            ) from exc

    # ------------------------------------------------------------------
    # Response parsers
    # ------------------------------------------------------------------

    def _parse_response(self, response_map: str, data: Dict[str, Any]) -> Any:
        """Dispatch to the appropriate parser for *response_map*.

        Unknown *response_map* names return *data* unchanged.
        """
        parsers = {
            "status": self._parse_status,
            "events": self._parse_events,
            "robot_id": self._parse_robot_id,
            "wifi_status": self._parse_wifi_status,
        }
        parser = parsers.get(response_map)
        return parser(data) if parser else data

    def _parse_status(self, data: Dict[str, Any]) -> VacuumStatus:
        """Convert a status response into a ``VacuumStatus``."""
        raw_mode = str(data.get("mode", "")).lower()
        charging_raw = str(data.get("charging", "")).lower()
        # The API returns "connected" when charging and "unconnected" when not.
        charging = charging_raw == "connected"
        # "ready" is context-dependent: combined with "unconnected" it means the
        # vacuum is stopped and away from the dock (e.g. paused mid-run), not docked.
        if raw_mode == "ready" and not charging:
            mode = VacuumMode.IDLE
        else:
            mode_str = self.mapping.mode_map.get(raw_mode, "unknown")
            try:
                mode = VacuumMode(mode_str)
            except ValueError:
                # The mapping named a mode the enum does not define.
                mode = VacuumMode.UNKNOWN
        return VacuumStatus(
            mode=mode,
            battery_level=data.get("battery_level"),
            charging=charging,
            raw=data,
        )

    def _parse_events(self, data: Dict[str, Any]) -> List[VacuumEvent]:
        """Convert the ``robot_events`` array into ``VacuumEvent`` objects."""
        return [
            VacuumEvent(
                id=evt.get("id", 0),
                type=evt.get("type", ""),
                type_id=evt.get("type_id", 0),
                timestamp=evt.get("timestamp", {}),
                current_status=evt.get("current_status", ""),
                source_type=evt.get("source_type", ""),
                raw=evt,
            )
            for evt in data.get("robot_events", [])
        ]

    def _parse_robot_id(self, data: Dict[str, Any]) -> DeviceInfo:
        """Build a ``DeviceInfo`` carrying the firmware version."""
        # Per the API docs, use the top-level 'firmware' value for diagnostics.
        # Per-device entries in the 'devices' array can be ignored.
        firmware = data.get("firmware") or None
        return DeviceInfo(firmware=firmware, raw=data)

    def _parse_wifi_status(self, data: Dict[str, Any]) -> DeviceInfo:
        """Build a ``DeviceInfo`` carrying the network details."""
        return DeviceInfo(
            mac_address=data.get("mac_address"),
            ip_address=data.get("ip_address"),
            ssid=data.get("ssid"),
            rssi=data.get("rssi"),
            raw=data,
        )
Transport (http vs https) and SSL verification are driven by the mapping configuration, so different models can use different settings without code changes.
Instance variables
-
Expand source code
@property
def base_url(self) -> str:
    """Scheme, host and port that every action path is appended to."""
    return f"{self.mapping.transport}://{self.host}:{self.mapping.port}"
Methods
-
Expand source code
async def call(self, action: str) -> Any:
    """Execute a named action and return a normalized result.

    Args:
        action: Action name as defined in the mapping (e.g. ``"get_status"``).

    Returns:
        A normalized model object for query actions. For command actions
        without a ``response_map``, the decoded JSON body, or ``True`` when
        the body cannot be parsed.

    Raises:
        ActionNotSupportedError: If *action* is not in the mapping.
        ConnectError: If the vacuum host cannot be reached, drops the
            connection, or the request times out.
        CommandError: If the vacuum returns an HTTP error response.
    """
    if not self.supports(action):
        raise ActionNotSupportedError(
            f"REST mapping '{self.mapping.id}' does not support '{action}'"
        )
    spec = self.mapping.actions[action]
    url = f"{self.base_url}{spec.path}"
    session = await self._get_session()
    try:
        async with session.request(
            method=spec.method,
            url=url,
            json=spec.body,
            headers=spec.headers,
        ) as resp:
            resp.raise_for_status()
            if spec.response_map:
                data = await resp.json(content_type=None)
                return self._parse_response(spec.response_map, data)
            # Command endpoints may return minimal or empty bodies.
            try:
                return await resp.json(content_type=None)
            except Exception:
                return True
    except (aiohttp.ClientConnectionError, TimeoutError) as exc:
        # ClientConnectionError is the base of ClientConnectorError and also
        # covers dropped connections; on Python 3.11+ the builtin
        # TimeoutError is asyncio's too. All of them mean "unreachable".
        raise ConnectError(
            f"Cannot connect to vacuum at {self.host}:{self.mapping.port}"
        ) from exc
    except aiohttp.ClientResponseError as exc:
        raise CommandError(
            f"REST request to {spec.path} failed with HTTP {exc.status}"
        ) from exc
Args
action- Action name as defined in the mapping (e.g. "get_status").
Returns
A normalized model object for query actions, or True for fire-and-forget command actions.
Raises
ActionNotSupportedError- If action is not in the mapping.
ConnectError- If the vacuum host cannot be reached.
CommandError- If the vacuum returns an HTTP error response.
-
Expand source code
async def close(self) -> None:
    """Close the underlying HTTP session."""
    # Safe to call repeatedly: an absent or already-closed session is skipped.
    if self._session and not self._session.closed:
        await self._session.close()
    self._session = None
-
Expand source code
def supports(self, action: str) -> bool:
    """Return ``True`` if the mapping defines *action*."""
    # Membership test against the mapping's action table.
    return action in self.mapping.actions
-
-
Expand source code
class SharklocalError(Exception):
    """Root of the sharklocal exception hierarchy.

    Catch this type to handle any error raised by the library.
    """
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses
-
Expand source code
class VacuumClient:
    """Unified vacuum client with automatic transport selection.

    REST is preferred when both transports support an action. MQTT is used as a
    fallback only when REST is unreachable (``ConnectError``). If a transport is
    not configured, or does not define an action, it is skipped automatically.

    Pass a list of mapping names to enable auto-detection: call :meth:`probe`
    during setup to test each mapping against the device and pin the working
    one for all subsequent calls. With a single mapping per transport the
    client works immediately without probing.

    Example — single mapping (no probe required)::

        async with VacuumClient(
            "192.168.1.100",
            rest_mappings="sharkiq_v1",
            mqtt_mappings="sharkiq_v1",
        ) as vacuum:
            status = await vacuum.get_status()
            await vacuum.start_cleaning()

    Example — multiple candidates with probe::

        async with VacuumClient(
            "192.168.1.100",
            rest_mappings=["sharkiq_v1", "other_model_v1"],
            mqtt_mappings=["sharkiq_v1"],
        ) as vacuum:
            result = await vacuum.probe()
            print(result.rest_mapping)  # e.g. "sharkiq_v1"
            status = await vacuum.get_status()

    Args:
        host: IP address or hostname of the vacuum.
        rest_mappings: One REST mapping name, or a list of candidates to probe.
        mqtt_mappings: One MQTT mapping name, or a list of candidates to probe.
        mapping_search_paths: Additional directories to search for mapping
            files before falling back to built-in mappings.
    """

    def __init__(
        self,
        host: str,
        *,
        rest_mappings: Optional[Union[str, List[str]]] = None,
        mqtt_mappings: Optional[Union[str, List[str]]] = None,
        mapping_search_paths: Optional[List[Union[str, Path]]] = None,
    ) -> None:
        self.host = host
        paths = mapping_search_paths or []

        # Active (pinned) transport clients. Set immediately when a single
        # mapping is provided; set by probe() when multiple are configured.
        self._rest: Optional[RESTVacuumClient] = None
        self._mqtt: Optional[MQTTVacuumClient] = None

        self._status_callback: Optional[Callable[[VacuumStatus], None]] = None
        self._monitor_stop: Optional[asyncio.Event] = None
        self._monitor_task: Optional[asyncio.Task] = None

        # Primary transport in use. "REST" when REST is pinned and reachable,
        # "MQTT" when only MQTT is available, "NONE" until probe() confirms a
        # working transport (or a single mapping is auto-pinned).
        self.via: str = "NONE"

        def _to_list(val: Optional[Union[str, List[str]]]) -> List[str]:
            # Accept None, a single name, or any list of names.
            if val is None:
                return []
            return [val] if isinstance(val, str) else list(val)

        self._rest_candidates: List[RESTVacuumClient] = [
            RESTVacuumClient(host, load_rest_mapping(name, paths))
            for name in _to_list(rest_mappings)
        ]
        self._mqtt_candidates: List[MQTTVacuumClient] = [
            MQTTVacuumClient(host, load_mqtt_mapping(name, paths))
            for name in _to_list(mqtt_mappings)
        ]

        # Auto-pin when only one candidate is loaded — no probe() required.
        if len(self._rest_candidates) == 1:
            self._rest = self._rest_candidates[0]
            self.via = "REST"
        if len(self._mqtt_candidates) == 1:
            self._mqtt = self._mqtt_candidates[0]
            if self.via == "NONE":
                self.via = "MQTT"

    # ------------------------------------------------------------------
    # Context manager
    # ------------------------------------------------------------------

    async def __aenter__(self) -> VacuumClient:
        return self

    async def __aexit__(self, *_: Any) -> None:
        await self.close()

    async def close(self) -> None:
        """Stop monitoring and close the REST clients.

        Every REST candidate is closed, pinned or not. MQTT candidates are
        not touched by this method.
        """
        await self.stop_monitoring()
        for client in self._rest_candidates:
            await client.close()

    # ------------------------------------------------------------------
    # High-level action API
    # ------------------------------------------------------------------

    async def start_cleaning(self) -> bool:
        """Start a full cleaning run."""
        return await self._execute("start_cleaning")

    async def stop(self) -> bool:
        """Pause the current cleaning task."""
        return await self._execute("stop")

    async def go_home(self) -> bool:
        """Send the vacuum back to its dock."""
        return await self._execute("go_home")

    async def explore(self) -> bool:
        """Begin a mapping/exploration run."""
        return await self._execute("explore")

    async def get_status(self) -> VacuumStatus:
        """Return the current vacuum status."""
        return await self._execute("get_status")

    async def get_events(self) -> List[VacuumEvent]:
        """Return the event log since last startup."""
        return await self._execute("get_events")

    async def get_device_info(self) -> DeviceInfo:
        """Return firmware and device identity information."""
        # The mapping action behind this call is named "get_robot_id".
        return await self._execute("get_robot_id")

    async def get_wifi_status(self) -> DeviceInfo:
        """Return Wi-Fi connection details including MAC address."""
        return await self._execute("get_wifi_status")

    # ------------------------------------------------------------------
    # Mapping probe
    # ------------------------------------------------------------------

    async def probe(self) -> ProbeResult:
        """Test all configured mappings and pin the best working one per transport.

        Every REST candidate is tested, in the order supplied, by calling
        ``get_status``. Among the candidates that respond without raising a
        ``SharklocalError``, the one whose mapping has the highest
        ``priority`` is pinned as the active REST transport; on equal
        priority the earliest supplied candidate wins. The same process is
        then repeated for MQTT.

        When no candidate of a transport responds, a client pinned earlier
        for that transport is left in place, while ``via`` and the returned
        result describe this probe run only.

        Call this during integration setup when multiple mapping candidates
        are configured. With a single mapping per transport the client works
        without calling probe.

        Returns:
            :class:`ProbeResult` with the ``id`` of each selected mapping, or
            ``None`` for a transport where no mapping succeeded.
        """
        best_rest = await self._best_responder(self._rest_candidates)
        if best_rest is not None:
            self._rest = best_rest

        best_mqtt = await self._best_responder(self._mqtt_candidates)
        if best_mqtt is not None:
            self._mqtt = best_mqtt

        rest_id: Optional[str] = (
            best_rest.mapping.id if best_rest is not None else None
        )
        mqtt_id: Optional[str] = (
            best_mqtt.mapping.id if best_mqtt is not None else None
        )

        if rest_id is not None:
            self.via = "REST"
        elif mqtt_id is not None:
            self.via = "MQTT"
        else:
            self.via = "NONE"

        return ProbeResult(rest_mapping=rest_id, mqtt_mapping=mqtt_id)

    @staticmethod
    async def _best_responder(candidates: List[Any]) -> Optional[Any]:
        """Return the highest-priority candidate that answers ``get_status``."""
        best: Optional[Any] = None
        for client in candidates:
            try:
                await client.call("get_status")
            except SharklocalError:
                continue
            # Strict ">" keeps the earliest candidate on equal priority.
            if best is None or client.mapping.priority > best.mapping.priority:
                best = client
        return best

    # ------------------------------------------------------------------
    # Real-time monitoring (MQTT only)
    # ------------------------------------------------------------------

    def on_status_update(self, callback: Callable[[VacuumStatus], None]) -> None:
        """Register a callback to receive real-time status updates via MQTT.

        The callback receives a ``VacuumStatus`` on every status message.
        Both synchronous and ``async`` callables are accepted.

        Must be called before :meth:`start_monitoring`.

        Raises:
            SharklocalError: If no MQTT mapping was configured.
        """
        if not self._mqtt_candidates:
            raise SharklocalError(
                "An MQTT mapping is required for real-time monitoring"
            )
        self._status_callback = callback

    async def start_monitoring(self) -> None:
        """Begin monitoring the vacuum status in the background via MQTT.

        Call :meth:`on_status_update` first to register a callback. When
        multiple MQTT mappings are configured, call :meth:`probe` first to
        determine the active mapping. Safe to call repeatedly; a second call
        while already running is a no-op.

        Raises:
            SharklocalError: If no MQTT mapping is configured, no MQTT client
                is pinned yet, or no callback has been registered.
        """
        if not self._mqtt_candidates:
            raise SharklocalError("An MQTT mapping is required for monitoring")
        if self._mqtt is None:
            raise SharklocalError(
                "Call probe() first to determine the active MQTT mapping "
                "when multiple MQTT mappings are configured"
            )
        if self._status_callback is None:
            raise SharklocalError(
                "Register a callback with on_status_update() before starting monitoring"
            )
        if self._monitor_task and not self._monitor_task.done():
            return  # Already running

        self._monitor_stop = asyncio.Event()
        self._monitor_task = asyncio.ensure_future(
            self._mqtt.monitor(self._status_callback, stop_event=self._monitor_stop)
        )

    async def stop_monitoring(self) -> None:
        """Stop background status monitoring if it is running."""
        if self._monitor_stop:
            self._monitor_stop.set()
        if self._monitor_task and not self._monitor_task.done():
            self._monitor_task.cancel()
            try:
                await self._monitor_task
            except (asyncio.CancelledError, Exception):
                # Best-effort shutdown: whatever the task raises while being
                # cancelled is intentionally discarded.
                pass
        self._monitor_task = None
        self._monitor_stop = None

    # ------------------------------------------------------------------
    # Transport inspection
    # ------------------------------------------------------------------

    def _rest_options(self) -> List[RESTVacuumClient]:
        """Return the pinned REST client alone, or every REST candidate."""
        return [self._rest] if self._rest is not None else self._rest_candidates

    def _mqtt_options(self) -> List[MQTTVacuumClient]:
        """Return the pinned MQTT client alone, or every MQTT candidate."""
        return [self._mqtt] if self._mqtt is not None else self._mqtt_candidates

    def supported_actions(self) -> List[str]:
        """Return all action names supported by any configured transport."""
        actions: set[str] = set()
        for client in self._rest_options():
            actions.update(client.mapping.actions)
        for client in self._mqtt_options():
            actions.update(client.mapping.actions)
        return sorted(actions)

    def transports_for(self, action: str) -> List[str]:
        """Return which transports support *action*, in evaluation priority order.

        REST is listed before MQTT. When multiple candidates exist for a
        transport, any candidate supporting the action counts.
        """
        result = []
        if any(c.supports(action) for c in self._rest_options()):
            result.append("rest")
        if any(c.supports(action) for c in self._mqtt_options()):
            result.append("mqtt")
        return result

    @property
    def active_rest_mapping(self) -> Optional[str]:
        """The ``id`` of the currently pinned REST mapping, or ``None``."""
        return self._rest.mapping.id if self._rest is not None else None

    @property
    def active_mqtt_mapping(self) -> Optional[str]:
        """The ``id`` of the currently pinned MQTT mapping, or ``None``."""
        return self._mqtt.mapping.id if self._mqtt is not None else None

    # ------------------------------------------------------------------
    # Internal transport evaluation
    # ------------------------------------------------------------------

    async def _execute(self, action: str) -> Any:
        """Execute *action* using the best available transport.

        When a transport has been pinned (single mapping configured, or after
        :meth:`probe` ran), only that client is tried for its transport.
        When no client is pinned (multiple candidates, probe not yet called),
        all candidates for that transport are tried in the supplied order.

        Evaluation order:

        1. REST candidates — tried left to right; ``ConnectError`` moves to
           the next candidate. Any other exception propagates immediately.
        2. MQTT candidates — tried when all REST candidates raise
           ``ConnectError`` or no REST candidate supports the action.

        The ``ConnectError`` from the last exhausted candidate is re-raised
        when no transport can complete the action.

        Raises:
            ActionNotSupportedError: If no configured mapping defines *action*.
            ConnectError: If every eligible client was unreachable.
        """
        rest_options = [c for c in self._rest_options() if c.supports(action)]
        mqtt_options = [c for c in self._mqtt_options() if c.supports(action)]

        if not rest_options and not mqtt_options:
            raise ActionNotSupportedError(
                f"Action '{action}' is not supported by any configured mapping. "
                f"Supported actions: {self.supported_actions()}"
            )

        last_connect_error: Optional[ConnectError] = None

        for client in rest_options:
            try:
                return await client.call(action)
            except ConnectError as exc:
                last_connect_error = exc

        for client in mqtt_options:
            try:
                return await client.call(action)
            except ConnectError as exc:
                last_connect_error = exc

        if last_connect_error is not None:
            raise last_connect_error

        raise ActionNotSupportedError(  # pragma: no cover
            f"No configured transport could execute action '{action}'"  # pragma: no cover
        )  # pragma: no cover
REST is preferred when both transports support an action. MQTT is used as a fallback only when REST is unreachable (ConnectError). If a transport is not configured, or does not define an action, it is skipped automatically.
Pass a list of mapping names to enable auto-detection: call probe() during setup to test each mapping against the device and pin the working one for all subsequent calls. With a single mapping per transport the client works immediately without probing.
Example — single mapping (no probe required):
async with VacuumClient( "192.168.1.100", rest_mappings="sharkiq_v1", mqtt_mappings="sharkiq_v1", ) as vacuum: status = await vacuum.get_status() await vacuum.start_cleaning()Example — multiple candidates with probe::
async with VacuumClient( "192.168.1.100", rest_mappings=["sharkiq_v1", "other_model_v1"], mqtt_mappings=["sharkiq_v1"], ) as vacuum: result = await vacuum.probe() print(result.rest_mapping) # e.g. "sharkiq_v1" status = await vacuum.get_status()Args
host- IP address or hostname of the vacuum.
rest_mappings- One REST mapping name, or a list of candidates to probe.
mqtt_mappings- One MQTT mapping name, or a list of candidates to probe.
mapping_search_paths- Additional directories to search for mapping files before falling back to built-in mappings.
Instance variables
-
Expand source code
@property
def active_mqtt_mapping(self) -> Optional[str]:
    """Identifier of the pinned MQTT mapping; ``None`` while nothing is pinned."""
    pinned = self._mqtt
    if not pinned:
        return None
    return pinned.mapping.id
id of the currently pinned MQTT mapping, or None. -
Expand source code
@property
def active_rest_mapping(self) -> Optional[str]:
    """Identifier of the pinned REST mapping; ``None`` while nothing is pinned."""
    pinned = self._rest
    if not pinned:
        return None
    return pinned.mapping.id
id of the currently pinned REST mapping, or None.
Methods
-
Expand source code
async def close(self) -> None:
    """Stop monitoring and close all underlying connections.

    Awaits :meth:`stop_monitoring` first, then awaits ``close()`` on every
    REST candidate client in turn.
    """
    await self.stop_monitoring()
    # NOTE(review): only the REST candidates are closed here; the MQTT
    # candidates are not touched — confirm they hold no open connection.
    for client in self._rest_candidates:
        await client.close()
-
Expand source code
async def explore(self) -> bool:
    """Begin a mapping/exploration run.

    Dispatches the ``"explore"`` action through ``_execute`` and returns
    its result.
    """
    return await self._execute("explore")
-
Expand source code
async def get_device_info(self) -> DeviceInfo:
    """Return firmware and device identity information.

    Dispatches the ``"get_robot_id"`` action through ``_execute`` and
    returns its result.
    """
    # The method name and the mapping action name differ on purpose here:
    # the action is called "get_robot_id".
    return await self._execute("get_robot_id")
-
Expand source code
async def get_events(self) -> List[VacuumEvent]:
    """Return the event log since last startup.

    Dispatches the ``"get_events"`` action through ``_execute`` and
    returns its result.
    """
    return await self._execute("get_events")
-
Expand source code
async def get_status(self) -> VacuumStatus:
    """Return the current vacuum status.

    Dispatches the ``"get_status"`` action through ``_execute`` and
    returns its result.
    """
    return await self._execute("get_status")
-
Expand source code
async def get_wifi_status(self) -> DeviceInfo:
    """Return Wi-Fi connection details including MAC address.

    Dispatches the ``"get_wifi_status"`` action through ``_execute`` and
    returns its result.
    """
    return await self._execute("get_wifi_status")
-
Expand source code
async def go_home(self) -> bool:
    """Send the vacuum back to its dock.

    Dispatches the ``"go_home"`` action through ``_execute`` and returns
    its result.
    """
    return await self._execute("go_home")
-
Expand source code
def on_status_update(self, callback: Callable[[VacuumStatus], None]) -> None:
    """Store *callback* as the receiver of real-time MQTT status updates.

    The callback is handed a ``VacuumStatus`` for every status message;
    plain and ``async`` callables are both accepted.

    Register the callback before calling :meth:`start_monitoring`.

    Raises:
        SharklocalError: If no MQTT mapping was configured.
    """
    if self._mqtt_candidates:
        self._status_callback = callback
        return
    raise SharklocalError(
        "An MQTT mapping is required for real-time monitoring"
    )
The callback receives a VacuumStatus on every status message. Both synchronous and async callables are accepted.
Must be called before start_monitoring(). -
Expand source code
async def probe(self) -> ProbeResult:
    """Test all configured mappings and pin the best working one per transport.

    Every REST candidate is tested, in the order supplied, by calling
    ``get_status``. Among the candidates that respond without raising a
    ``SharklocalError``, the one whose mapping has the highest ``priority``
    is pinned as the active REST transport; on equal priority the earliest
    supplied candidate wins. The same process is then repeated for MQTT.

    When no candidate of a transport responds, a client pinned earlier for
    that transport is left in place, while ``via`` and the returned result
    describe this probe run only.

    Call this during integration setup when multiple mapping candidates are
    configured. With a single mapping per transport the client works
    without calling probe.

    Returns:
        :class:`ProbeResult` with the ``id`` of each selected mapping, or
        ``None`` for a transport where no mapping succeeded.
    """

    async def _best_responder(candidates):
        # Highest-priority candidate that answers get_status, else None.
        best = None
        for client in candidates:
            try:
                await client.call("get_status")
            except SharklocalError:
                continue
            # Strict ">" keeps the earliest candidate on equal priority.
            if best is None or client.mapping.priority > best.mapping.priority:
                best = client
        return best

    best_rest = await _best_responder(self._rest_candidates)
    if best_rest is not None:
        self._rest = best_rest

    best_mqtt = await _best_responder(self._mqtt_candidates)
    if best_mqtt is not None:
        self._mqtt = best_mqtt

    rest_id = best_rest.mapping.id if best_rest is not None else None
    mqtt_id = best_mqtt.mapping.id if best_mqtt is not None else None

    if rest_id is not None:
        self.via = "REST"
    elif mqtt_id is not None:
        self.via = "MQTT"
    else:
        self.via = "NONE"

    return ProbeResult(rest_mapping=rest_id, mqtt_mapping=mqtt_id)
Each REST mapping is tested, in the order supplied, by calling get_status. Among the mappings that respond successfully, the one with the highest priority is pinned as the active REST transport; on equal priority the earliest supplied mapping wins. The same process is then repeated for MQTT. A previously pinned transport is replaced when probe is called again and a mapping responds.
Call this during integration setup when multiple mapping candidates are configured. With a single mapping per transport the client works without calling probe.
Returns
ProbeResult with the id of each selected mapping, or None for a transport where no mapping succeeded. -
Expand source code
async def start_cleaning(self) -> bool:
    """Start a full cleaning run.

    Dispatches the ``"start_cleaning"`` action through ``_execute`` and
    returns its result.
    """
    return await self._execute("start_cleaning")
-
Expand source code
async def start_monitoring(self) -> None:
    """Launch background status monitoring over MQTT.

    A callback must have been registered with :meth:`on_status_update`.
    With several MQTT mappings configured, :meth:`probe` must have pinned
    one first. Calling this again while monitoring is still running does
    nothing.

    Raises:
        SharklocalError: If no MQTT mapping is configured, no MQTT client
            is pinned yet, or no callback has been registered.
    """
    if not self._mqtt_candidates:
        raise SharklocalError("An MQTT mapping is required for monitoring")

    mqtt_client = self._mqtt
    if mqtt_client is None:
        raise SharklocalError(
            "Call probe() first to determine the active MQTT mapping "
            "when multiple MQTT mappings are configured"
        )

    on_status = self._status_callback
    if on_status is None:
        raise SharklocalError(
            "Register a callback with on_status_update() before starting monitoring"
        )

    current = self._monitor_task
    if current and not current.done():
        return  # Already running

    stop_signal = asyncio.Event()
    self._monitor_stop = stop_signal
    self._monitor_task = asyncio.ensure_future(
        mqtt_client.monitor(on_status, stop_event=stop_signal)
    )
Call on_status_update() first to register a callback. When multiple MQTT mappings are configured, call probe() first to determine the active mapping. Safe to call repeatedly; a second call while already running is a no-op. -
Expand source code
async def stop(self) -> bool:
    """Pause the current cleaning task.

    Dispatches the ``"stop"`` action through ``_execute`` and returns its
    result.
    """
    return await self._execute("stop")
-
Expand source code
async def stop_monitoring(self) -> None:
    """Halt the background monitoring task, if there is one.

    Sets the stop event, cancels a task that has not finished yet and waits
    for it, then clears both the task and the event.
    """
    stop_signal = self._monitor_stop
    if stop_signal:
        stop_signal.set()

    task = self._monitor_task
    if task and not task.done():
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, Exception):
            # Whatever the task raises while shutting down is discarded.
            pass

    self._monitor_task = None
    self._monitor_stop = None
-
Expand source code
def supported_actions(self) -> List[str]:
    """List, in sorted order, every action name offered by a configured transport.

    For each transport the pinned client is consulted when one exists;
    otherwise all of that transport's candidates are.
    """
    rest_clients = [self._rest] if self._rest else self._rest_candidates
    mqtt_clients = [self._mqtt] if self._mqtt else self._mqtt_candidates
    names = {
        name
        for client in [*rest_clients, *mqtt_clients]
        for name in client.mapping.actions
    }
    return sorted(names)
-
Expand source code
def transports_for(self, action: str) -> List[str]:
    """Name the transports able to run *action*, in evaluation priority order.

    ``"rest"`` always precedes ``"mqtt"``. A transport with several
    candidates qualifies as soon as one candidate supports the action.
    """
    rest_pool = [self._rest] if self._rest else self._rest_candidates
    mqtt_pool = [self._mqtt] if self._mqtt else self._mqtt_candidates
    ordered_pools = (("rest", rest_pool), ("mqtt", mqtt_pool))
    return [
        label
        for label, pool in ordered_pools
        if any(client.supports(action) for client in pool)
    ]
REST is listed before MQTT. When multiple candidates exist for a transport, any candidate supporting the action counts.
-
Expand source code
@dataclass
class VacuumEvent:
    """A single event from the vacuum event log.

    All fields except ``raw`` are required when constructing an instance.
    """

    id: int
    type: str
    type_id: int
    # NOTE(review): keys of this mapping are not visible here — presumably
    # named time components; confirm against the code that builds events.
    timestamp: Dict[str, int]
    current_status: str
    source_type: str
    # Defaults to a fresh empty dict per instance (default_factory).
    # Presumably the unparsed source payload — TODO confirm.
    raw: Dict[str, Any] = field(default_factory=dict)
Instance variables
-
The type of the None singleton.
-
The type of the None singleton.
-
The type of the None singleton.
-
The type of the None singleton.
-
The type of the None singleton.
-
The type of the None singleton.
-
The type of the None singleton.
-
-
Expand source code
class VacuumMode(str, Enum):
    """Normalized operating modes across all transports.

    The class mixes in ``str``, so every member is also a string and
    compares equal to its lowercase value (for example ``"cleaning"``).
    """

    UNKNOWN = "unknown"
    CLEANING = "cleaning"
    RETURNING_TO_DOCK = "returning_to_dock"
    DOCKING = "docking"
    DOCKED = "docked"
    IDLE = "idle"  # Powered on but not cleaning and not on the charging dock
    EXPLORING = "exploring"  # Mapping/exploration run in progress
Ancestors
- builtins.str
- enum.Enum
Class variables
-
The type of the None singleton.
-
The type of the None singleton.
-
The type of the None singleton.
-
The type of the None singleton.
-
The type of the None singleton.
-
The type of the None singleton.
-
The type of the None singleton.
-
Expand source code
@dataclass
class VacuumStatus:
    """Normalized vacuum status, independent of transport protocol.

    Only ``mode`` is required; the remaining fields default to ``None``
    (or an empty dict for ``raw``) when a transport does not supply them.
    """

    mode: VacuumMode
    # NOTE(review): unit/range not shown here — presumably a percentage.
    battery_level: Optional[int] = None
    charging: Optional[bool] = None
    # Defaults to a fresh empty dict per instance (default_factory).
    raw: Dict[str, Any] = field(default_factory=dict)

    @property
    def is_cleaning(self) -> bool:
        """``True`` exactly when ``mode`` equals ``VacuumMode.CLEANING``."""
        return self.mode == VacuumMode.CLEANING

    @property
    def is_docked(self) -> bool:
        """``True`` when ``mode`` is ``DOCKED`` or ``DOCKING``."""
        # DOCKING counts as docked here, not only the final DOCKED state.
        return self.mode in (VacuumMode.DOCKED, VacuumMode.DOCKING)
Instance variables
-
The type of the None singleton.
-
The type of the None singleton.
-
Expand source code
@property
def is_cleaning(self) -> bool:
    """``True`` exactly when ``mode`` equals ``VacuumMode.CLEANING``."""
    return self.mode == VacuumMode.CLEANING
Expand source code
@property
def is_docked(self) -> bool:
    """``True`` when ``mode`` is ``DOCKED`` or ``DOCKING``."""
    # DOCKING counts as docked here, not only the final DOCKED state.
    return self.mode in (VacuumMode.DOCKED, VacuumMode.DOCKING)
The type of the None singleton.
-
The type of the None singleton.
-