Skip to content

Commit

Permalink
Add TCP handler, various bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
netham45 committed Jul 7, 2024
1 parent 34949c7 commit 533a9fe
Show file tree
Hide file tree
Showing 48 changed files with 422 additions and 129 deletions.
4 changes: 4 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{
"python.REPL.enableREPLSmartSend": false,
"pylint.args": [],
"files.exclude": {
"**/*.c": true,
"**/*.so": true
},
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions kill_restart.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
kill -9 $(ps -ef | grep -iE "scream|ffmpeg" | awk '{print $2}');./screamrouter.py
3 changes: 3 additions & 0 deletions screamrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import signal
import sys
import threading
import pyximport

import uvicorn
from fastapi import FastAPI
Expand All @@ -18,6 +19,8 @@
from src.screamrouter_logger.screamrouter_logger import get_logger
from src.utils.utils import set_process_name

pyximport.install()

os.nice(-15)

logger = get_logger(__name__)
Expand Down
20 changes: 20 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Setup script for cpython for src.audio module"""
from distutils.extension import Extension
from setuptools import setup
from Cython.Build import cythonize

packages = [Extension("src.audio.audio_controller",["src/audio/audio_controller.py"]),
Extension("src.audio.mp3_header_parser",["src/audio/mp3_header_parser.py"]),
Extension("src.audio.rtp_recevier",["src/audio/rtp_recevier.py"]),
Extension("src.audio.scream_header_parser",["src/audio/scream_header_parser.py"]),
Extension("src.audio.scream_receiver",["src/audio/scream_receiver.py"]),
Extension("src.audio.sink_mp3_processor",["src/audio/sink_mp3_processor.py"]),
Extension("src.audio.sink_output_mixer",["src/audio/sink_output_mixer.py"]),
Extension("src.audio.source_input_processor",["src/audio/source_input_processor.py"]),
Extension("src.audio.tcp_manager",["src/audio/tcp_manager.py"])]

setup(
name='ScreamRouter',
ext_modules=cythonize(packages),
packages=["audio"]
)
9 changes: 6 additions & 3 deletions site/dialog_views/add_edit_sink.html.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
{{- inputs.text("Sink IP", "ip", true, "Sink IP", data['ip']) }}<BR />
{{- inputs.text_number("Sink Port", "port", 1, 65535, true, "Sink Port", data['port']) }}<BR />
<SPAN>
<LABEL FOR="bit_depth">Bit Depth</LABEL>
<LABEL FOR="bit_depth">Bit Depth</LABEL>
<SELECT ID="bit_depth">
<OPTION VALUE="16"{{' SELECTED' if data['bit_depth'] == 16}}>16</OPTION>
<OPTION VALUE="24"{{' SELECTED' if data['bit_depth'] == 24}}>24</OPTION>
Expand All @@ -29,7 +29,7 @@
</SPAN><BR />
{{- inputs.text_number("Sink Channels", "channels", 1, 8, True, "Sink Channels", data['channels']) }}<BR />
<SPAN>
<LABEL FOR="channel_layout">Channel Layout</LABEL>
<LABEL FOR="channel_layout">Channel Layout</LABEL>
<SELECT ID="channel_layout">
<OPTION VALUE="mono"{{' SELECTED' if data['channel_layout'] =='mono'}}>Mono</OPTION>
<OPTION VALUE="stereo"{{' SELECTED' if data['channel_layout'] =='stereo'}}>Stereo</OPTION>
Expand All @@ -39,7 +39,10 @@
<OPTION VALUE="7.1"{{' SELECTED' if data['channel_layout'] =='7.1'}}>7.1</OPTION>
</SELECT>
</SPAN><BR />
{{- inputs.text_number("Delay in ms", "delay", 0, 5000, True, "Delay", data['delay']) }}<BR /><BR />
{{- inputs.text_number("Delay in ms", "delay", 0, 5000, True, "Delay", data['delay']) }}<BR />
<LABEL FOR="checkbox">Time Sync Protocol</LABEL>
<INPUT TYPE="checkbox" NAME="time_sync" ID="time_sync"{{ " checked" if data['time_sync'] }}><BR/>
{{- inputs.text_number("Timesync Delay in ms", "time_sync_delay", 0, 5000, True, "Timesync Delay", data['time_sync_delay']) }}<BR /><BR />
{{- buttons.button("dialog_submit_close", 'Add Sink' if request_info['add_new'] else 'Edit Sink' , "dialog-button", {}, "Submit")}}
{{- buttons.button("dialog_cancel", 'Close' , "dialog-button", {}, "Cancel")}}
</DIV>
4 changes: 3 additions & 1 deletion site/screamrouter.js.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ function dialog_submit(close) {
if (entry.id == undefined) {
continue;
}
if (entry.type == "select-multiple") {
if (entry.type == "checkbox") {
result[entry.id] = entry.checked;
} else if (entry.type == "select-multiple") {
var result_array = [];
for (option in entry.selectedOptions) {
var value = entry.selectedOptions[option].value;
Expand Down
33 changes: 23 additions & 10 deletions src/audio/audio_controller.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""One audio controller per sink, handles taking in packets and distributing them to sources"""
import fcntl
import multiprocessing
import os
import select
import threading
from ctypes import c_bool
from queue import Empty
from subprocess import TimeoutExpired
from typing import Dict, List
from typing import Dict, List, Optional

from src.audio.mp3_ffmpeg_process import MP3FFMpegProcess
from src.audio.sink_mp3_processor import SinkMP3Processor
Expand All @@ -28,7 +29,9 @@ class AudioController(multiprocessing.Process):
This thread listens to the input queue and passes it to each Source processor"""

def __init__(self, sink_info: SinkDescription,
sources: List[SourceDescription], websocket: APIWebStream):
sources: List[SourceDescription],
tcp_fd: Optional[int],
websocket: APIWebStream):
"""Initialize a sink queue"""
super().__init__(name=f"[Sink {sink_info.name}] Audio Controller")
logger.info("[Sink %s] Loading audio controller", sink_info.name)
Expand Down Expand Up @@ -72,6 +75,8 @@ def __init__(self, sink_info: SinkDescription,
self.controller_read_fd, self.controller_write_fd = os.pipe()
self.running = multiprocessing.Value(c_bool, True)
"""Multiprocessing-passed flag to determine if the thread is running"""
self.request_restart = multiprocessing.Value(c_bool, False)
"""Set true if we want a config reload"""
logger.info("[Sink:%s] Queue %s", self.sink_info.ip, self.controller_write_fd)
if self.sources_lock.acquire(timeout=1):
for source in self.__controller_sources:
Expand All @@ -95,11 +100,7 @@ def __init__(self, sink_info: SinkDescription,
self.mp3_ffmpeg_output_write: int
self.mp3_ffmpeg_input_read, self.mp3_ffmpeg_input_write = os.pipe()
self.mp3_ffmpeg_output_read, self.mp3_ffmpeg_output_write = os.pipe()
self.pcm_thread: SinkOutputMixer = SinkOutputMixer(self.sink_info.ip,
self.sink_info.port,
self.stream_info,
list(self.sources.values()),
self.mp3_ffmpeg_input_write)
self.pcm_thread: SinkOutputMixer
"""Holds the thread to listen to PCM output from a Source"""

self.mp3_ffmpeg_processor = MP3FFMpegProcess(f"[Sink {self.sink_info.ip}] MP3 Process",
Expand All @@ -112,8 +113,18 @@ def __init__(self, sink_info: SinkDescription,
self.mp3_ffmpeg_output_read,
self.webstream.queue)
"""Holds the thread to generaet MP3 output from a PCM reader"""
self.pcm_thread = SinkOutputMixer(self.sink_info,
self.stream_info,
tcp_fd,
list(self.sources.values()),
self.mp3_ffmpeg_input_write)
self.start()

def restart_mixer(self, tcp_client_fd: Optional[int]):
"""(Re)starts the mixer"""
logger.info("[Audio Controller] Requesting config reloaed")
self.request_restart = c_bool(True)

def get_open_sources(self) -> List[SourceInputProcessor]:
"""Build a list of active IPs, exclude ones that aren't open"""
active_sources: List[SourceInputProcessor] = []
Expand Down Expand Up @@ -149,7 +160,7 @@ def stop(self) -> None:
source.stop()
logger.debug("[Sink:%s] Stopped source %s", self.sink_info.ip, source.name)
logger.debug("[Sink:%s] Stopping Audio Controller", self.sink_info.ip)
self.running.value = c_bool(False)
self.running.value = c_bool(False) # type: ignore

if constants.WAIT_FOR_CLOSES:
logger.debug("[Sink:%s] Waiting for Audio Controller Stop", self.sink_info.ip)
Expand All @@ -162,12 +173,14 @@ def stop(self) -> None:
close_pipe(self.controller_read_fd)
close_pipe(self.controller_write_fd)

def wants_restart(self) -> bool:
def wants_reload(self) -> bool:
"""Returns true of any of the sources want a restart"""
flag: bool = False
if self.request_restart.value:
return True
for source in self.sources.values():
if source.wants_restart.value:
source.wants_restart.value = c_bool(False)
source.wants_restart.value = c_bool(False) # type: ignore
flag = True
return flag

Expand Down
10 changes: 5 additions & 5 deletions src/audio/mp3_header_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,19 +309,19 @@ def __mp3_process_header(self, header_data: bytes) -> None:
for i in range(5, 7):
if byte_2[i] != 1:
raise InvalidHeaderException("Invalid MP3 Header (Second byte has invalid marker)")
self.mpeg_version = numpy.packbits(byte_2[3:5], bitorder='little')[0]
self.layer_description = numpy.packbits(byte_2[1:2], bitorder='little')[0]
self.mpeg_version = int(numpy.packbits(byte_2[3:5], bitorder='little')[0])
self.layer_description = int(numpy.packbits(byte_2[1:2], bitorder='little')[0])
self.protected = byte_2[0] == 1
self.bitrate_index = numpy.packbits(byte_3[4:8], bitorder='little')[0]
self.samplerate_index = numpy.packbits(byte_3[2:3], bitorder='little')[0]
self.bitrate_index = int(numpy.packbits(byte_3[4:8], bitorder='little')[0])
self.samplerate_index = int(numpy.packbits(byte_3[2:3], bitorder='little')[0])
self.padding = byte_3[1]
self.private = byte_3[0] == 1
self.channelmode = numpy.packbits(byte_4[6:8], bitorder='little')[0]
self.modeextension = numpy.packbits(byte_4[4:6], bitorder='little')[0]
self.copyright = byte_4[3] == 1
self.original = byte_4[2] == 1
self.emphasis = numpy.packbits(byte_4[0:2], bitorder='little')[0]
self.samplerate = self.__mp3_process_samplerate(self.mpeg_version, self.samplerate_index)
self.samplerate = int(self.__mp3_process_samplerate(self.mpeg_version, self.samplerate_index))
self.bitrate = self.__mp3_parse_bitrate(self.bitrate_index,
1 if self.mpeg_version == 3 else 2,
(3 - self.layer_description) + 1)
Expand Down
50 changes: 26 additions & 24 deletions src/audio/rtp_recevier.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ def __init__(self, controller_write_fd_list: List[int]):
"""Multiprocessing-passed flag to determine if the thread is running"""
self.known_ips = multiprocessing.Manager().list()
if len(controller_write_fd_list) == 0: # Will be zero if this is just a placeholder.
self.running.value = c_bool(False)
self.running.value = c_bool(False) # type: ignore
return
self.start()

def stop(self) -> None:
"""Stops the Receiver and all sinks"""
logger.info("[RTP Receiver] Stopping")
was_running: bool = bool(self.running.value)
self.running.value = c_bool(False)
self.running.value = c_bool(False) # type: ignore
if constants.KILL_AT_CLOSE:
try:
self.kill()
Expand All @@ -64,27 +64,29 @@ def run(self) -> None:
stereo_header = create_stream_info(16, 48000, 2, "stereo")

while self.running.value:
ready = select.select([self.sock], [], [], .3)
if ready[0]:
data, addr = self.sock.recvfrom(constants.PACKET_SIZE + 500)
rtp_packet = RTP.fromBytearray(bytearray(data))
if not rtp_packet.payloadType in [PayloadType.L16_1chan,
PayloadType.L16_2chan,
PayloadType.DYNAMIC_127]:
logger.warning("Can only decode 16-bit LPCM, unsupported type %s from %s:%s",
rtp_packet.payloadType, addr[0], addr[1])
continue
if addr[0] not in self.known_ips:
self.known_ips.append(addr[0])
padded_tag: bytes
padded_tag = bytes(addr[0].encode("ascii"))
padded_tag += bytes([0] * (constants.TAG_MAX_LENGTH - len(addr[0])))
header: bytes = mono_header.header
if rtp_packet.payloadType in [PayloadType.L16_2chan,
PayloadType.DYNAMIC_127]:
header = stereo_header.header
for controller_write_fd in self.controller_write_fd_list:
os.write(controller_write_fd, padded_tag + header + rtp_packet.payload)

try:
ready = select.select([self.sock], [], [], .3)
if ready[0]:
data, addr = self.sock.recvfrom(constants.PACKET_SIZE + 500)
rtp_packet = RTP.fromBytearray(bytearray(data))
if not rtp_packet.payloadType in [PayloadType.L16_1chan,
PayloadType.L16_2chan,
PayloadType.DYNAMIC_127]:
logger.warning("Can only decode S16_LE, unsupported type %s from %s:%s",
rtp_packet.payloadType, addr[0], addr[1])
continue
if addr[0] not in self.known_ips:
self.known_ips.append(addr[0])
padded_tag: bytes
padded_tag = bytes(addr[0].encode("ascii"))
padded_tag += bytes([0] * (constants.TAG_MAX_LENGTH - len(addr[0])))
header: bytes = bytes(mono_header.header)
if rtp_packet.payloadType in [PayloadType.L16_2chan,
PayloadType.DYNAMIC_127]:
header = bytes(stereo_header.header)
for controller_write_fd in self.controller_write_fd_list:
os.write(controller_write_fd, padded_tag + header + rtp_packet.payload)
except ValueError:
pass
logger.info("[RTP Receiver] Main thread stopped")
close_all_pipes()
4 changes: 2 additions & 2 deletions src/audio/scream_header_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, scream_header: Union[bytearray, bytes]):
sample_rate_base: int = 44100 if sample_rate_bits[7] == 1 else 48000
sample_rate_bits = numpy.delete(sample_rate_bits, 7) # Remove the uppermost bit
# Convert it back into a number without the top bit, this is the multiplier
sample_rate_multiplier: int = numpy.packbits(sample_rate_bits,bitorder='little')[0]
sample_rate_multiplier: int = int(numpy.packbits(sample_rate_bits,bitorder='little')[0])
if sample_rate_multiplier < 1:
sample_rate_multiplier = 1
# Bypassing pydantic verification for these
Expand All @@ -49,7 +49,7 @@ def __init__(self, scream_header: Union[bytearray, bytes]):
self.channel_mask: bytes = scream_header_array[3:] # type: ignore
"""Channel Mask"""
self.channel_layout: ChannelLayoutType
self.channel_layout = self.__parse_channel_mask(scream_header_array[3:]) # type: ignore
self.channel_layout = self.__parse_channel_mask(bytes(scream_header_array[3:])) # type: ignore
"""Holds the channel layout"""
self.header: bytes = scream_header_array
"""Holds the raw header bytes"""
Expand Down
30 changes: 18 additions & 12 deletions src/audio/scream_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@ class ScreamReceiver(multiprocessing.Process):
def __init__(self, controller_write_fd_list: List[int]):
"""Receives UDP packets and sends them to known queue lists"""
super().__init__(name="Scream Receiver Thread")
self.sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock: socket.socket
"""Main socket all sources send to"""
self.controller_write_fd_list: List[int] = controller_write_fd_list
"""List of all sink queues to forward data to"""
self.running = multiprocessing.Value(c_bool, True)
"""Multiprocessing-passed flag to determine if the thread is running"""
self.known_ips = multiprocessing.Manager().list()
if len(controller_write_fd_list) == 0: # Will be zero if this is just a placeholder.
self.running.value = c_bool(False)
self.running.value = c_bool(False) # type: ignore
return
self.start()

