Source code for aiokef.aiokef

"""A module for asynchronously interacting with KEF wireless speakers."""

import asyncio
import functools
import inspect
import logging
import socket
import time
from collections import namedtuple
from contextlib import AsyncExitStack
from typing import Any, Callable, Optional, Tuple, Union

from async_timeout import timeout
from tenacity import (
    after_log,
    before_log,
    before_sleep_log,
    retry,
    stop_after_attempt,
    wait_exponential,
)

_LOGGER = logging.getLogger(__name__)

_RESPONSE_OK = 17  # the full response is [82, 17, 255]
_TIMEOUT = 2.0  # in seconds
_KEEP_ALIVE = 1.0  # in seconds
_VOLUME_SCALE = 100.0
_MAX_ATTEMPT_TILL_SUCCESS = 10
_MAX_SEND_MESSAGE_TRIES = 5
_MAX_CONNECTION_RETRIES = 10  # Each time `_send_command` is called, ...
# ... the connection is maximally refreshed this many times.

# Only in the case of Bluetooth there is a second number
# that can identify if the bluetooth is connected.
INPUT_SOURCES_20_MINUTES_LR = {
    "Bluetooth": 9,
    "Bluetooth_paired": 15,  # This cannot be used to set!
    "Aux": 10,
    "Opt": 11,
    "Usb": 12,
    "Wifi": 2,
}

# We will create {source_name: {standby_time: ("L/R code", "R/L code")}}
STANDBY_OPTIONS = [20, 60, None]  # in minutes and 0 means never standby
INPUT_SOURCES = {}
for source, code in INPUT_SOURCES_20_MINUTES_LR.items():
    LR_mapping = {t: code + i * 16 for i, t in enumerate(STANDBY_OPTIONS)}
    INPUT_SOURCES[source] = {t: (LR, LR + 64) for t, LR in LR_mapping.items()}

INPUT_SOURCES_RESPONSE = {}
for source, mapping in INPUT_SOURCES.items():
    source = source.replace("_paired", "")
    for t, (LR, RL) in mapping.items():
        INPUT_SOURCES_RESPONSE[LR] = (source, t, "L/R")
        INPUT_SOURCES_RESPONSE[RL] = (source, t, "R/L")

# This seems necessary on both the LSX and LS50W, I don't know why...
# It's the response when Wifi, "R/L", 60 standby.
INPUT_SOURCES_RESPONSE[48] = INPUT_SOURCES_RESPONSE[82]

_SET_START = ord("S")
_SET_MID = 129
_GET_END = 128
_GET_START = ord("G")

# Control
_VOL = ord("%")
_SOURCE = ord("0")
_CONTROL = ord("1")

# DSP
_MODE = 39
_DESK_DB = 40
_WALL_DB = 41
_TREBLE_DB = 42
_HIGH_HZ = 43
_LOW_HZ = 44
_SUB_DB = 45


def _get(which: int) -> bytes:
    return bytes([_GET_START, which, _GET_END])


def _set(which: int) -> Callable[[int], bytes]:
    return lambda i: bytes([_SET_START, which, _SET_MID, i])


COMMANDS = {
    "get_volume": _get(_VOL),
    "set_volume": _set(_VOL),
    "set_source": _set(_SOURCE),
    "get_source": _get(_SOURCE),
    "set_play_pause": _set(_CONTROL)(129),  # 128 also works
    "get_play_pause": _get(_CONTROL),
    "next_track": _set(_CONTROL)(130),
    "prev_track": _set(_CONTROL)(131),
    "get_mode": _get(_MODE),
    "set_mode": _set(_MODE),
    "get_desk_db": _get(_DESK_DB),
    "set_desk_db": _set(_DESK_DB),
    "get_wall_db": _get(_WALL_DB),
    "set_wall_db": _set(_WALL_DB),
    "get_treble_db": _get(_TREBLE_DB),
    "set_treble_db": _set(_TREBLE_DB),
    "get_high_hz": _get(_HIGH_HZ),
    "set_high_hz": _set(_HIGH_HZ),
    "get_low_hz": _get(_LOW_HZ),
    "set_low_hz": _set(_LOW_HZ),
    "get_sub_db": _get(_SUB_DB),
    "set_sub_db": _set(_SUB_DB),
}


