diff --git a/.vscode/settings.json b/.vscode/settings.json index 9aaf323..7d2b31d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,4 +1,8 @@ { "python.REPL.enableREPLSmartSend": false, "pylint.args": [], + "files.exclude": { + "**/*.c": true, + "**/*.so": true + }, } \ No newline at end of file diff --git a/build/lib.linux-x86_64-3.9/audio_controller.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/audio_controller.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..497fcff Binary files /dev/null and b/build/lib.linux-x86_64-3.9/audio_controller.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/mp3_header_parser.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/mp3_header_parser.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..89cccfa Binary files /dev/null and b/build/lib.linux-x86_64-3.9/mp3_header_parser.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/rtp_recevier.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/rtp_recevier.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..dae8335 Binary files /dev/null and b/build/lib.linux-x86_64-3.9/rtp_recevier.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/scream_header_parser.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/scream_header_parser.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..532d789 Binary files /dev/null and b/build/lib.linux-x86_64-3.9/scream_header_parser.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/scream_receiver.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/scream_receiver.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..8786f1c Binary files /dev/null and b/build/lib.linux-x86_64-3.9/scream_receiver.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/sink_mp3_processor.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/sink_mp3_processor.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..9ee19ee Binary files /dev/null and b/build/lib.linux-x86_64-3.9/sink_mp3_processor.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/sink_output_mixer.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/sink_output_mixer.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..1a1b296 Binary files /dev/null and b/build/lib.linux-x86_64-3.9/sink_output_mixer.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/source_input_processor.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/source_input_processor.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..214c9ac Binary files /dev/null and b/build/lib.linux-x86_64-3.9/source_input_processor.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/src/audio/audio_controller.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/src/audio/audio_controller.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..651cd63 Binary files /dev/null and b/build/lib.linux-x86_64-3.9/src/audio/audio_controller.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/src/audio/mp3_header_parser.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/src/audio/mp3_header_parser.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..0ffb7be Binary files /dev/null and b/build/lib.linux-x86_64-3.9/src/audio/mp3_header_parser.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/src/audio/rtp_recevier.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/src/audio/rtp_recevier.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..cefccea Binary files /dev/null and b/build/lib.linux-x86_64-3.9/src/audio/rtp_recevier.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/src/audio/scream_header_parser.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/src/audio/scream_header_parser.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..98b5dff Binary files /dev/null and b/build/lib.linux-x86_64-3.9/src/audio/scream_header_parser.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/src/audio/scream_receiver.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/src/audio/scream_receiver.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..f8bd125 Binary files /dev/null and b/build/lib.linux-x86_64-3.9/src/audio/scream_receiver.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/src/audio/sink_mp3_processor.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/src/audio/sink_mp3_processor.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..956addc Binary files /dev/null and b/build/lib.linux-x86_64-3.9/src/audio/sink_mp3_processor.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/src/audio/sink_output_mixer.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/src/audio/sink_output_mixer.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..323f5e9 Binary files /dev/null and b/build/lib.linux-x86_64-3.9/src/audio/sink_output_mixer.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/src/audio/source_input_processor.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/src/audio/source_input_processor.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..d3f1548 Binary files /dev/null and b/build/lib.linux-x86_64-3.9/src/audio/source_input_processor.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/src/audio/tcp_manager.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/src/audio/tcp_manager.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..36f164d Binary files /dev/null and b/build/lib.linux-x86_64-3.9/src/audio/tcp_manager.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/lib.linux-x86_64-3.9/tcp_manager.cpython-39-x86_64-linux-gnu.so b/build/lib.linux-x86_64-3.9/tcp_manager.cpython-39-x86_64-linux-gnu.so new file mode 100755 index 0000000..5d3a237 Binary files /dev/null and b/build/lib.linux-x86_64-3.9/tcp_manager.cpython-39-x86_64-linux-gnu.so differ diff --git a/build/temp.linux-x86_64-3.9/src/audio/audio_controller.o b/build/temp.linux-x86_64-3.9/src/audio/audio_controller.o new file mode 100644 index 0000000..fe3d30e Binary files /dev/null and b/build/temp.linux-x86_64-3.9/src/audio/audio_controller.o differ diff --git a/build/temp.linux-x86_64-3.9/src/audio/mp3_header_parser.o b/build/temp.linux-x86_64-3.9/src/audio/mp3_header_parser.o new file mode 100644 index 0000000..e5773d7 Binary files /dev/null and b/build/temp.linux-x86_64-3.9/src/audio/mp3_header_parser.o differ diff --git a/build/temp.linux-x86_64-3.9/src/audio/rtp_recevier.o b/build/temp.linux-x86_64-3.9/src/audio/rtp_recevier.o new file mode 100644 index 0000000..fc509b1 Binary files /dev/null and b/build/temp.linux-x86_64-3.9/src/audio/rtp_recevier.o differ diff --git a/build/temp.linux-x86_64-3.9/src/audio/scream_header_parser.o b/build/temp.linux-x86_64-3.9/src/audio/scream_header_parser.o new file mode 100644 index 0000000..5b9d2dc Binary files /dev/null and b/build/temp.linux-x86_64-3.9/src/audio/scream_header_parser.o differ diff --git a/build/temp.linux-x86_64-3.9/src/audio/scream_receiver.o b/build/temp.linux-x86_64-3.9/src/audio/scream_receiver.o new file mode 100644 index 0000000..68b4518 Binary files /dev/null and b/build/temp.linux-x86_64-3.9/src/audio/scream_receiver.o differ diff --git a/build/temp.linux-x86_64-3.9/src/audio/sink_mp3_processor.o b/build/temp.linux-x86_64-3.9/src/audio/sink_mp3_processor.o new file mode 100644 index 0000000..745d194 Binary files /dev/null and b/build/temp.linux-x86_64-3.9/src/audio/sink_mp3_processor.o differ diff --git a/build/temp.linux-x86_64-3.9/src/audio/sink_output_mixer.o b/build/temp.linux-x86_64-3.9/src/audio/sink_output_mixer.o new file mode 100644 index 0000000..182f929 Binary files /dev/null and b/build/temp.linux-x86_64-3.9/src/audio/sink_output_mixer.o differ diff --git a/build/temp.linux-x86_64-3.9/src/audio/source_input_processor.o b/build/temp.linux-x86_64-3.9/src/audio/source_input_processor.o new file mode 100644 index 0000000..453fba5 Binary files /dev/null and b/build/temp.linux-x86_64-3.9/src/audio/source_input_processor.o differ diff --git a/build/temp.linux-x86_64-3.9/src/audio/tcp_manager.o b/build/temp.linux-x86_64-3.9/src/audio/tcp_manager.o new file mode 100644 index 0000000..4673af6 Binary files /dev/null and b/build/temp.linux-x86_64-3.9/src/audio/tcp_manager.o differ diff --git a/kill_restart.sh b/kill_restart.sh new file mode 100755 index 0000000..093e86d --- /dev/null +++ b/kill_restart.sh @@ -0,0 +1 @@ +kill -9 $(ps -ef | grep -iE "scream|ffmpeg" | awk '{print $2}');./screamrouter.py diff --git a/screamrouter.py b/screamrouter.py index 04e88f0..fe25a1b 100755 --- a/screamrouter.py +++ b/screamrouter.py @@ -5,6 +5,7 @@ import signal import sys import threading +import pyximport import uvicorn from fastapi import FastAPI @@ -18,6 +19,8 @@ from src.screamrouter_logger.screamrouter_logger import get_logger from src.utils.utils import set_process_name +pyximport.install() + os.nice(-15) logger = get_logger(__name__) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..f539686 --- /dev/null +++ b/setup.py @@ -0,0 +1,20 @@ +"""Setup script for cpython for src.audio module""" +from distutils.extension import Extension +from setuptools import setup +from Cython.Build import cythonize + +packages = [Extension("src.audio.audio_controller",["src/audio/audio_controller.py"]), + Extension("src.audio.mp3_header_parser",["src/audio/mp3_header_parser.py"]), + Extension("src.audio.rtp_recevier",["src/audio/rtp_recevier.py"]), + Extension("src.audio.scream_header_parser",["src/audio/scream_header_parser.py"]), + Extension("src.audio.scream_receiver",["src/audio/scream_receiver.py"]), + Extension("src.audio.sink_mp3_processor",["src/audio/sink_mp3_processor.py"]), + Extension("src.audio.sink_output_mixer",["src/audio/sink_output_mixer.py"]), + Extension("src.audio.source_input_processor",["src/audio/source_input_processor.py"]), + Extension("src.audio.tcp_manager",["src/audio/tcp_manager.py"])] + +setup( + name='ScreamRouter', + ext_modules=cythonize(packages), + packages=["audio"] +) diff --git a/site/dialog_views/add_edit_sink.html.jinja b/site/dialog_views/add_edit_sink.html.jinja index ed2078d..e57189a 100644 --- a/site/dialog_views/add_edit_sink.html.jinja +++ b/site/dialog_views/add_edit_sink.html.jinja @@ -10,7 +10,7 @@ {{- inputs.text("Sink IP", "ip", true, "Sink IP", data['ip']) }}
{{- inputs.text_number("Sink Port", "port", 1, 65535, true, "Sink Port", data['port']) }}
- + @@ -39,7 +39,10 @@
- {{- inputs.text_number("Delay in ms", "delay", 0, 5000, True, "Delay", data['delay']) }}

