Skip to content

Commit

Permalink
Code cleanup, use ffmpeg for bitdepth conversion if ffmpeg is running…
Browse files Browse the repository at this point in the history
… anyways, add rtp stream support for PulseAudio sources
  • Loading branch information
netham45 committed May 30, 2024
1 parent 1fecdd2 commit f66bbbb
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 52 deletions.
10 changes: 10 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The simplicity of Scream allows it to be very easy to work with while retaining
* Milkdrop Visualizations thanks to the browser-based [Butterchurn](https://github.com/jberg/butterchurn) project
* There are playback sinks available for common OSes such as [Windows](https://github.com/duncanthrax/scream/tree/master/Receivers/dotnet-windows/ScreamReader), [Linux](https://github.com/duncanthrax/scream/tree/master/Receivers/unix), and [Android](https://github.com/martinellimarco/scream-android/tree/90d1364ee36dd12ec9d7d2798926150b370030f3), as well as some embedded devices such as [the ESP32](http://tomeko.net/projects/esp32_rtp_pager/).
* Embedded noVNC for controlling remote computers playing music
* Can accept streams from PulseAudio

![Screenshot of ScreamRouter noVNC](/images/noVNC.png)

Expand Down Expand Up @@ -86,6 +87,15 @@ Each Source holds information for the source IP, volume, and the Source name.
![Screenshot of Add Source Dialog](/images/AddSource.png)


### PulseAudio

PulseAudio can send to ScreamRouter over RTP. The RTP receiver needs the sender's MTU set so that each packet matches Scream's packet size. The following command will send the Dummy source to a ScreamRouter Receiver:

pactl load-module module-rtp-send format=s16le source=auto_null.monitor destination=<ScreamRouter Receiver> port=40000 mtu=1164

Currently the RTP receiver assumes 16-bit 44100 Hz audio with 1152 bytes of PCM in the payload.


### Sink Groups
Each Sink Group holds a name, a list of Sinks, and a volume. Groups can be nested.

Expand Down
89 changes: 89 additions & 0 deletions src/audio/rtp_recevier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Receiver, handles a port for listening for sources to send UDP packets to
Puts received data in sink queues"""
import multiprocessing
import os
import select
import socket
from ctypes import c_bool
from subprocess import TimeoutExpired
from typing import List
import rtp
from rtp.payloadType import PayloadType

from src.audio.scream_header_parser import create_stream_info
import src.constants.constants as constants
from src.screamrouter_logger.screamrouter_logger import get_logger
from src.utils.utils import close_all_pipes, set_process_name

logger = get_logger(__name__)
RTP = rtp.RTP()

class RTPReceiver(multiprocessing.Process):
    """Listens for incoming RTP streams and forwards their audio to the sink controllers.

    Every accepted packet is re-framed as a source tag (the sender IP, NUL-padded
    to constants.TAG_MAX_LENGTH), followed by a 16-bit/44100 Hz stream header and
    the RTP payload, and is written to each controller file descriptor.
    """

    def __init__(self, controller_write_fd_list: List[int]):
        """Create the socket and shared state, then start the receiver process.

        Args:
            controller_write_fd_list: File descriptors that received audio is
                written to. An empty list makes this instance an inert
                placeholder: it is marked as not running and never started.
        """
        super().__init__(name="RTP Receiver Thread")
        self.sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        """Main socket all sources send to"""
        self.controller_write_fd_list: List[int] = controller_write_fd_list
        """List of all sink queues to forward data to"""
        self.running = multiprocessing.Value(c_bool, True)
        """Multiprocessing-passed flag to determine if the thread is running"""
        self.known_ips = multiprocessing.Manager().list()
        """IPs of every source that has sent a packet with a supported payload type"""
        if not controller_write_fd_list:  # Will be empty if this is just a placeholder.
            self.running.value = False
            return
        self.start()

    def stop(self) -> None:
        """Signal the receive loop to exit and optionally kill / wait for the process.

        Logs a warning when the process is still alive after the 5 second join.
        """
        logger.info("[RTP Receiver] Stopping")
        was_running: bool = bool(self.running.value)
        self.running.value = False
        if constants.KILL_AT_CLOSE:
            try:
                self.kill()
            except AttributeError:
                # Raised when the process was never started (placeholder instance).
                pass
        if constants.WAIT_FOR_CLOSES and was_running:
            # Process.join() does not raise on timeout, it simply returns, so the
            # only way to detect a hung process is to check is_alive() afterwards.
            self.join(5)
            if self.is_alive():
                logger.warning("RTP Receiver failed to close")

    def run(self) -> None:
        """Process entry point: receive RTP packets and forward them to the controllers.

        Loops until the shared running flag is cleared, polling the socket with a
        0.3 second timeout so that a stop request is noticed promptly.
        """
        set_process_name("RTPRecvr", "RTP Receiver Thread")
        logger.debug("[RTP Receiver] Receiver Thread PID %s", os.getpid())
        logger.info("[RTP Receiver] Receiver started on port %s", constants.RTP_RECEIVER_PORT)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
                             constants.PACKET_SIZE * 1024 * 1024)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(("", constants.RTP_RECEIVER_PORT))

        # The headers never change, so build them once instead of per packet.
        mono_header: bytes = create_stream_info(16, 44100, 1, "mono").header
        stereo_header: bytes = create_stream_info(16, 44100, 2, "stereo").header
        # Payload types forwarded with the stereo header; L16_1chan gets the mono one.
        stereo_types = (PayloadType.L16_2chan, PayloadType.DYNAMIC_127)
        supported_types = (PayloadType.L16_1chan,) + stereo_types

        while self.running.value:
            ready = select.select([self.sock], [], [], .3)
            if not ready[0]:
                continue
            data, addr = self.sock.recvfrom(constants.PACKET_SIZE + 500)
            try:
                rtp_packet = RTP.fromBytearray(bytearray(data))
            except Exception as exc:  # pylint: disable=broad-exception-caught
                # Datagrams come straight off the network; a malformed one must
                # not take the whole receiver process down, so log and drop it.
                logger.warning("[RTP Receiver] Dropping unparseable packet from %s:%s (%s)",
                               addr[0], addr[1], exc)
                continue
            if rtp_packet.payloadType not in supported_types:
                logger.warning("Can only decode 16-bit LPCM, unsupported type %s from %s:%s",
                               rtp_packet.payloadType, addr[0], addr[1])
                continue
            if addr[0] not in self.known_ips:
                self.known_ips.append(addr[0])
            # Source tag: sender IP, NUL-padded up to TAG_MAX_LENGTH bytes.
            padded_tag: bytes = addr[0].encode("ascii").ljust(constants.TAG_MAX_LENGTH, b"\x00")
            header: bytes = stereo_header if rtp_packet.payloadType in stereo_types \
                else mono_header
            # Assemble the outgoing packet once, then fan it out to every controller.
            packet: bytes = padded_tag + header + rtp_packet.payload
            for controller_write_fd in self.controller_write_fd_list:
                os.write(controller_write_fd, packet)

        logger.info("[RTP Receiver] Main thread stopped")
        close_all_pipes()
20 changes: 11 additions & 9 deletions src/audio/udp_receiver.py → src/audio/scream_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,26 @@

logger = get_logger(__name__)

class UDPReceiver(multiprocessing.Process):
class ScreamReceiver(multiprocessing.Process):
"""Handles the main socket that listens for incoming Scream streams and sends them to sinks"""
def __init__(self, controller_write_fd_list: List[int]):
"""Receives UDP packets and sends them to known queue lists"""
super().__init__(name="Receiver Thread")
super().__init__(name="Scream Receiver Thread")
self.sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
"""Main socket all sources send to"""
self.controller_write_fd_list: List[int] = controller_write_fd_list
"""List of all sink queues to forward data to"""
self.running = multiprocessing.Value(c_bool, True)
"""Multiprocessing-passed flag to determine if the thread is running"""
self.known_ips = multiprocessing.Manager().list()
if len(controller_write_fd_list) == 0: # Will be zero if this is just a placeholder.
self.running.value = c_bool(False)
return
self.start()

def stop(self) -> None:
"""Stops the Receiver and all sinks"""
logger.info("[Receiver] Stopping")
logger.info("[Scream Receiver] Stopping")
was_running: bool = bool(self.running.value)
self.running.value = c_bool(False)
if constants.KILL_AT_CLOSE:
Expand All @@ -48,22 +49,23 @@ def stop(self) -> None:

def run(self) -> None:
"""This thread listens for traffic from all sources and sends it to sinks"""
set_process_name("Receiver", "Receiver Thread")
logger.debug("[Receiver] Receiver Thread PID %s", os.getpid())
logger.info("[Receiver] Receiver started on port %s", constants.RECEIVER_PORT)
set_process_name("ScreamReceiver", "Scream Receiver Thread")
logger.debug("[Scream Receiver] Receiver Thread PID %s", os.getpid())
logger.info("[Scream Receiver] Receiver started on port %s", constants.SCREAM_RECEIVER_PORT)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
constants.PACKET_SIZE * 1024 * 1024)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind(("", constants.RECEIVER_PORT))
self.sock.bind(("", constants.SCREAM_RECEIVER_PORT))

while self.running.value:
ready = select.select([self.sock], [], [], .3)
if ready[0]:
data, addr = self.sock.recvfrom(constants.PACKET_SIZE)
addrlen = len(addr[0])

if addr[0] not in self.known_ips:
self.known_ips.append(addr[0])
for controller_write_fd in self.controller_write_fd_list:
os.write(controller_write_fd,
bytes(addr[0].encode("ascii") + bytes([0] * (constants.TAG_MAX_LENGTH - addrlen)) + data))
logger.info("[Receiver] Main thread stopped")
logger.info("[Scream Receiver] Main thread stopped")
close_all_pipes()
28 changes: 23 additions & 5 deletions src/audio/sink_output_mixer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(self, sink_ip: IPAddressType,
"""Output socket for sink"""
self._sink_port: PortType = sink_port
self.__output_header: bytes = output_info.header
self.__output_info: ScreamHeader = output_info
"""Holds the header added onto packets sent to Scream receivers"""
self._sink_ip = sink_ip
self.read_fds: List[int] = []
Expand Down Expand Up @@ -72,6 +73,7 @@ def run(self) -> None:
logger.debug("[Sink %s] Mixer Thread PID %s", self._sink_ip, os.getpid())
self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, constants.PACKET_SIZE * 65535)
output_data = numpy.array(constants.PACKET_DATA_SIZE, numpy.int64)
output_buffer = numpy.array([], numpy.int8)
while self.running.value:
first_value: bool = True
for index, fd in enumerate(self.read_fds):
Expand All @@ -90,11 +92,27 @@ def run(self) -> None:
if first_value: # No active source, sleep for 100ms to not burn CPU.
time.sleep(.1)
else:
output_data = numpy.clip(output_data, -2147483648, 2147483647)
self.__sock.sendto(
self.__output_header + numpy.int32(output_data).tobytes(),
(str(self._sink_ip), int(self._sink_port)))
os.write(self.ffmpeg_mp3_write_fd, numpy.int32(output_data).tobytes())
os.write(
self.ffmpeg_mp3_write_fd,
numpy.array(output_data, numpy.int32).tobytes())
if self.__output_info.bit_depth == 32:
output_data = numpy.array(output_data, numpy.uint32)
output_buffer = numpy.insert(
numpy.frombuffer(output_data, numpy.uint8), 0, output_buffer)
elif self.__output_info.bit_depth == 24:
output_data = numpy.array(output_data, numpy.uint32)
output_data = numpy.frombuffer(output_data, numpy.uint8)
output_data = numpy.delete(output_data, range(0, len(output_data), 4))
output_buffer = numpy.insert(output_data, 0, output_buffer)
elif self.__output_info.bit_depth == 16:
output_data = numpy.array(output_data, numpy.uint16)
output_buffer = numpy.insert(
numpy.frombuffer(output_data, numpy.uint8), 0, output_buffer)
if len(output_buffer) >= constants.PACKET_DATA_SIZE:
self.__sock.sendto(
self.__output_header + output_buffer[:constants.PACKET_DATA_SIZE].tobytes(),
(str(self._sink_ip), int(self._sink_port)))
output_buffer = output_buffer[constants.PACKET_DATA_SIZE:]
logger.debug("[Sink:%s] Mixer thread exit", self._sink_ip)
close_all_pipes()

Expand Down
5 changes: 3 additions & 2 deletions src/audio/source_ffmpeg_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self, tag,
def __get_ffmpeg_inputs(self) -> List[str]:
"""Add an input for each source"""
ffmpeg_command: List[str] = []
bit_depth = self.__source_stream_attributes.bit_depth
sample_rate = self.__source_stream_attributes.sample_rate
channels = self.__source_stream_attributes.channels
channel_layout = self.__source_stream_attributes.channel_layout
Expand All @@ -60,7 +61,7 @@ def __get_ffmpeg_inputs(self) -> List[str]:
"-fflags", "nobuffer",
"-thread_queue_size", "128",
"-channel_layout", f"{channel_layout}",
"-f", "s32le",
"-f", f"s{bit_depth}le",
"-ac", f"{channels}",
"-ar", f"{sample_rate}",
"-i", f"pipe:{self.__ffmpeg_input_pipe}"])
Expand Down Expand Up @@ -110,7 +111,7 @@ def __get_ffmpeg_output(self) -> List[str]:
"-fflags", "nobuffer",
"-avioflags", "direct",
"-y",
"-f", f"s{self.__sink_info.bit_depth}le",
"-f", "s32le",
"-ac", f"{self.__sink_info.channels}",
"-ar", f"{self.__sink_info.sample_rate}",
f"pipe:{self.__ffmpeg_output_pipe}"]) # ffmpeg PCM output
Expand Down
40 changes: 12 additions & 28 deletions src/audio/source_input_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,54 +187,38 @@ def run(self) -> None:
if ready[0]:
data: bytes = os.read(self.writer_read, constants.PACKET_SIZE)
self.update_source_attributes_and_open_source(data[:constants.PACKET_HEADER_SIZE])
if self.stream_attributes.bit_depth == 16:
if self.using_ffmpeg:
os.write(self.ffmpeg_write, data[constants.PACKET_HEADER_SIZE:])
elif self.stream_attributes.bit_depth == 16:
pcm_data = numpy.frombuffer(data[constants.PACKET_HEADER_SIZE:], numpy.int16)
pcm_data = numpy.array(pcm_data * self.source_info.volume, numpy.int32)
pcm_data = numpy.left_shift(pcm_data, 16)
# Two 32-bit packets per upscaled 16-bit packet
pcm_data = self.equalizer(pcm_data)
if self.using_ffmpeg:
os.write(self.ffmpeg_write,
pcm_data[:constants.PACKET_DATA_SIZE_INT32].tobytes())
os.write(self.ffmpeg_write,
pcm_data[constants.PACKET_DATA_SIZE_INT32:].tobytes())
else:
os.write(self.source_input_fd,
pcm_data[:constants.PACKET_DATA_SIZE_INT32].tobytes())
os.write(self.source_input_fd,
pcm_data[constants.PACKET_DATA_SIZE_INT32:].tobytes())
os.write(self.source_input_fd,
pcm_data[:constants.PACKET_DATA_SIZE_INT32].tobytes())
os.write(self.source_input_fd,
pcm_data[constants.PACKET_DATA_SIZE_INT32:].tobytes())
elif self.stream_attributes.bit_depth == 24:
# Pad 24-bit to make it 32-bit
pcm_data = numpy.frombuffer(data[constants.PACKET_HEADER_SIZE:], numpy.int8)
pcm_data = numpy.insert(pcm_data, range(0, len(pcm_data), 3), 0)
pcm_data = numpy.frombuffer(pcm_data, numpy.int32)
pcm_data = numpy.array(pcm_data * self.source_info.volume, numpy.int32)
pcm_data = numpy.insert(pcm_data, 0, samples_left_over)
if self.using_ffmpeg:
os.write(self.ffmpeg_write,
os.write(self.source_input_fd,
pcm_data[:constants.PACKET_DATA_SIZE_INT32].tobytes())
else:
os.write(self.source_input_fd,
pcm_data[:constants.PACKET_DATA_SIZE_INT32].tobytes())
if len(pcm_data) >= (constants.PACKET_DATA_SIZE_INT32 * 2):
if self.using_ffmpeg:
os.write(self.ffmpeg_write,
pcm_data[constants.PACKET_DATA_SIZE_INT32:
(constants.PACKET_DATA_SIZE_INT32*2)].tobytes())
else:
os.write(self.source_input_fd,
pcm_data[constants.PACKET_DATA_SIZE_INT32:
(constants.PACKET_DATA_SIZE_INT32*2)].tobytes())
os.write(self.source_input_fd,
pcm_data[constants.PACKET_DATA_SIZE_INT32:
(constants.PACKET_DATA_SIZE_INT32*2)].tobytes())
samples_left_over = pcm_data[(constants.PACKET_DATA_SIZE_INT32 * 2):]
else:
samples_left_over = pcm_data[constants.PACKET_DATA_SIZE_INT32:]
elif self.stream_attributes.bit_depth == 32:
pcm_data = numpy.frombuffer(data[constants.PACKET_HEADER_SIZE:], numpy.int32)
pcm_data = numpy.array(pcm_data * self.source_info.volume, numpy.int32)
if self.using_ffmpeg:
os.write(self.ffmpeg_write, pcm_data.tobytes())
else:
os.write(self.source_input_fd, pcm_data.tobytes())
os.write(self.source_input_fd, pcm_data.tobytes())
self.update_activity()
if not self.ffmpeg_handler is None:
self.ffmpeg_handler.stop()
Expand Down
Loading

0 comments on commit f66bbbb

Please sign in to comment.