[docs]def arange(start, end, step): return [x * step for x in range(int(start / step), int(end / step) + 1)]
# DSP options _DESK_WALL_DB_OPTIONS = arange(-6, 0, 0.5) _TREBLE_DB_OPTIONS = arange(-2, 2, 0.5) _HIGH_HZ_OPTIONS = arange(50, 120, 5) _LOW_HZ_OPTIONS = arange(40, 250, 5) _SUB_DB_OPTIONS = arange(-10, 10, 1) DSP_OPTION_MAPPING = { "desk_db": _DESK_WALL_DB_OPTIONS, "wall_db": _DESK_WALL_DB_OPTIONS, "treble_db": _TREBLE_DB_OPTIONS, "high_hz": _HIGH_HZ_OPTIONS, "low_hz": _LOW_HZ_OPTIONS, "sub_db": _SUB_DB_OPTIONS, } State = namedtuple("State", ["source", "is_on", "standby_time", "orientation"]) Mode = namedtuple( "Mode", [ "desk_mode", "wall_mode", "phase_correction", "high_pass", "sub_polarity", "bass_extension", ], ) _RETRY_KWARGS = { "wait": wait_exponential(exp_base=1.5), "before": before_log(_LOGGER, logging.DEBUG), "before_sleep": before_sleep_log(_LOGGER, logging.DEBUG), "after": after_log(_LOGGER, logging.DEBUG), } _CMD_RETRY_KWARGS = dict( _RETRY_KWARGS, stop=stop_after_attempt(_MAX_ATTEMPT_TILL_SUCCESS) ) _SEND_MSG_RETRY_KWARGS = dict( _RETRY_KWARGS, stop=stop_after_attempt(_MAX_SEND_MESSAGE_TRIES) ) BASS_EXTENSION_MAPPING = { "00": "Standard", "10": "Less", "01": "Extra", "11": "Unknown", } BASS_EXTENSION_MAPPING_INV = {v: k for k, v in BASS_EXTENSION_MAPPING.items()}
[docs]def bits_to_mode(bits: int) -> Mode: if bits == 255: # Happens if device is off return Mode(*(len(Mode._fields) * ["Unknown"])) mode_bits = f"{bits:08b}" desk_mode = mode_bits[7] == "1" wall_mode = mode_bits[6] == "1" phase_correction = mode_bits[5] == "1" high_pass = mode_bits[4] == "1" sub_polarity = "-" if mode_bits[1] == "1" else "+" bass_extension_bits = mode_bits[2:4] bass_extension = BASS_EXTENSION_MAPPING[bass_extension_bits] return Mode( desk_mode=desk_mode, wall_mode=wall_mode, phase_correction=phase_correction, high_pass=high_pass, sub_polarity=sub_polarity, bass_extension=bass_extension, )
[docs]def mode_to_bits(mode: Mode) -> int: true_false = {True: "1", False: "0"} desk_mode = true_false[mode.desk_mode] wall_mode = true_false[mode.wall_mode] phase_correction = true_false[mode.phase_correction] high_pass = true_false[mode.high_pass] sub_polarity = {"-": "1", "+": "0"}[mode.sub_polarity] bass_extension = BASS_EXTENSION_MAPPING_INV[mode.bass_extension] byte = f"1{sub_polarity}{bass_extension}{high_pass}{phase_correction}{wall_mode}{desk_mode}" return int(byte, 2)
def _parse_response(message: bytes, reply: bytes) -> bytes: """Sometimes we receive many messages, so we need to split them up and choose the right one.""" responses = [b"R" + i for i in reply.split(b"R") if i] if message[0] == ord("G"): which = message[1] try: return next(r for r in responses if r[1] == which) except StopIteration: msg = "The query type didn't match with the response." raise Exception(msg) from None elif message[0] == ord("S"): FULL_RESPONSE_OK = bytes([82, 17, 255]) if FULL_RESPONSE_OK in responses: return FULL_RESPONSE_OK else: raise Exception("Didn't get OK after SET command.") else: raise Exception(f"Got an unknown response '{reply!r}'") class _AsyncCommunicator: def __init__( self, host: str, port: int, *, loop: Optional[asyncio.events.AbstractEventLoop] = None, ): self.host = host self.port = port self._reader: Optional[asyncio.StreamReader] = None self._writer: Optional[asyncio.StreamWriter] = None self._last_time_stamp = 0.0 self._is_online = False self._loop = loop or asyncio.get_event_loop() self._disconnect_task = None self._lock = asyncio.Lock() @property def is_connected(self) -> bool: return (self._reader, self._writer) != (None, None) async def open_connection(self) -> None: retries = 0 while retries < _MAX_CONNECTION_RETRIES: try: async with self._lock, timeout(_TIMEOUT): if self.is_connected: if self._writer.is_closing(): # type: ignore _LOGGER.debug( "%s: Connection closing but did not disconnect", self.host, ) await self._disconnect(use_lock=False) else: _LOGGER.debug("%s: Connection is still alive", self.host) return _LOGGER.debug("%s: Opening connection", self.host) self._reader, self._writer = await asyncio.open_connection( self.host, self.port, family=socket.AF_INET ) _LOGGER.debug("%s: Opening connection successful", self.host) except ConnectionRefusedError: _LOGGER.debug("%s: Opening connection failed", self.host) await asyncio.sleep(0.5) except BlockingIOError: # Connection incoming # XXX: I have never seen this. _LOGGER.debug("%s: BlockingIOError", self.host) retries = 0 await asyncio.sleep(1) except (asyncio.TimeoutError, OSError) as e: # Host is down self._is_online = False raise ConnectionRefusedError("Speaker is offline.") from e else: self._is_online = True self._last_time_stamp = time.time() self._schedule_disconnect() return retries += 1 self._is_online = False raise ConnectionRefusedError("Connection tries exceeded.") async def _send_message(self, message: bytes) -> bytes: # type: ignore[return] async with self._lock: assert self._writer is not None assert self._reader is not None _LOGGER.debug("%s: Writing message: %s", self.host, str(message)) try: # I am getting `[asyncio] socket.send() raised exception.` # in one of the two lines below. # After adding this, I've never seen the error again, but also # never seen the log message below... self._writer.write(message) await self._writer.drain() except ConnectionResetError: _LOGGER.exception("%s: Got an exception in writing", self.host) await self._disconnect(use_lock=False) raise _LOGGER.debug("%s: Reading message", self.host) try: async with timeout(_TIMEOUT): data = await self._reader.read(100) _LOGGER.debug("%s: Got reply, %s", self.host, str(data)) self._last_time_stamp = time.time() self._schedule_disconnect() except asyncio.TimeoutError: _LOGGER.error("%s: Timeout in waiting for reply", self.host) else: return data async def _disconnect(self, use_lock=True) -> None: _LOGGER.debug("%s: _disconnect called", self.host) self._maybe_cancel_disconnect_task() maybe_lock = self._lock if use_lock else AsyncExitStack() if self.is_connected: async with maybe_lock: # type: ignore assert self._writer is not None _LOGGER.debug("%s: Going to disconnect now", self.host) try: self._writer.close() await self._writer.wait_closed() _LOGGER.debug("%s: Disconnected", self.host) except ConnectionResetError: # Raised ConnectionResetError: [Errno 104] Connection reset by peer # which means that the speaker closed the connection. _LOGGER.exception("%s: Disconnecting raised", self.host) self._reader, self._writer = (None, None) async def _disconnect_in(self, dt): await asyncio.sleep(dt) await asyncio.shield(self._disconnect()) # ℹ️ shield it from being cancelled def _maybe_cancel_disconnect_task(self): if self._disconnect_task is not None: _LOGGER.debug("%s: Cancelling the _disconnect_task", self.host) self._disconnect_task.cancel() self._disconnect_task = None def _schedule_disconnect(self, dt=_KEEP_ALIVE): self._maybe_cancel_disconnect_task() self._disconnect_task = asyncio.create_task(self._disconnect_in(dt)) @retry(**_SEND_MSG_RETRY_KWARGS) async def send_message(self, msg: bytes) -> int: await self.open_connection() raw_reply = await self._send_message(msg) reply = _parse_response(msg, raw_reply)[-2] _LOGGER.debug("%s: Received: %s", self.host, reply) return reply
[docs]class AsyncKefSpeaker: """Asynchronous KEF speaker class. Parameters ---------- host : str The IP of the speaker. port : int, optional The port used for the communication, the default is 50001. volume_step : float, optional The volume change when calling `increase_volume` or `decrease_volume`, by default 0.05. maximum_volume : float, optional The maximum allow volume, between 0 and 1. Use this to avoid accidentally setting very high volumes, by default 1.0. loop : `asyncio.BaseEventLoop`, optional The eventloop to use. standby_time: int, optional Put the speaker in standby when inactive for ``standby_time`` minutes. The only options are None (default), 20, and 60. inverse_speaker_mode : bool, optional Reverse L/R to R/L. Attributes ---------- sync : `aiokef.SyncKefSpeaker` Run any method that the `AsyncKefSpeaker` has in a synchronous way. For example ``kef_speaker.sync.mute()``. """ def __init__( self, host: str, port: int = 50001, volume_step: float = 0.05, maximum_volume: float = 1.0, standby_time: Optional[int] = None, inverse_speaker_mode: bool = False, *, loop: Optional[asyncio.events.AbstractEventLoop] = None, ): if standby_time not in STANDBY_OPTIONS: raise ValueError( f"It is only possible to use `standby_time` from {STANDBY_OPTIONS}" ) self.host = host self.port = port self.volume_step = volume_step self.maximum_volume = maximum_volume self.standby_time = standby_time self.inverse_speaker_mode = inverse_speaker_mode self._comm = _AsyncCommunicator(host, port, loop=loop) self.sync = SyncKefSpeaker(self)
[docs] @retry(**_CMD_RETRY_KWARGS) async def get_state(self) -> State: # If the speaker is off, the source increases by 128 response = await self._comm.send_message(COMMANDS["get_source"]) is_on = response <= 128 code = response % 128 if code not in INPUT_SOURCES_RESPONSE: raise ConnectionError(f"Getting source failed, got response {response}.") source, standby_time, orientation = INPUT_SOURCES_RESPONSE[code] return State(source, is_on, standby_time, orientation)
[docs] async def get_source(self) -> None: state = await self.get_state() return state.source
[docs] @retry(**_CMD_RETRY_KWARGS) async def set_source(self, source: str, *, state="on") -> None: assert source in INPUT_SOURCES i = INPUT_SOURCES[source][self.standby_time][self.inverse_speaker_mode] % 128 if state == "off": i += 128 response = await self._comm.send_message( COMMANDS["set_source"](i) # type: ignore ) if response != _RESPONSE_OK: raise ConnectionError(f"Setting source failed, got response {response}.") for i in range(_MAX_ATTEMPT_TILL_SUCCESS): state = await self.get_state() current_source = state.source if ( (current_source == source) and ("R/L" if self.inverse_speaker_mode else "L/R") and (state.standby_time == self.standby_time) ): _LOGGER.debug("%s: Source is %s", self.host, source) return _LOGGER.debug( "%s: Try #%s: Source is %s but %s is selected", self.host, i, current_source, source, ) await asyncio.sleep(0.5) raise TimeoutError( f"Tried to set {source} {_MAX_ATTEMPT_TILL_SUCCESS} times" f" but the speaker is still {current_source}." )
[docs] @retry(**_CMD_RETRY_KWARGS) async def get_volume_and_is_muted( self, scale=True ) -> Tuple[Union[float, int], bool]: """Return volume level (0..1) and is_muted (in a single call).""" volume = await self._comm.send_message(COMMANDS["get_volume"]) if volume is None: raise ConnectionError("Getting volume failed.") is_muted = volume >= 128 return volume / _VOLUME_SCALE if scale else volume, is_muted
@retry(**_CMD_RETRY_KWARGS) async def _set_volume(self, volume: int) -> None: # Write volume level (0..100) on index 3, # add 128 to current level to mute. response = await self._comm.send_message( COMMANDS["set_volume"](volume) # type: ignore ) if response != _RESPONSE_OK: raise ConnectionError( f"Setting the volume failed, got response {response}." )
[docs] @retry(**_CMD_RETRY_KWARGS) async def set_play_pause(self) -> None: response = await self._comm.send_message(COMMANDS["set_play_pause"]) if response != _RESPONSE_OK: raise ConnectionError( f"Setting play or pause failed, got response {response}." )
[docs] @retry(**_CMD_RETRY_KWARGS) async def get_play_pause(self) -> str: response = await self._comm.send_message(COMMANDS["get_play_pause"]) if response == 128: return "Paused" elif response == 129: return "Playing" elif response == 132: return "Stopped" else: raise ConnectionError( f"Getting play or pause failed, got response {response}." )
[docs] @retry(**_CMD_RETRY_KWARGS) async def prev_track(self) -> None: response = await self._comm.send_message(COMMANDS["prev_track"]) if response != _RESPONSE_OK: raise ConnectionError( f"Setting the previous track failed, got response {response}." )
[docs] @retry(**_CMD_RETRY_KWARGS) async def next_track(self) -> None: response = await self._comm.send_message(COMMANDS["next_track"]) if response != _RESPONSE_OK: raise ConnectionError( f"Setting the next track failed, got response {response}." )
[docs] @retry(**_CMD_RETRY_KWARGS) async def get_mode(self) -> Union[Mode, str]: response = await self._comm.send_message(COMMANDS["get_mode"]) return bits_to_mode(response)
@retry(**_CMD_RETRY_KWARGS) async def _set_mode(self, mode: Mode) -> None: i = mode_to_bits(mode) cmd = COMMANDS["set_mode"](i) # type: ignore response = await self._comm.send_message(cmd) if response != _RESPONSE_OK: raise ConnectionError(f"Setting the mode failed, got response {response}.")
[docs] async def set_mode( self, desk_mode=None, wall_mode=None, phase_correction=None, high_pass=None, sub_polarity=None, bass_extension=None, ) -> None: """Set the mode of the speaker. Leave option None to keep the setting the same.""" current_mode = await self.get_mode() if desk_mode is None: desk_mode = current_mode.desk_mode if wall_mode is None: wall_mode = current_mode.wall_mode if phase_correction is None: phase_correction = current_mode.phase_correction if high_pass is None: high_pass = current_mode.high_pass if sub_polarity is None: sub_polarity = current_mode.sub_polarity if bass_extension is None: bass_extension = current_mode.bass_extension new_mode = Mode( desk_mode=desk_mode, wall_mode=wall_mode, phase_correction=phase_correction, high_pass=high_pass, sub_polarity=sub_polarity, bass_extension=bass_extension, ) await self._set_mode(new_mode)
# XXX: implement a check like in set_source @retry(**_CMD_RETRY_KWARGS) async def _get_dsp(self, which) -> Union[int, str]: cmd = COMMANDS[f"get_{which}"] response = await self._comm.send_message(cmd) if response == 255: # Happens for example when getting "high_hz" and "High pass mode" if off. return "Unknown" return DSP_OPTION_MAPPING[which][response - 128]
[docs] async def get_desk_db(self) -> int: return await self._get_dsp("desk_db")
[docs] async def get_wall_db(self) -> int: return await self._get_dsp("wall_db")
[docs] async def get_treble_db(self) -> int: return await self._get_dsp("treble_db")
[docs] async def get_high_hz(self) -> int: return await self._get_dsp("high_hz")
[docs] async def get_low_hz(self) -> int: return await self._get_dsp("low_hz")
[docs] async def get_sub_db(self) -> int: return await self._get_dsp("sub_db")
@retry(**_CMD_RETRY_KWARGS) async def _set_dsp(self, which, value) -> None: options = DSP_OPTION_MAPPING[which] as_type = type(options[0]) i = options.index(as_type(value)) + 128 # "+ 128" seems to do nothing cmd = COMMANDS[f"set_{which}"](i) # type: ignore response = await self._comm.send_message(cmd) if response != _RESPONSE_OK: raise ConnectionError( f"Setting the {which} failed, got response {response}." )
[docs] async def set_desk_db(self, db) -> None: await self._set_dsp("desk_db", db)
[docs] async def set_wall_db(self, db) -> None: await self._set_dsp("wall_db", db)
[docs] async def set_treble_db(self, db) -> None: await self._set_dsp("treble_db", db)
[docs] async def set_high_hz(self, hz) -> None: await self._set_dsp("high_hz", hz)
[docs] async def set_low_hz(self, hz) -> None: await self._set_dsp("low_hz", hz)
[docs] async def set_sub_db(self, db) -> None: await self._set_dsp("sub_db", db)
[docs] async def get_volume(self) -> Optional[float]: """Volume level of the media player (0..1). None if muted.""" volume, is_muted = await self.get_volume_and_is_muted(scale=True) return volume if not is_muted else None
[docs] async def set_volume(self, value: float) -> float: volume = max(0.0, min(self.maximum_volume, value)) await self._set_volume(int(volume * _VOLUME_SCALE)) return volume
async def _change_volume(self, step: float) -> float: """Change volume by `step`.""" volume = await self.get_volume() is_muted = await self.is_muted() if is_muted: await self.unmute() assert volume is not None return await self.set_volume(volume + step)
[docs] async def increase_volume(self) -> float: """Increase volume by `self.volume_step`.""" return await self._change_volume(self.volume_step)
[docs] async def decrease_volume(self) -> float: """Decrease volume by `self.volume_step`.""" return await self._change_volume(-self.volume_step)
[docs] async def is_muted(self) -> bool: _, is_muted = await self.get_volume_and_is_muted(scale=False) return is_muted
[docs] async def mute(self) -> None: volume, _ = await self.get_volume_and_is_muted(scale=False) await self._set_volume(int(volume) % 128 + 128)
[docs] async def unmute(self) -> None: volume, _ = await self.get_volume_and_is_muted(scale=False) await self._set_volume(int(volume) % 128)
[docs] async def is_online(self) -> bool: # type: ignore[return] try: await self._comm.open_connection() except ConnectionRefusedError: assert not self._comm._is_online else: return self._comm._is_online
[docs] async def is_on(self) -> bool: state = await self.get_state() return state.is_on
[docs] async def turn_on(self, source: Optional[str] = None) -> None: """The speaker can be turned on by selecting an INPUT_SOURCE.""" state = await self.get_state() if state.is_on: return await self.set_source(source or state.source, state="on") for i in range(20): # it can take 20s to boot if await self.is_on(): _LOGGER.debug("%s: Speaker is on", self.host) return _LOGGER.debug( "%s: Try #%s: Turned on the speaker, but it is still off", self.host, i ) await asyncio.sleep(1)
[docs] async def turn_off(self) -> None: state = await self.get_state() if not state.is_on: return await self.set_source(state.source, state="off") for i in range(20): # it can take 20s to boot if not await self.is_on(): _LOGGER.debug("%s: Speaker is off", self.host) return _LOGGER.debug( "%s: Try #%s: Turned off the speaker, but it is still on", self.host, i ) await asyncio.sleep(1)
[docs]class SyncKefSpeaker: """A synchronous KEF speaker class. This has the same methods as `aiokef.AsyncKefSpeaker`, however, it wraps all async methods and call them in a blocking way.""" def __init__(self, async_speaker: AsyncKefSpeaker): self.async_speaker = async_speaker def __getattr__(self, attr: str) -> Any: method = getattr(self.async_speaker, attr) if method is None: raise AttributeError(f"'SyncKefSpeaker' object has no attribute '{attr}.'") if inspect.iscoroutinefunction(method): @functools.wraps(method) def wrapped(*args, **kwargs): return asyncio.run(method(*args, **kwargs)) return wrapped else: return method