def stop(self) -> None:
"""Stops the Receiver and all sinks"""
logger.info("[Scream Receiver] Stopping")
was_running: bool = bool(self.running.value)
self.running.value = c_bool(False)
self.running.value = c_bool(False) # type: ignore
if constants.KILL_AT_CLOSE:
try:
self.kill()
Expand All @@ -52,20 +52,26 @@ def run(self) -> None:
set_process_name("ScreamReceiver", "Scream Receiver Thread")
logger.debug("[Scream Receiver] Receiver Thread PID %s", os.getpid())
logger.info("[Scream Receiver] Receiver started on port %s", constants.SCREAM_RECEIVER_PORT)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
constants.PACKET_SIZE * 1024 * 1024)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind(("", constants.SCREAM_RECEIVER_PORT))

while self.running.value:
ready = select.select([self.sock], [], [], .3)
if ready[0]:
data, addr = self.sock.recvfrom(constants.PACKET_SIZE)
addrlen = len(addr[0])
if addr[0] not in self.known_ips:
self.known_ips.append(addr[0])
for controller_write_fd in self.controller_write_fd_list:
os.write(controller_write_fd,
bytes(addr[0].encode("ascii") + bytes([0] * (constants.TAG_MAX_LENGTH - addrlen)) + data))
try:
ready = select.select([self.sock], [], [], .3)
if ready[0]:
data, addr = self.sock.recvfrom(constants.PACKET_SIZE)
addrlen = len(addr[0])
if addr[0] not in self.known_ips:
self.known_ips.append(addr[0])
for controller_write_fd in self.controller_write_fd_list:
os.write(controller_write_fd,
bytes(addr[0].encode("ascii") +
bytes([0] * (constants.TAG_MAX_LENGTH - addrlen)) +
data))
except ValueError:
pass
logger.info("[Scream Receiver] Main thread stopped")
close_all_pipes()
8 changes: 4 additions & 4 deletions src/audio/sink_mp3_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ def __read_header(self) -> Tuple[MP3Header, bytes]:
if bytesin[0] == 255:
header = bytesin + self._read_bytes(constants.MP3_HEADER_LENGTH - 1, 0)
try:
header_parsed: MP3Header = MP3Header(header)
return (header_parsed, header)
header_parsed: MP3Header = MP3Header(bytes(header))
return (header_parsed, bytes(header))
except InvalidHeaderException as exc:
logger.warning("[Sink:%s] Bad MP3 Header: %s", self._sink_ip, exc)
if bytes_searched == max_bytes_to_search:
raise InvalidHeaderException(
f"[Sink: {self._sink_ip}] Couldn't find MP3 header after ID3 header")
else:
header_parsed: MP3Header = MP3Header(header)
return (header_parsed, header)
header_parsed: MP3Header = MP3Header(bytes(header))
return (header_parsed, bytes(header))
raise InvalidHeaderException("Invalid header")

def run(self) -> None:
Expand Down
Loading

0 comments on commit 533a9fe

Please sign in to comment.