diff --git a/readme.md b/readme.md index 052dd96..4048be2 100644 --- a/readme.md +++ b/readme.md @@ -25,6 +25,7 @@ The simplicity of Scream allows it to be very easy to work with while retaining * Milkdrop Visualizations thanks to the browser-based [Butterchurn](https://github.com/jberg/butterchurn) project * There are playback sinks available for common OSes such as [Windows](https://github.com/duncanthrax/scream/tree/master/Receivers/dotnet-windows/ScreamReader), [Linux](https://github.com/duncanthrax/scream/tree/master/Receivers/unix), and [Android](https://github.com/martinellimarco/scream-android/tree/90d1364ee36dd12ec9d7d2798926150b370030f3), as well as some embedded devices such as [the ESP32](http://tomeko.net/projects/esp32_rtp_pager/). * Embedded noVNC for controlling remote computers playing music +* Can accept streams from PulseAudio ![Screenshot of ScreamRouter noVNC](/images/noVNC.png) @@ -86,6 +87,15 @@ Each Source holds information for the source IP, volume, and the Source name. ![Screenshot of Add Source Dialog](/images/AddSource.png) +### PulseAudio + +PulseAudio can send to ScreamRouter over RTP. The RTP recevier needs a set MTU size to match Scream's packet size. The following command will send the Dummy source to a ScreamRouter Receiver: + + pactl load-module module-rtp-send format=s16le source=auto_null.monitor destination= port=40000 mtu=1164 + +Currently the RTP receiver assumes 16-bit 44100kHz audio with 1152 bytes of PCM in the payload. + + ### Sink Groups Each Sink Group holds a name, a list of Sinks, and a volume. Groups can be nested. diff --git a/src/audio/rtp_recevier.py b/src/audio/rtp_recevier.py new file mode 100644 index 0000000..a527c57 --- /dev/null +++ b/src/audio/rtp_recevier.py @@ -0,0 +1,89 @@ +"""Receiver, handles a port for listening for sources to send UDP packets to + Puts received data in sink queues""" +import multiprocessing +import os +import select +import socket +from ctypes import c_bool +from subprocess import TimeoutExpired +from typing import List +import rtp +from rtp.payloadType import PayloadType + +from src.audio.scream_header_parser import create_stream_info +import src.constants.constants as constants +from src.screamrouter_logger.screamrouter_logger import get_logger +from src.utils.utils import close_all_pipes, set_process_name + +logger = get_logger(__name__) +RTP = rtp.RTP() + +class RTPReceiver(multiprocessing.Process): + """Handles the main socket that listens for incoming Scream streams and sends them to sinks""" + def __init__(self, controller_write_fd_list: List[int]): + """Receives UDP packets and sends them to known queue lists""" + super().__init__(name="RTP Receiver Thread") + self.sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + """Main socket all sources send to""" + self.controller_write_fd_list: List[int] = controller_write_fd_list + """List of all sink queues to forward data to""" + self.running = multiprocessing.Value(c_bool, True) + """Multiprocessing-passed flag to determine if the thread is running""" + self.known_ips = multiprocessing.Manager().list() + if len(controller_write_fd_list) == 0: # Will be zero if this is just a placeholder. + self.running.value = c_bool(False) + return + self.start() + + def stop(self) -> None: + """Stops the Receiver and all sinks""" + logger.info("[RTP Receiver] Stopping") + was_running: bool = bool(self.running.value) + self.running.value = c_bool(False) + if constants.KILL_AT_CLOSE: + try: + self.kill() + except AttributeError: + pass + if constants.WAIT_FOR_CLOSES and was_running: + try: + self.join(5) + except TimeoutExpired: + logger.warning("RTP Receiver failed to close") + + def run(self) -> None: + """This thread listens for traffic from all sources and sends it to sinks""" + set_process_name("RTPRecvr", "RTP Receiver Thread") + logger.debug("[RTP Receiver] Receiver Thread PID %s", os.getpid()) + logger.info("[RTP Receiver] Receiver started on port %s", constants.RTP_RECEIVER_PORT) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, + constants.PACKET_SIZE * 1024 * 1024) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.bind(("", constants.RTP_RECEIVER_PORT)) + mono_header = create_stream_info(16, 44100, 1, "mono") + stereo_header = create_stream_info(16, 44100, 2, "stereo") + + while self.running.value: + ready = select.select([self.sock], [], [], .3) + if ready[0]: + data, addr = self.sock.recvfrom(constants.PACKET_SIZE + 500) + rtp_packet = RTP.fromBytearray(bytearray(data)) + if not rtp_packet.payloadType in [PayloadType.L16_1chan, + PayloadType.L16_2chan, + PayloadType.DYNAMIC_127]: + logger.warning("Can only decode 16-bit LPCM, unsupported type %s from %s:%s", + rtp_packet.payloadType, addr[0], addr[1]) + continue + if addr[0] not in self.known_ips: + self.known_ips.append(addr[0]) + padded_tag: bytes + padded_tag = bytes(addr[0].encode("ascii")) + bytes([0] * (constants.TAG_MAX_LENGTH - len(addr[0]))) + header: bytes = mono_header.header + if rtp_packet.payloadType in [PayloadType.L16_2chan, + PayloadType.DYNAMIC_127]: + header = stereo_header.header + for controller_write_fd in self.controller_write_fd_list: + os.write(controller_write_fd, padded_tag + header + rtp_packet.payload) + + logger.info("[RTP Receiver] Main thread stopped") + close_all_pipes() diff --git a/src/audio/udp_receiver.py b/src/audio/scream_receiver.py similarity index 79% rename from src/audio/udp_receiver.py rename to src/audio/scream_receiver.py index ec9de22..17e371b 100644 --- a/src/audio/udp_receiver.py +++ b/src/audio/scream_receiver.py @@ -14,17 +14,18 @@ logger = get_logger(__name__) -class UDPReceiver(multiprocessing.Process): +class ScreamReceiver(multiprocessing.Process): """Handles the main socket that listens for incoming Scream streams and sends them to sinks""" def __init__(self, controller_write_fd_list: List[int]): """Receives UDP packets and sends them to known queue lists""" - super().__init__(name="Receiver Thread") + super().__init__(name="Scream Receiver Thread") self.sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) """Main socket all sources send to""" self.controller_write_fd_list: List[int] = controller_write_fd_list """List of all sink queues to forward data to""" self.running = multiprocessing.Value(c_bool, True) """Multiprocessing-passed flag to determine if the thread is running""" + self.known_ips = multiprocessing.Manager().list() if len(controller_write_fd_list) == 0: # Will be zero if this is just a placeholder. self.running.value = c_bool(False) return @@ -32,7 +33,7 @@ def __init__(self, controller_write_fd_list: List[int]): def stop(self) -> None: """Stops the Receiver and all sinks""" - logger.info("[Receiver] Stopping") + logger.info("[Scream Receiver] Stopping") was_running: bool = bool(self.running.value) self.running.value = c_bool(False) if constants.KILL_AT_CLOSE: @@ -48,22 +49,23 @@ def stop(self) -> None: def run(self) -> None: """This thread listens for traffic from all sources and sends it to sinks""" - set_process_name("Receiver", "Receiver Thread") - logger.debug("[Receiver] Receiver Thread PID %s", os.getpid()) - logger.info("[Receiver] Receiver started on port %s", constants.RECEIVER_PORT) + set_process_name("ScreamReceiver", "Scream Receiver Thread") + logger.debug("[Scream Receiver] Receiver Thread PID %s", os.getpid()) + logger.info("[Scream Receiver] Receiver started on port %s", constants.SCREAM_RECEIVER_PORT) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, constants.PACKET_SIZE * 1024 * 1024) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.sock.bind(("", constants.RECEIVER_PORT)) + self.sock.bind(("", constants.SCREAM_RECEIVER_PORT)) while self.running.value: ready = select.select([self.sock], [], [], .3) if ready[0]: data, addr = self.sock.recvfrom(constants.PACKET_SIZE) addrlen = len(addr[0]) - + if addr[0] not in self.known_ips: + self.known_ips.append(addr[0]) for controller_write_fd in self.controller_write_fd_list: os.write(controller_write_fd, bytes(addr[0].encode("ascii") + bytes([0] * (constants.TAG_MAX_LENGTH - addrlen)) + data)) - logger.info("[Receiver] Main thread stopped") + logger.info("[Scream Receiver] Main thread stopped") close_all_pipes() diff --git a/src/audio/sink_output_mixer.py b/src/audio/sink_output_mixer.py index a969e49..122660c 100644 --- a/src/audio/sink_output_mixer.py +++ b/src/audio/sink_output_mixer.py @@ -35,6 +35,7 @@ def __init__(self, sink_ip: IPAddressType, """Output socket for sink""" self._sink_port: PortType = sink_port self.__output_header: bytes = output_info.header + self.__output_info: ScreamHeader = output_info """Holds the header added onto packets sent to Scream receivers""" self._sink_ip = sink_ip self.read_fds: List[int] = [] @@ -72,6 +73,7 @@ def run(self) -> None: logger.debug("[Sink %s] Mixer Thread PID %s", self._sink_ip, os.getpid()) self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, constants.PACKET_SIZE * 65535) output_data = numpy.array(constants.PACKET_DATA_SIZE, numpy.int64) + output_buffer = numpy.array([], numpy.int8) while self.running.value: first_value: bool = True for index, fd in enumerate(self.read_fds): @@ -90,11 +92,27 @@ def run(self) -> None: if first_value: # No active source, sleep for 100ms to not burn CPU. time.sleep(.1) else: - output_data = numpy.clip(output_data, -2147483648, 2147483647) - self.__sock.sendto( - self.__output_header + numpy.int32(output_data).tobytes(), - (str(self._sink_ip), int(self._sink_port))) - os.write(self.ffmpeg_mp3_write_fd, numpy.int32(output_data).tobytes()) + os.write( + self.ffmpeg_mp3_write_fd, + numpy.array(output_data, numpy.int32).tobytes()) + if self.__output_info.bit_depth == 32: + output_data = numpy.array(output_data, numpy.uint32) + output_buffer = numpy.insert( + numpy.frombuffer(output_data, numpy.uint8), 0, output_buffer) + elif self.__output_info.bit_depth == 24: + output_data = numpy.array(output_data, numpy.uint32) + output_data = numpy.frombuffer(output_data, numpy.uint8) + output_data = numpy.delete(output_data, range(0, len(output_data), 4)) + output_buffer = numpy.insert(output_data, 0, output_buffer) + elif self.__output_info.bit_depth == 16: + output_data = numpy.array(output_data, numpy.uint16) + output_buffer = numpy.insert( + numpy.frombuffer(output_data, numpy.uint8), 0, output_buffer) + if len(output_buffer) >= constants.PACKET_DATA_SIZE: + self.__sock.sendto( + self.__output_header + output_buffer[:constants.PACKET_DATA_SIZE].tobytes(), + (str(self._sink_ip), int(self._sink_port))) + output_buffer = output_buffer[constants.PACKET_DATA_SIZE:] logger.debug("[Sink:%s] Mixer thread exit", self._sink_ip) close_all_pipes() diff --git a/src/audio/source_ffmpeg_processor.py b/src/audio/source_ffmpeg_processor.py index 78e303b..ab535c1 100644 --- a/src/audio/source_ffmpeg_processor.py +++ b/src/audio/source_ffmpeg_processor.py @@ -43,6 +43,7 @@ def __init__(self, tag, def __get_ffmpeg_inputs(self) -> List[str]: """Add an input for each source""" ffmpeg_command: List[str] = [] + bit_depth = self.__source_stream_attributes.bit_depth sample_rate = self.__source_stream_attributes.sample_rate channels = self.__source_stream_attributes.channels channel_layout = self.__source_stream_attributes.channel_layout @@ -60,7 +61,7 @@ def __get_ffmpeg_inputs(self) -> List[str]: "-fflags", "nobuffer", "-thread_queue_size", "128", "-channel_layout", f"{channel_layout}", - "-f", "s32le", + "-f", f"s{bit_depth}le", "-ac", f"{channels}", "-ar", f"{sample_rate}", "-i", f"pipe:{self.__ffmpeg_input_pipe}"]) @@ -110,7 +111,7 @@ def __get_ffmpeg_output(self) -> List[str]: "-fflags", "nobuffer", "-avioflags", "direct", "-y", - "-f", f"s{self.__sink_info.bit_depth}le", + "-f", "s32le", "-ac", f"{self.__sink_info.channels}", "-ar", f"{self.__sink_info.sample_rate}", f"pipe:{self.__ffmpeg_output_pipe}"]) # ffmpeg PCM output diff --git a/src/audio/source_input_processor.py b/src/audio/source_input_processor.py index dea2c6e..471a2c2 100644 --- a/src/audio/source_input_processor.py +++ b/src/audio/source_input_processor.py @@ -187,22 +187,18 @@ def run(self) -> None: if ready[0]: data: bytes = os.read(self.writer_read, constants.PACKET_SIZE) self.update_source_attributes_and_open_source(data[:constants.PACKET_HEADER_SIZE]) - if self.stream_attributes.bit_depth == 16: + if self.using_ffmpeg: + os.write(self.ffmpeg_write, data[constants.PACKET_HEADER_SIZE:]) + elif self.stream_attributes.bit_depth == 16: pcm_data = numpy.frombuffer(data[constants.PACKET_HEADER_SIZE:], numpy.int16) pcm_data = numpy.array(pcm_data * self.source_info.volume, numpy.int32) pcm_data = numpy.left_shift(pcm_data, 16) # Two 32-bit packets per upscaled 16-bit packet pcm_data = self.equalizer(pcm_data) - if self.using_ffmpeg: - os.write(self.ffmpeg_write, - pcm_data[:constants.PACKET_DATA_SIZE_INT32].tobytes()) - os.write(self.ffmpeg_write, - pcm_data[constants.PACKET_DATA_SIZE_INT32:].tobytes()) - else: - os.write(self.source_input_fd, - pcm_data[:constants.PACKET_DATA_SIZE_INT32].tobytes()) - os.write(self.source_input_fd, - pcm_data[constants.PACKET_DATA_SIZE_INT32:].tobytes()) + os.write(self.source_input_fd, + pcm_data[:constants.PACKET_DATA_SIZE_INT32].tobytes()) + os.write(self.source_input_fd, + pcm_data[constants.PACKET_DATA_SIZE_INT32:].tobytes()) elif self.stream_attributes.bit_depth == 24: # Pad 24-bit to make it 32-bit pcm_data = numpy.frombuffer(data[constants.PACKET_HEADER_SIZE:], numpy.int8) @@ -210,31 +206,19 @@ def run(self) -> None: pcm_data = numpy.frombuffer(pcm_data, numpy.int32) pcm_data = numpy.array(pcm_data * self.source_info.volume, numpy.int32) pcm_data = numpy.insert(pcm_data, 0, samples_left_over) - if self.using_ffmpeg: - os.write(self.ffmpeg_write, + os.write(self.source_input_fd, pcm_data[:constants.PACKET_DATA_SIZE_INT32].tobytes()) - else: - os.write(self.source_input_fd, - pcm_data[:constants.PACKET_DATA_SIZE_INT32].tobytes()) if len(pcm_data) >= (constants.PACKET_DATA_SIZE_INT32 * 2): - if self.using_ffmpeg: - os.write(self.ffmpeg_write, - pcm_data[constants.PACKET_DATA_SIZE_INT32: - (constants.PACKET_DATA_SIZE_INT32*2)].tobytes()) - else: - os.write(self.source_input_fd, - pcm_data[constants.PACKET_DATA_SIZE_INT32: - (constants.PACKET_DATA_SIZE_INT32*2)].tobytes()) + os.write(self.source_input_fd, + pcm_data[constants.PACKET_DATA_SIZE_INT32: + (constants.PACKET_DATA_SIZE_INT32*2)].tobytes()) samples_left_over = pcm_data[(constants.PACKET_DATA_SIZE_INT32 * 2):] else: samples_left_over = pcm_data[constants.PACKET_DATA_SIZE_INT32:] elif self.stream_attributes.bit_depth == 32: pcm_data = numpy.frombuffer(data[constants.PACKET_HEADER_SIZE:], numpy.int32) pcm_data = numpy.array(pcm_data * self.source_info.volume, numpy.int32) - if self.using_ffmpeg: - os.write(self.ffmpeg_write, pcm_data.tobytes()) - else: - os.write(self.source_input_fd, pcm_data.tobytes()) + os.write(self.source_input_fd, pcm_data.tobytes()) self.update_activity() if not self.ffmpeg_handler is None: self.ffmpeg_handler.stop() diff --git a/src/configuration/configuration_manager.py b/src/configuration/configuration_manager.py index c685eec..922e753 100644 --- a/src/configuration/configuration_manager.py +++ b/src/configuration/configuration_manager.py @@ -14,7 +14,8 @@ import src.screamrouter_logger.screamrouter_logger as screamrouter_logger from src.api.api_webstream import APIWebStream from src.audio.audio_controller import AudioController -from src.audio.udp_receiver import UDPReceiver +from src.audio.scream_receiver import ScreamReceiver +from src.audio.rtp_recevier import RTPReceiver from src.configuration.configuration_solver import ConfigurationSolver from src.plugin_manager.plugin_manager import PluginManager from src.screamrouter_types.annotations import (DelayType, RouteNameType, @@ -52,8 +53,10 @@ def __init__(self, websocket: APIWebStream, plugin_manager: PluginManager): """Condition to indicate the Configuration Manager needs to reload""" self.running: bool = True """Rather the thread is running or not""" - self.receiver: UDPReceiver = UDPReceiver([]) + self.scream_recevier: ScreamReceiver = ScreamReceiver([]) """Holds the thread that receives UDP packets from Scream""" + self.rtp_receiver: RTPReceiver = RTPReceiver([]) + """Holds the thread that receives UDP packets from an RTP source""" self.reload_config: bool = False """Set to true to reload the config. Used so config changes during another config reload still trigger @@ -283,7 +286,8 @@ def stop(self) -> bool: self.__api_webstream.stop() _logger.debug("Webstream stopped") _logger.debug("Stopping receiver") - self.receiver.stop() + self.scream_recevier.stop() + self.rtp_receiver.stop() _logger.debug("Receiver stopped") _logger.debug("Stopping Plugin Manager") self.plugin_manager.stop_registered_plugins() @@ -584,7 +588,8 @@ def __process_and_apply_configuration(self) -> None: original_audio_controllers: List[AudioController] = copy(self.audio_controllers) if len(changed_sinks) > 0 or len(removed_sinks) > 0 or len(added_sinks) > 0: - self.receiver.stop() + self.scream_recevier.stop() + self.rtp_receiver.stop() _logger.debug("[Configuration Manager] Removing and re-adding changed sinks") @@ -628,7 +633,9 @@ def __process_and_apply_configuration(self) -> None: # Check if there was a change before reloading or saving if len(changed_sinks) > 0 or len(removed_sinks) > 0 or len(added_sinks) > 0: - self.receiver = UDPReceiver([audio_controller.controller_write_fd for + self.scream_recevier = ScreamReceiver([audio_controller.controller_write_fd for + audio_controller in self.audio_controllers]) + self.rtp_receiver = RTPReceiver([audio_controller.controller_write_fd for audio_controller in self.audio_controllers]) _logger.debug("[Configuration Manager] Saving configuration") self.__save_config() diff --git a/src/constants/constants.py b/src/constants/constants.py index 7a19831..5b06b01 100644 --- a/src/constants/constants.py +++ b/src/constants/constants.py @@ -4,8 +4,10 @@ # User Configurable Options # ########## -RECEIVER_PORT: int = 16401 -"""This is the port for the receiver""" +SCREAM_RECEIVER_PORT: int = 16401 +"""This is the port to receive Scream data at""" +RTP_RECEIVER_PORT: int = 40000 +"""This is the port to receive RTP data at""" SINK_PORT: int = 4010 """This is the port for a Scream Sink""" API_PORT: int = 443 @@ -44,7 +46,7 @@ PACKET_DATA_SIZE: int = 1152 """This is the packet size minus the header""" -PACKET_DATA_SIZE_INT32: int = int(PACKET_DATA_SIZE / 8) +PACKET_DATA_SIZE_INT32: int = int(PACKET_DATA_SIZE / 4) """This is the number of int32's in a packet""" PACKET_HEADER_SIZE: int = 5 """This is the packet header size"""