+ {{- inputs.text_number("Delay in ms", "delay", 0, 5000, True, "Delay", data['delay']) }}
+ +
+ {{- inputs.text_number("Timesync Delay in ms", "time_sync_delay", 0, 5000, True, "Timesync Delay", data['time_sync_delay']) }}

{{- buttons.button("dialog_submit_close", 'Add Sink' if request_info['add_new'] else 'Edit Sink' , "dialog-button", {}, "Submit")}} {{- buttons.button("dialog_cancel", 'Close' , "dialog-button", {}, "Cancel")}} diff --git a/site/screamrouter.js.jinja b/site/screamrouter.js.jinja index bee298f..0b4f799 100644 --- a/site/screamrouter.js.jinja +++ b/site/screamrouter.js.jinja @@ -117,7 +117,9 @@ function dialog_submit(close) { if (entry.id == undefined) { continue; } - if (entry.type == "select-multiple") { + if (entry.type == "checkbox") { + result[entry.id] = entry.checked; + } else if (entry.type == "select-multiple") { var result_array = []; for (option in entry.selectedOptions) { var value = entry.selectedOptions[option].value; diff --git a/src/audio/audio_controller.py b/src/audio/audio_controller.py index a7d2512..3b0f117 100644 --- a/src/audio/audio_controller.py +++ b/src/audio/audio_controller.py @@ -1,4 +1,5 @@ """One audio controller per sink, handles taking in packets and distributing them to sources""" +import fcntl import multiprocessing import os import select @@ -6,7 +7,7 @@ from ctypes import c_bool from queue import Empty from subprocess import TimeoutExpired -from typing import Dict, List +from typing import Dict, List, Optional from src.audio.mp3_ffmpeg_process import MP3FFMpegProcess from src.audio.sink_mp3_processor import SinkMP3Processor @@ -28,7 +29,9 @@ class AudioController(multiprocessing.Process): This thread listens to the input queue and passes it to each Source processor""" def __init__(self, sink_info: SinkDescription, - sources: List[SourceDescription], websocket: APIWebStream): + sources: List[SourceDescription], + tcp_fd: Optional[int], + websocket: APIWebStream): """Initialize a sink queue""" super().__init__(name=f"[Sink {sink_info.name}] Audio Controller") logger.info("[Sink %s] Loading audio controller", sink_info.name) @@ -72,6 +75,8 @@ def __init__(self, sink_info: SinkDescription, self.controller_read_fd, self.controller_write_fd = os.pipe() self.running = multiprocessing.Value(c_bool, True) """Multiprocessing-passed flag to determine if the thread is running""" + self.request_restart = multiprocessing.Value(c_bool, False) + """Set true if we want a config reload""" logger.info("[Sink:%s] Queue %s", self.sink_info.ip, self.controller_write_fd) if self.sources_lock.acquire(timeout=1): for source in self.__controller_sources: @@ -95,11 +100,7 @@ def __init__(self, sink_info: SinkDescription, self.mp3_ffmpeg_output_write: int self.mp3_ffmpeg_input_read, self.mp3_ffmpeg_input_write = os.pipe() self.mp3_ffmpeg_output_read, self.mp3_ffmpeg_output_write = os.pipe() - self.pcm_thread: SinkOutputMixer = SinkOutputMixer(self.sink_info.ip, - self.sink_info.port, - self.stream_info, - list(self.sources.values()), - self.mp3_ffmpeg_input_write) + self.pcm_thread: SinkOutputMixer """Holds the thread to listen to PCM output from a Source""" self.mp3_ffmpeg_processor = MP3FFMpegProcess(f"[Sink {self.sink_info.ip}] MP3 Process", @@ -112,8 +113,18 @@ def __init__(self, sink_info: SinkDescription, self.mp3_ffmpeg_output_read, self.webstream.queue) """Holds the thread to generaet MP3 output from a PCM reader""" + self.pcm_thread = SinkOutputMixer(self.sink_info, + self.stream_info, + tcp_fd, + list(self.sources.values()), + self.mp3_ffmpeg_input_write) self.start() + def restart_mixer(self, tcp_client_fd: Optional[int]): + """(Re)starts the mixer""" + logger.info("[Audio Controller] Requesting config reloaed") + self.request_restart = c_bool(True) + def get_open_sources(self) -> List[SourceInputProcessor]: """Build a list of active IPs, exclude ones that aren't open""" active_sources: List[SourceInputProcessor] = [] @@ -149,7 +160,7 @@ def stop(self) -> None: source.stop() logger.debug("[Sink:%s] Stopped source %s", self.sink_info.ip, source.name) logger.debug("[Sink:%s] Stopping Audio Controller", self.sink_info.ip) - self.running.value = c_bool(False) + self.running.value = c_bool(False) # type: ignore if constants.WAIT_FOR_CLOSES: logger.debug("[Sink:%s] Waiting for Audio Controller Stop", self.sink_info.ip) @@ -162,12 +173,14 @@ def stop(self) -> None: close_pipe(self.controller_read_fd) close_pipe(self.controller_write_fd) - def wants_restart(self) -> bool: + def wants_reload(self) -> bool: """Returns true of any of the sources want a restart""" flag: bool = False + if self.request_restart.value: + return True for source in self.sources.values(): if source.wants_restart.value: - source.wants_restart.value = c_bool(False) + source.wants_restart.value = c_bool(False) # type: ignore flag = True return flag diff --git a/src/audio/mp3_header_parser.py b/src/audio/mp3_header_parser.py index 2728edd..121c96b 100644 --- a/src/audio/mp3_header_parser.py +++ b/src/audio/mp3_header_parser.py @@ -309,11 +309,11 @@ def __mp3_process_header(self, header_data: bytes) -> None: for i in range(5, 7): if byte_2[i] != 1: raise InvalidHeaderException("Invalid MP3 Header (Second byte has invalid marker)") - self.mpeg_version = numpy.packbits(byte_2[3:5], bitorder='little')[0] - self.layer_description = numpy.packbits(byte_2[1:2], bitorder='little')[0] + self.mpeg_version = int(numpy.packbits(byte_2[3:5], bitorder='little')[0]) + self.layer_description = int(numpy.packbits(byte_2[1:2], bitorder='little')[0]) self.protected = byte_2[0] == 1 - self.bitrate_index = numpy.packbits(byte_3[4:8], bitorder='little')[0] - self.samplerate_index = numpy.packbits(byte_3[2:3], bitorder='little')[0] + self.bitrate_index = int(numpy.packbits(byte_3[4:8], bitorder='little')[0]) + self.samplerate_index = int(numpy.packbits(byte_3[2:3], bitorder='little')[0]) self.padding = byte_3[1] self.private = byte_3[0] == 1 self.channelmode = numpy.packbits(byte_4[6:8], bitorder='little')[0] @@ -321,7 +321,7 @@ def __mp3_process_header(self, header_data: bytes) -> None: self.copyright = byte_4[3] == 1 self.original = byte_4[2] == 1 self.emphasis = numpy.packbits(byte_4[0:2], bitorder='little')[0] - self.samplerate = self.__mp3_process_samplerate(self.mpeg_version, self.samplerate_index) + self.samplerate = int(self.__mp3_process_samplerate(self.mpeg_version, self.samplerate_index)) self.bitrate = self.__mp3_parse_bitrate(self.bitrate_index, 1 if self.mpeg_version == 3 else 2, (3 - self.layer_description) + 1) diff --git a/src/audio/rtp_recevier.py b/src/audio/rtp_recevier.py index 676ef45..8c0d765 100644 --- a/src/audio/rtp_recevier.py +++ b/src/audio/rtp_recevier.py @@ -31,7 +31,7 @@ def __init__(self, controller_write_fd_list: List[int]): """Multiprocessing-passed flag to determine if the thread is running""" self.known_ips = multiprocessing.Manager().list() if len(controller_write_fd_list) == 0: # Will be zero if this is just a placeholder. - self.running.value = c_bool(False) + self.running.value = c_bool(False) # type: ignore return self.start() @@ -39,7 +39,7 @@ def stop(self) -> None: """Stops the Receiver and all sinks""" logger.info("[RTP Receiver] Stopping") was_running: bool = bool(self.running.value) - self.running.value = c_bool(False) + self.running.value = c_bool(False) # type: ignore if constants.KILL_AT_CLOSE: try: self.kill() @@ -64,27 +64,29 @@ def run(self) -> None: stereo_header = create_stream_info(16, 48000, 2, "stereo") while self.running.value: - ready = select.select([self.sock], [], [], .3) - if ready[0]: - data, addr = self.sock.recvfrom(constants.PACKET_SIZE + 500) - rtp_packet = RTP.fromBytearray(bytearray(data)) - if not rtp_packet.payloadType in [PayloadType.L16_1chan, - PayloadType.L16_2chan, - PayloadType.DYNAMIC_127]: - logger.warning("Can only decode 16-bit LPCM, unsupported type %s from %s:%s", - rtp_packet.payloadType, addr[0], addr[1]) - continue - if addr[0] not in self.known_ips: - self.known_ips.append(addr[0]) - padded_tag: bytes - padded_tag = bytes(addr[0].encode("ascii")) - padded_tag += bytes([0] * (constants.TAG_MAX_LENGTH - len(addr[0]))) - header: bytes = mono_header.header - if rtp_packet.payloadType in [PayloadType.L16_2chan, - PayloadType.DYNAMIC_127]: - header = stereo_header.header - for controller_write_fd in self.controller_write_fd_list: - os.write(controller_write_fd, padded_tag + header + rtp_packet.payload) - + try: + ready = select.select([self.sock], [], [], .3) + if ready[0]: + data, addr = self.sock.recvfrom(constants.PACKET_SIZE + 500) + rtp_packet = RTP.fromBytearray(bytearray(data)) + if not rtp_packet.payloadType in [PayloadType.L16_1chan, + PayloadType.L16_2chan, + PayloadType.DYNAMIC_127]: + logger.warning("Can only decode S16_LE, unsupported type %s from %s:%s", + rtp_packet.payloadType, addr[0], addr[1]) + continue + if addr[0] not in self.known_ips: + self.known_ips.append(addr[0]) + padded_tag: bytes + padded_tag = bytes(addr[0].encode("ascii")) + padded_tag += bytes([0] * (constants.TAG_MAX_LENGTH - len(addr[0]))) + header: bytes = bytes(mono_header.header) + if rtp_packet.payloadType in [PayloadType.L16_2chan, + PayloadType.DYNAMIC_127]: + header = bytes(stereo_header.header) + for controller_write_fd in self.controller_write_fd_list: + os.write(controller_write_fd, padded_tag + header + rtp_packet.payload) + except ValueError: + pass logger.info("[RTP Receiver] Main thread stopped") close_all_pipes() diff --git a/src/audio/scream_header_parser.py b/src/audio/scream_header_parser.py index ab61b28..35fe559 100644 --- a/src/audio/scream_header_parser.py +++ b/src/audio/scream_header_parser.py @@ -36,7 +36,7 @@ def __init__(self, scream_header: Union[bytearray, bytes]): sample_rate_base: int = 44100 if sample_rate_bits[7] == 1 else 48000 sample_rate_bits = numpy.delete(sample_rate_bits, 7) # Remove the uppermost bit # Convert it back into a number without the top bit, this is the multiplier - sample_rate_multiplier: int = numpy.packbits(sample_rate_bits,bitorder='little')[0] + sample_rate_multiplier: int = int(numpy.packbits(sample_rate_bits,bitorder='little')[0]) if sample_rate_multiplier < 1: sample_rate_multiplier = 1 # Bypassing pydantic verification for these @@ -49,7 +49,7 @@ def __init__(self, scream_header: Union[bytearray, bytes]): self.channel_mask: bytes = scream_header_array[3:] # type: ignore """Channel Mask""" self.channel_layout: ChannelLayoutType - self.channel_layout = self.__parse_channel_mask(scream_header_array[3:]) # type: ignore + self.channel_layout = self.__parse_channel_mask(bytes(scream_header_array[3:])) # type: ignore """Holds the channel layout""" self.header: bytes = scream_header_array """Holds the raw header bytes""" diff --git a/src/audio/scream_receiver.py b/src/audio/scream_receiver.py index 17e371b..b582fc9 100644 --- a/src/audio/scream_receiver.py +++ b/src/audio/scream_receiver.py @@ -19,7 +19,7 @@ class ScreamReceiver(multiprocessing.Process): def __init__(self, controller_write_fd_list: List[int]): """Receives UDP packets and sends them to known queue lists""" super().__init__(name="Scream Receiver Thread") - self.sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock: socket.socket """Main socket all sources send to""" self.controller_write_fd_list: List[int] = controller_write_fd_list """List of all sink queues to forward data to""" @@ -27,7 +27,7 @@ def __init__(self, controller_write_fd_list: List[int]): """Multiprocessing-passed flag to determine if the thread is running""" self.known_ips = multiprocessing.Manager().list() if len(controller_write_fd_list) == 0: # Will be zero if this is just a placeholder. - self.running.value = c_bool(False) + self.running.value = c_bool(False) # type: ignore return self.start() @@ -35,7 +35,7 @@ def stop(self) -> None: """Stops the Receiver and all sinks""" logger.info("[Scream Receiver] Stopping") was_running: bool = bool(self.running.value) - self.running.value = c_bool(False) + self.running.value = c_bool(False) # type: ignore if constants.KILL_AT_CLOSE: try: self.kill() @@ -52,20 +52,26 @@ def run(self) -> None: set_process_name("ScreamReceiver", "Scream Receiver Thread") logger.debug("[Scream Receiver] Receiver Thread PID %s", os.getpid()) logger.info("[Scream Receiver] Receiver started on port %s", constants.SCREAM_RECEIVER_PORT) + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, constants.PACKET_SIZE * 1024 * 1024) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.bind(("", constants.SCREAM_RECEIVER_PORT)) while self.running.value: - ready = select.select([self.sock], [], [], .3) - if ready[0]: - data, addr = self.sock.recvfrom(constants.PACKET_SIZE) - addrlen = len(addr[0]) - if addr[0] not in self.known_ips: - self.known_ips.append(addr[0]) - for controller_write_fd in self.controller_write_fd_list: - os.write(controller_write_fd, - bytes(addr[0].encode("ascii") + bytes([0] * (constants.TAG_MAX_LENGTH - addrlen)) + data)) + try: + ready = select.select([self.sock], [], [], .3) + if ready[0]: + data, addr = self.sock.recvfrom(constants.PACKET_SIZE) + addrlen = len(addr[0]) + if addr[0] not in self.known_ips: + self.known_ips.append(addr[0]) + for controller_write_fd in self.controller_write_fd_list: + os.write(controller_write_fd, + bytes(addr[0].encode("ascii") + + bytes([0] * (constants.TAG_MAX_LENGTH - addrlen)) + + data)) + except ValueError: + pass logger.info("[Scream Receiver] Main thread stopped") close_all_pipes() diff --git a/src/audio/sink_mp3_processor.py b/src/audio/sink_mp3_processor.py index d81f022..bfe5869 100644 --- a/src/audio/sink_mp3_processor.py +++ b/src/audio/sink_mp3_processor.py @@ -71,16 +71,16 @@ def __read_header(self) -> Tuple[MP3Header, bytes]: if bytesin[0] == 255: header = bytesin + self._read_bytes(constants.MP3_HEADER_LENGTH - 1, 0) try: - header_parsed: MP3Header = MP3Header(header) - return (header_parsed, header) + header_parsed: MP3Header = MP3Header(bytes(header)) + return (header_parsed, bytes(header)) except InvalidHeaderException as exc: logger.warning("[Sink:%s] Bad MP3 Header: %s", self._sink_ip, exc) if bytes_searched == max_bytes_to_search: raise InvalidHeaderException( f"[Sink: {self._sink_ip}] Couldn't find MP3 header after ID3 header") else: - header_parsed: MP3Header = MP3Header(header) - return (header_parsed, header) + header_parsed: MP3Header = MP3Header(bytes(header)) + return (header_parsed, bytes(header)) raise InvalidHeaderException("Invalid header") def run(self) -> None: diff --git a/src/audio/sink_output_mixer.py b/src/audio/sink_output_mixer.py index 0d6f8d9..9f59276 100644 --- a/src/audio/sink_output_mixer.py +++ b/src/audio/sink_output_mixer.py @@ -1,5 +1,6 @@ """Thread to mix each source together for the result Do mixing in process instead of forwarding to ffmpeg for latency""" +import ctypes import multiprocessing import multiprocessing.process import os @@ -8,36 +9,53 @@ from ctypes import c_bool from subprocess import TimeoutExpired import time -from typing import List +from typing import List, Optional import numpy +import ntplib +from src.screamrouter_types.configuration import SinkDescription from src.audio.source_input_processor import SourceInputProcessor import src.constants.constants as constants from src.audio.scream_header_parser import ScreamHeader from src.screamrouter_logger.screamrouter_logger import get_logger -from src.screamrouter_types.annotations import IPAddressType, PortType from src.utils.utils import close_all_pipes, set_process_name logger = get_logger(__name__) class SinkOutputMixer(multiprocessing.Process): """Handles listening for PCM output from sources and sends it to sinks""" - def __init__(self, sink_ip: IPAddressType, - sink_port: PortType, output_info: ScreamHeader, + def __init__(self, sink_info: SinkDescription, + output_info: ScreamHeader, + tcp_client_fd: Optional[int], sources: List[SourceInputProcessor], ffmpeg_mp3_write_fd: int): super().__init__(name="[Sink:{sink_ip}] PCM Thread") - self._sink_ip: IPAddressType = sink_ip - """Holds the sink IP for the web api to filter based on""" + self.sink_info: SinkDescription = sink_info + """SinkDescription holding sink properties""" self.running = multiprocessing.Value(c_bool, True) """Multiprocessing-passed flag to determine if the thread is running""" - self.__sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.__sock: socket.socket """Output socket for sink""" - self._sink_port: PortType = sink_port + self.tcp_client_fd: Optional[int] = tcp_client_fd + if self.tcp_client_fd is not None: + try: + self.__sock = socket.socket(fileno=self.tcp_client_fd) + self.__sock.setblocking(False) + #self.__sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1152*4) + self.__sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, 0x28) + self.__sock.settimeout(15) + except OSError: + self.tcp_client_fd = None + self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + except ValueError: + self.tcp_client_fd = None + self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + else: + self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.__output_header: bytes = output_info.header self.__output_info: ScreamHeader = output_info """Holds the header added onto packets sent to Scream receivers""" - self._sink_ip = sink_ip self.read_fds: List[int] = [] self.sources_open: List = [] for source in sources: @@ -69,11 +87,17 @@ def read_bytes(self, fd: int, count: int, timeout: float = 0, first: bool = Fals def run(self) -> None: """Reads PCM output from sources, mixes it together, attaches a header, sends it to sink.""" - set_process_name("PCMThread", f"[Sink {self._sink_ip}] Mixer") - logger.debug("[Sink %s] Mixer Thread PID %s", self._sink_ip, os.getpid()) - self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, constants.PACKET_SIZE * 65535) + set_process_name("PCMThread", f"[Sink {self.sink_info.ip}] Mixer") + logger.debug("[Sink %s] Mixer Thread PID %s", self.sink_info.ip, os.getpid()) output_data = numpy.array(constants.PACKET_DATA_SIZE, numpy.int64) output_buffer = numpy.array([], numpy.int8) + ntp_offset: int = 0 + try: + ntp_offset = int(ntplib.NTPClient().request('192.168.3.3', + version=3).offset) + except ntplib.NTPException: + pass + packet_count: int = 0 while self.running.value: first_value: bool = True for index, fd in enumerate(self.read_fds): @@ -95,33 +119,76 @@ def run(self) -> None: os.write( self.ffmpeg_mp3_write_fd, numpy.array(output_data, numpy.int32).tobytes()) - if self.__output_info.bit_depth == 32: - output_data = numpy.array(output_data, numpy.uint32) - output_buffer = numpy.insert( - numpy.frombuffer(output_data, numpy.uint8), 0, output_buffer) - elif self.__output_info.bit_depth == 24: - output_data = numpy.array(output_data, numpy.uint32) - output_data = numpy.frombuffer(output_data, numpy.uint8) + output_data = numpy.array(output_data, numpy.uint32) + output_data = numpy.frombuffer(output_data, numpy.uint8) + if self.__output_info.bit_depth == 24: + output_data = numpy.delete(output_data, range(0, len(output_data), 4)) + if self.__output_info.bit_depth == 16: output_data = numpy.delete(output_data, range(0, len(output_data), 4)) - output_buffer = numpy.insert(output_data, 0, output_buffer) - elif self.__output_info.bit_depth == 16: - output_data = numpy.array(output_data, numpy.uint16) - output_buffer = numpy.insert( - numpy.frombuffer(output_data, numpy.uint8), 0, output_buffer) + output_data = numpy.delete(output_data, range(0, len(output_data), 3)) + output_buffer = numpy.insert(output_data, 0, output_buffer) + if len(output_buffer) >= constants.PACKET_DATA_SIZE: - self.__sock.sendto( - self.__output_header + output_buffer[:constants.PACKET_DATA_SIZE].tobytes(), - (str(self._sink_ip), int(self._sink_port))) + final_buffer: bytes + if self.sink_info.time_sync: + time_bytes: bytes = bytes(ctypes.c_uint64(int(time.time() * 1000) + + ntp_offset + + constants.SYNCED_TIME_BUFFER + + self.sink_info.time_sync_delay)) + #time_bytes: bytes = bytes(ctypes.c_uint64(int(packet_count))) + packet_count += 1 + final_buffer = (bytes(self.__output_header) + + time_bytes + + output_buffer[:constants.PACKET_DATA_SIZE].tobytes()) + else: + final_buffer = (self.__output_header + + output_buffer[:constants.PACKET_DATA_SIZE].tobytes()) + if (not self.sink_info.ip is None and + not self.sink_info.port is None): + if self.tcp_client_fd is None: + self.__sock.sendto(final_buffer, + (str(self.sink_info.ip), int(self.sink_info.port))) + else: + try: + if len(final_buffer) in [constants.PACKET_SIZE, + constants.PACKET_SIZE + 8]: + self.__sock.send(final_buffer[5:]) + except ConnectionResetError: # Lost TCP, go back to UDP + logger.warning("[Sink %s] Lost TCP (Reset)", self.sink_info.ip) + self.tcp_client_fd = None + self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + except BlockingIOError: + pass + #logger.warning("TCP BlockingIOError") + #self.tcp_client_fd = None + #self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + except BrokenPipeError: + logger.warning("[Sink %s] Lost TCP (BrokenPipeError)", + self.sink_info.ip) + self.tcp_client_fd = None + self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + except socket.timeout: + logger.warning("[Sink %s] Lost TCP (Timeout)", self.sink_info.ip) + self.tcp_client_fd = None + self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + except TimeoutError: + logger.warning("[Sink %s] Lost TCP (Timeout)", self.sink_info.ip) + self.tcp_client_fd = None + self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) output_buffer = output_buffer[constants.PACKET_DATA_SIZE:] - logger.debug("[Sink:%s] Mixer thread exit", self._sink_ip) + logger.debug("[Sink:%s] Mixer thread exit", self.sink_info.ip) + if not self.tcp_client_fd is None: + self.__sock.detach() close_all_pipes() - def stop(self) -> None: + def stop(self, wait_for_close: bool = False) -> None: """Stop""" - self.running.value = c_bool(False) - if constants.KILL_AT_CLOSE: + self.running.value = c_bool(False) # type: ignore + if not self.tcp_client_fd is None: + self.__sock.detach() + if constants.KILL_AT_CLOSE or wait_for_close: self.kill() - if constants.WAIT_FOR_CLOSES: + if constants.WAIT_FOR_CLOSES or wait_for_close: try: self.join(5) except TimeoutExpired: diff --git a/src/audio/source_ffmpeg_processor.py b/src/audio/source_ffmpeg_processor.py index 31b9304..3d10822 100644 --- a/src/audio/source_ffmpeg_processor.py +++ b/src/audio/source_ffmpeg_processor.py @@ -115,6 +115,7 @@ def __get_ffmpeg_output(self) -> List[str]: "-y", "-f", "s32le", "-ac", f"{self.__sink_info.channels}", + "-channel_layout", f"{self.__sink_info.channel_layout}", "-ar", f"{self.__sink_info.sample_rate}", f"pipe:{self.__ffmpeg_output_pipe}"]) # ffmpeg PCM output return ffmpeg_command_parts diff --git a/src/audio/source_input_processor.py b/src/audio/source_input_processor.py index bc0dcc9..f69f637 100644 --- a/src/audio/source_input_processor.py +++ b/src/audio/source_input_processor.py @@ -65,26 +65,26 @@ def __init__(self, tag: str, if self.stream_attributes.sample_rate != self.sink_info.sample_rate: logger.debug("[Source %s] Using ffmpeg because sample rate", self.source_info.ip) + logger.debug("[Source %s] Sample Rate %s != %s", + self.source_info.ip, + self.stream_attributes.sample_rate, + self.sink_info.sample_rate) if self.stream_attributes.channels != self.sink_info.channels: logger.debug("[Source %s] Using ffmpeg because channels", self.source_info.ip) + logger.debug("[Source %s] Channels %s != %s", + self.source_info.ip, + self.stream_attributes.channels, + self.sink_info.channels) if self.stream_attributes.channel_layout != self.sink_info.channel_layout: logger.debug("[Source %s] Using ffmpeg because channel layout", self.source_info.ip) + logger.debug("[Source %s] Channel_Layout %s != %s", + self.source_info.ip, + self.stream_attributes.channel_layout, + self.sink_info.channel_layout) if self.source_info.equalizer != Equalizer(): logger.debug("[Source %s] Using ffmpeg because equalizer", self.source_info.ip) if self.source_info.delay != 0: logger.debug("[Source %s] Using ffmpeg because delay", self.source_info.ip) - if self.stream_attributes.sample_rate != self.sink_info.sample_rate: - print("Sample Rate Difference") - if self.stream_attributes.channels != self.sink_info.channels: - print("Channels Difference") - if self.stream_attributes.channel_layout != self.sink_info.channel_layout: - print("Channel Layout Difference") - if self.source_info.equalizer != Equalizer(): - print("Equalizer Difference") - print(self.source_info.equalizer) - print(Equalizer()) - if self.source_info.delay != float(0): - print("Delay Difference") self.ffmpeg_read: int self.ffmpeg_write: int self.ffmpeg_read, self.ffmpeg_write = os.pipe() @@ -118,10 +118,43 @@ def update_source_attributes_and_open_source(self, f"{parsed_scream_header.sample_rate}k", f"{parsed_scream_header.channel_layout} layout."])) self.stream_attributes = parsed_scream_header - self.wants_restart.value = c_bool(True) + self.wants_restart.value = c_bool(True) # type: ignore if self.using_ffmpeg: if not self.ffmpeg_handler is None: self.ffmpeg_handler.stop() + self.using_ffmpeg = ( + self.stream_attributes.sample_rate != self.sink_info.sample_rate or + self.stream_attributes.channels != self.sink_info.channels or + self.stream_attributes.channel_layout != self.sink_info.channel_layout or + self.source_info.equalizer != Equalizer() or + self.source_info.delay != 0) + + if self.stream_attributes.sample_rate != self.sink_info.sample_rate: + logger.debug("[Source %s] Using ffmpeg because sample rate", + self.source_info.ip) + logger.debug("[Source %s] Sample Rate %s != %s", + self.source_info.ip, + self.stream_attributes.sample_rate, + self.sink_info.sample_rate) + if self.stream_attributes.channels != self.sink_info.channels: + logger.debug("[Source %s] Using ffmpeg because channels", + self.source_info.ip) + logger.debug("[Source %s] Channels %s != %s", + self.source_info.ip, + self.stream_attributes.channels, + self.sink_info.channels) + if self.stream_attributes.channel_layout != self.sink_info.channel_layout: + logger.debug("[Source %s] Using ffmpeg because channel layout", + self.source_info.ip) + logger.debug("[Source %s] Channel_Layout %s != %s", + self.source_info.ip, + self.stream_attributes.channel_layout, + self.sink_info.channel_layout) + if self.source_info.equalizer != Equalizer(): + logger.debug("[Source %s] Using ffmpeg because equalizer", self.source_info.ip) + if self.source_info.delay != 0: + logger.debug("[Source %s] Using ffmpeg because delay", self.source_info.ip) + if self.using_ffmpeg: self.ffmpeg_handler = SourceFFMpegProcessor( f"[Sink: {self.__sink_ip}][Source: {self.source_info.ip}]FFMpeg", self.source_input_fd, @@ -130,7 +163,7 @@ def update_source_attributes_and_open_source(self, self.stream_attributes, self.sink_info) self.update_activity() - self.is_open.value = c_bool(True) + self.is_open.value = c_bool(True) # type: ignore def check_if_inactive(self) -> None: """Looks for old pipes that are open and closes them""" @@ -139,14 +172,14 @@ def check_if_inactive(self) -> None: self.__sink_ip, self.tag, constants.SOURCE_INACTIVE_TIME_MS) - self.is_open.value = c_bool(False) + self.is_open.value = c_bool(False) # type: ignore os.write(self.source_input_fd, bytes([0] * constants.PACKET_DATA_SIZE)) def stop(self) -> None: """Fully stops and closes the source, closes fifo handles""" - self.running.value = c_bool(False) + self.running.value = c_bool(False) # type: ignore if self.is_open.value: - self.is_open.value = c_bool(False) + self.is_open.value = c_bool(False) # type: ignore logger.info("[Sink:%s][Source:%s] Stopping", self.__sink_ip, self.tag) if not self.ffmpeg_handler is None: self.ffmpeg_handler.stop() @@ -194,9 +227,19 @@ def run(self) -> None: self.sink_info) while self.running.value: self.check_if_inactive() - ready = select.select([self.writer_read], [], [], .01) + ready = select.select([self.writer_read], [], [], .3) if ready[0]: + self.update_activity() data: bytes = os.read(self.writer_read, constants.PACKET_SIZE) + if self.using_ffmpeg: + ready = select.select([], [self.ffmpeg_write], [], 0) + else: + ready = select.select([], [self.source_input_fd], [], 0) + if not ready[1]: + continue + if len(data) < constants.PACKET_SIZE: + logger.warning("Got bad packet length %s", len(data)) + continue self.update_source_attributes_and_open_source(data[:constants.PACKET_HEADER_SIZE]) if self.using_ffmpeg: os.write(self.ffmpeg_write, data[constants.PACKET_HEADER_SIZE:]) @@ -230,7 +273,6 @@ def run(self) -> None: pcm_data = numpy.frombuffer(data[constants.PACKET_HEADER_SIZE:], numpy.int32) pcm_data = numpy.array(pcm_data * self.source_info.volume, numpy.int32) os.write(self.source_input_fd, pcm_data.tobytes()) - self.update_activity() if not self.ffmpeg_handler is None: self.ffmpeg_handler.stop() close_all_pipes() diff --git a/src/audio/tcp_manager.py b/src/audio/tcp_manager.py new file mode 100644 index 0000000..2b8f58d --- /dev/null +++ b/src/audio/tcp_manager.py @@ -0,0 +1,78 @@ +"""Receiver, handles a port for listening for sources to send UDP packets to + Puts received data in sink queues""" +import threading +import socket +from subprocess import TimeoutExpired +import time +from typing import List, Optional, Tuple + +from src.audio.audio_controller import AudioController +from src.screamrouter_types.annotations import IPAddressType +import src.constants.constants as constants +from src.screamrouter_logger.screamrouter_logger import get_logger + +logger = get_logger(__name__) + +class TCPManager(threading.Thread): + """Handles the TCP Sender socket""" + def __init__(self, audio_controllers: List[AudioController]): + """Listens for incoming connections on TCP and passes them to listeners""" + super().__init__(name="TCP Connection Manager") + self.sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + """Main socket all sources send to""" + self.audio_controllers: List[AudioController] = audio_controllers + """List of all sink queues to forward data to""" + self.running: bool = True + """Multiprocessing-passed flag to determine if the thread is running""" + bound: bool = False + while not bound: + try: + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.bind(("0.0.0.0", 4010)) + bound = True + except OSError: + logger.warning("Binding to port %s failed, retrying in .5 seconds", 4010) + time.sleep(.5) + self.known_connections: List[Tuple[IPAddressType, socket.socket]] = [] + self.sock.listen(12321312) + self.start() + + def stop(self) -> None: + """Stops the Sender""" + logger.info("[TCP Manager] Stopping") + was_running: bool = self.running + self.running = False + for connection in self.known_connections: + connection[1].close() + self.sock.close() + if constants.WAIT_FOR_CLOSES and was_running: + try: + self.join(5) + except TimeoutExpired: + logger.warning("TCP Manager failed to close") + + def replace_mixers(self, audio_controllers: List[AudioController]) -> None: + """Replace mixers with a new set""" + self.audio_controllers = audio_controllers + + def get_fd(self, sink_ip: Optional[IPAddressType]) -> Optional[int]: + """Returns the FD for a sink IP, None if not found""" + if sink_ip is None: + return None + for connection in reversed(self.known_connections): + if str(connection[0]) == str(sink_ip): + return connection[1].fileno() + return None + + def run(self) -> None: + while self.running: + client, address = self.sock.accept() + logger.info("[TCP Manager] New connection from %s", address[0]) + self.known_connections.append((address[0], client)) + for audio_controller in self.audio_controllers: + logger.info("Comparing IP %s to %s", + audio_controller.pcm_thread.sink_info.ip, address[0]) + if str(audio_controller.pcm_thread.sink_info.ip) == str(address[0]): + logger.info("[TCP Manager] Wrote FD to mixer %s, fd %s", + address[0], client.fileno()) + audio_controller.restart_mixer(client.fileno()) diff --git a/src/configuration/configuration_manager.py b/src/configuration/configuration_manager.py index b8a401d..afc36fd 100644 --- a/src/configuration/configuration_manager.py +++ b/src/configuration/configuration_manager.py @@ -1,6 +1,6 @@ """This manages the target state of sinks, sources, and routes then runs audio controllers for each source""" -import os +import os import socket import sys import threading @@ -17,6 +17,7 @@ import dns.rrset import yaml +from src.audio.tcp_manager import TCPManager import src.constants.constants as constants import src.screamrouter_logger.screamrouter_logger as screamrouter_logger from src.api.api_webstream import APIWebStream @@ -64,6 +65,7 @@ def __init__(self, websocket: APIWebStream, plugin_manager: PluginManager): """Holds the thread that receives UDP packets from Scream""" self.rtp_receiver: RTPReceiver = RTPReceiver([]) """Holds the thread that receives UDP packets from an RTP source""" + self.tcp_manager: TCPManager = TCPManager([]) self.reload_config: bool = False """Set to true to reload the config. Used so config changes during another config reload still trigger @@ -317,6 +319,7 @@ def stop(self) -> bool: _logger.debug("[Configuration Manager] Stopping receiver") self.scream_recevier.stop() self.rtp_receiver.stop() + self.tcp_manager.stop() _logger.debug("[Configuration Manager] Receiver stopped") _logger.debug("[Configuration Manager] Stopping Plugin Manager") self.plugin_manager.stop_registered_plugins() @@ -526,6 +529,15 @@ def __find_added_removed_changed_sinks(self) -> Tuple[List[SinkDescription], if sink not in changed_sinks: changed_sinks.append(sink) + # Add sinks requesting a reload + for audio_controller in self.audio_controllers: + if audio_controller.wants_reload(): + _logger.info("[Configuration Manager] Controller %s wants a reload", audio_controller.sink_info.name) + for sink in new_map.keys(): + if sink.name == audio_controller.sink_info.name: + if sink not in changed_sinks: + changed_sinks.append(sink) + return added_sinks, removed_sinks, changed_sinks @@ -636,7 +648,7 @@ def __process_and_apply_configuration(self) -> None: # Load a new controller sources: List[SourceDescription] sources = self.active_configuration.real_sinks_to_real_sources[sink] - audio_controller = AudioController(sink, sources, self.__api_webstream) + audio_controller = AudioController(sink, sources, self.tcp_manager.get_fd(sink.ip), self.__api_webstream) _logger.debug("[Configuration Manager] Adding Audio Controller %s", sink.name) self.audio_controllers.append(audio_controller) @@ -658,7 +670,7 @@ def __process_and_apply_configuration(self) -> None: # Load a new controller sources: List[SourceDescription] sources = self.active_configuration.real_sinks_to_real_sources[sink] - audio_controller = AudioController(sink, sources, self.__api_webstream) + audio_controller = AudioController(sink, sources, self.tcp_manager.get_fd(sink.ip), self.__api_webstream) _logger.debug("[Configuration Manager] Adding Audio Controller %s", sink.name) self.audio_controllers.append(audio_controller) @@ -674,6 +686,7 @@ def __process_and_apply_configuration(self) -> None: controller_write_fds: List[int] = [] for audio_controller in self.audio_controllers: controller_write_fds.append(audio_controller.controller_write_fd) + self.tcp_manager.replace_mixers(self.audio_controllers) self.plugin_manager.load_registered_plugins(controller_write_fds) _logger.debug("[Configuration Manager] Reload done") @@ -746,6 +759,9 @@ def run(self): # while it's already reloading if self.plugin_manager.wants_reload(): self.reload_config = True + for audio_controller in self.audio_controllers: + if audio_controller.wants_reload(): + self.reload_config = True while self.reload_config: self.reload_config = False _logger.info("[Configuration Manager] Reloading the configuration") diff --git a/src/constants/constants.py b/src/constants/constants.py index b71b5d3..b1b197a 100644 --- a/src/constants/constants.py +++ b/src/constants/constants.py @@ -31,7 +31,7 @@ """Show ffmpeg output""" DEBUG_MULTIPROCESSING: bool = False """Debugs Multiprocessing to stdout.""" -SOURCE_INACTIVE_TIME_MS: int = 35 +SOURCE_INACTIVE_TIME_MS: int = 350 """Inactive time for a source before it's closed. Some plugins may need this raised. If this is too long there will be gaps when a source stops sending.""" @@ -39,6 +39,9 @@ """SSL Cert""" CERTIFICATE_KEY: str = "/root/screamrouter/cert/privkey.pem" """SSL Cert Key""" +SYNCED_TIME_BUFFER: int = 5 +"""Number of ms that synced players play out in the future + negative delays can be up to this amount""" # ########## # Internal Options @@ -58,7 +61,7 @@ """On configuration reload, wait for existing processes to close before starting new processes, mostly for testing. Configuration changes reload faster when set to False.""" -KILL_AT_CLOSE: bool = False +KILL_AT_CLOSE: bool = True """Closes quickly but leaves lingering processes, doesn't reload any faster than disabling WAIT_FOR_CLOSES.""" EXIT_HACK: bool = False diff --git a/src/plugins/play_url.py b/src/plugins/play_url.py index d3b6e7a..c4e0e35 100644 --- a/src/plugins/play_url.py +++ b/src/plugins/play_url.py @@ -60,8 +60,14 @@ def start_plugin(self): def stop_plugin(self): """This is called when the plugin is stopped. You may perform shutdown tasks here.""" - os.close(self.ffmpeg_read) - os.close(self.ffmpeg_write) + try: + os.close(self.ffmpeg_read) + except OSError: + pass + try: + os.close(self.ffmpeg_write) + except OSError: + pass def load_plugin(self): """This is called when the configuration is loaded.""" diff --git a/src/plugins/play_url_multiple.py b/src/plugins/play_url_multiple.py index 4909471..015ea4f 100644 --- a/src/plugins/play_url_multiple.py +++ b/src/plugins/play_url_multiple.py @@ -166,8 +166,14 @@ def stop(self): self.join(5) except subprocess.TimeoutExpired: logger.warning("Play URL Multiple failed to close") - close_pipe(self.fifo_read) - close_pipe(self.fifo_write) + try: + os.close(self.fifo_read) + except OSError: + pass + try: + os.close(self.fifo_write) + except OSError: + pass def check_ffmpeg_done(self) -> bool: """Checks if ffmpeg is done""" diff --git a/src/screamrouter_types/configuration.py b/src/screamrouter_types/configuration.py index b4a3de8..1da8f0a 100644 --- a/src/screamrouter_types/configuration.py +++ b/src/screamrouter_types/configuration.py @@ -64,8 +64,12 @@ def __eq__(self, other): def __hash__(self): """Returns a hash""" hashstr: str = "" - for field_name in self.model_fields: - hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" + for field_name, field_info in self.model_fields.items(): + try: + hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" + except AttributeError: + setattr(self, field_name, field_info.default) + hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" return hash(hashstr) def __mul__(self, other): @@ -124,6 +128,10 @@ class SinkDescription(BaseModel): b7=1, b8=1, b9=1, b10=1, b11=1, b12=1, b13=1, b14=1, b15=1, b16=1, b17=1, b18=1) """Audio Equalizer""" + time_sync: bool = False + """Rather the sink is timesynced (Normal Scream receivers are not compatible)""" + time_sync_delay: int = 0 + """Delay for time sync in ms""" def __eq__(self, other): """Compares the name if a string. @@ -149,8 +157,12 @@ def __eq__(self, other): def __hash__(self): """Returns a hash""" hashstr: str = "" - for field_name in self.model_fields: - hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" + for field_name, field_info in self.model_fields.items(): + try: + hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" + except AttributeError: + setattr(self, field_name, field_info.default) + hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" return hash(hashstr) class SourceDescription(BaseModel): @@ -206,8 +218,12 @@ def __eq__(self, other): def __hash__(self): """Returns a hash""" hashstr: str = "" - for field_name in self.model_fields: - hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" + for field_name, field_info in self.model_fields.items(): + try: + hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" + except AttributeError: + setattr(self, field_name, field_info.default) + hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" return hash(hashstr) @@ -256,6 +272,10 @@ def __eq__(self, other): def __hash__(self): """Returns a hash""" hashstr: str = "" - for field_name in self.model_fields: - hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" + for field_name, field_info in self.model_fields.items(): + try: + hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" + except AttributeError: + setattr(self, field_name, field_info.default) + hashstr = hashstr + f"{field_name}:{getattr(self, field_name)}" return hash(hashstr)