import time import sounddevice import sounddevice as sd from pprint import pformat DEFAULT = 0 sd.default.channels = 2 sd.default.dtype = "int16" sd.default.latency = "low" sd.default.samplerate = 48000 class PCMStream: globals() def __init__(self, _device_id): self.stream = sd.RawInputStream(device=_device_id) def change_device(self, _device_id): self.clean_up() self.stream = sd.RawInputStream(device=_device_id) # Stops and destroys the current stream def clean_up(self): if not self.stream.closed: self.stream.stop(ignore_errors=True) self.stream.close(ignore_errors=True) # Stops the current running stream but does not destroy it async def pause(self): if self.stream.active: self.stream.stop(ignore_errors=True) # Plays the stream async def play(self): if not self.stream.active: self.stream.start() # call back read function for the stream def read(self, num_bytes): if self.stream.active: # frame is 4 bytes frames = int(num_bytes / 4) data = self.stream.read(frames)[0] # convert to pcm format return bytes(data) class DeviceNotFoundError(Exception): def __init__(self): self.devices = sd.query_devices() self.host_apis = sd.query_hostapis() super().__init__("No Devices Found") def __str__(self): return ( f"Devices \n" f"{self.devices} \n " f"Host APIs \n" f"{pformat(self.host_apis)}" ) def query_devices(): options = { device.get("name"): index for index, device in enumerate(sd.query_devices()) if (device.get("max_input_channels") > 0 and device.get("hostapi") == DEFAULT) } if not options: raise DeviceNotFoundError() return options