Skip to content

Commit

Permalink
Rudimentary/Basic self_video incoming RTP support
Browse files Browse the repository at this point in the history
- disco.types.channel: add toggle to `Channel.connect()` for video packets;
- disco.voice.client: enable video reception and everything needed to make it happen;
- disco.voice.udp: everything needed to handle RTP video packets;
- requirements.txt: websocket-client (1.7.0 >> 1.8.0);
- setup.py: add `libnacl` as a voice requirement;

My life has become RFCs. Though useless as a raw RTP stream, it's a step closer to something useful.
  • Loading branch information
elderlabs committed Apr 24, 2024
1 parent c18845d commit 5cb4c5f
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 33 deletions.
4 changes: 2 additions & 2 deletions disco/types/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,15 +491,15 @@ def send_typing(self):
"""
self.client.api.channels_typing(self.id)

def connect(self, *args, **kwargs):
def connect(self, video_enabled=False, *args, **kwargs):
"""
Connect to this channel over voice.
"""
from disco.voice.client import VoiceClient
assert self.is_voice, 'Cannot connect to a non-voice channel'

server_id = self.guild_id or self.id
vc = self.client.state.voice_clients.get(server_id) or VoiceClient(self.client, server_id, is_dm=self.is_dm)
vc = self.client.state.voice_clients.get(server_id) or VoiceClient(self.client, server_id, is_dm=self.is_dm, video_enabled=video_enabled)

return vc.connect(self.id, *args, **kwargs)

Expand Down
130 changes: 116 additions & 14 deletions disco/voice/client.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from gevent import sleep as gevent_sleep, spawn as gevent_spawn
from time import perf_counter as time_perf_counter, time

from collections import namedtuple as collections_namedtuple
from collections import namedtuple as namedtuple
from websocket import WebSocketConnectionClosedException, WebSocketTimeoutException

from disco.gateway.encoding.json import JSONEncoder
from disco.gateway.encoding import ENCODERS
from disco.gateway.packets import OPCode
from disco.types.base import cached_property
from disco.util.emitter import Emitter
from disco.util.logging import LoggingClass
from disco.util.websocket import Websocket
from disco.voice.packets import VoiceOPCode
from disco.voice.udp import AudioCodecs, RTPPayloadTypes, UDPVoiceClient
from disco.voice.udp import AudioCodecs, RTPPayloadTypes, UDPVoiceClient, VideoCodecs


class SpeakingFlags:
Expand All @@ -36,7 +36,7 @@ class VoiceState:
AUTHENTICATED = 'AUTHENTICATED'


VoiceSpeaking = collections_namedtuple('VoiceSpeaking', [
VoiceSpeaking = namedtuple('VoiceSpeaking', [
'client',
'user_id',
'speaking',
Expand All @@ -45,6 +45,22 @@ class VoiceState:
])


# Event payload describing a change to a user's video stream. Built by
# `on_video` and emitted as 'VideoStreamStart' (truthy video SSRC) or
# 'VideoStreamEnd' (falsy video SSRC).
VideoStream = namedtuple('VideoStream', [
'client',  # the voice client instance that handled the VIDEO packet
'user_id',  # int parsed from the packet's 'user_id'
'streams',  # the packet's 'streams' value, passed through unchanged
'video_ssrc',  # the packet's 'video_ssrc'; falsy when the stream has ended
'audio_ssrc',  # the packet's 'audio_ssrc', passed through unchanged
])


# Event payload identifying a user on the voice connection. Emitted with the
# 'VoiceUserJoin', 'VoiceUserPlatform' and 'VoiceUserLeave' events; fields a
# given event has no data for are set to None.
VoiceUser = namedtuple('VoiceUser', [
'user_id',  # int parsed from the packet's 'user_id'
'flags',  # the packet's 'flags' on join, otherwise None
'platform',  # the packet's 'platform' on a platform update, otherwise None
])


class VoiceException(Exception):
def __init__(self, msg, client):
self.voice_client = client
Expand All @@ -64,16 +80,16 @@ class VoiceClient(LoggingClass):
'xsalsa20_poly1305',
}

def __init__(self, client, server_id, is_dm=False, max_reconnects=5, encoder='json'):
def __init__(self, client, server_id, is_dm=False, max_reconnects=5, encoder='json', video_enabled=False):
super(VoiceClient, self).__init__()

self.client = client
self.server_id = server_id
self.channel_id = None
self.is_dm = is_dm
self.encoder = JSONEncoder
self.encoder = ENCODERS[encoder] # Discord's erlpack doesn't seem supported here
self.max_reconnects = max_reconnects
self.video_enabled = False
self.video_enabled = video_enabled
self.media = None

self.proxy = None
Expand All @@ -92,6 +108,8 @@ def __init__(self, client, server_id, is_dm=False, max_reconnects=5, encoder='js
self.packets.on(VoiceOPCode.RESUMED, self.on_voice_resumed)
self.packets.on(VoiceOPCode.CLIENT_DISCONNECT, self.on_voice_client_disconnect)
self.packets.on(VoiceOPCode.CODECS, self.on_voice_codecs)
if self.video_enabled:
self.packets.on(VoiceOPCode.VIDEO, self.on_video)

# State + state change emitter
self.state = VoiceState.DISCONNECTED
Expand All @@ -105,13 +123,14 @@ def __init__(self, client, server_id, is_dm=False, max_reconnects=5, encoder='js
self.port = None
self.enc_modes = None
self.experiments = None
self.streams = None
# self.streams = None
self.sdp = None
self.mode = None
self.udp = None
self.audio_codec = None
self.video_codec = None
self.transport_id = None
self.secure_frames_version = None

# Websocket connection
self.ws = None
Expand All @@ -129,6 +148,8 @@ def __init__(self, client, server_id, is_dm=False, max_reconnects=5, encoder='js

# SSRCs
self.audio_ssrcs = {}
self.video_ssrcs = {}
self.rtx_ssrcs = {}

self.deaf = False
self.mute = False
Expand Down Expand Up @@ -219,7 +240,7 @@ def heartbeat_task(self, interval):
gevent_sleep(interval / 1000)

def handle_heartbeat(self, _):
self.send(VoiceOPCode.HEARTBEAT, time())
self.send(VoiceOPCode.HEARTBEAT, int(time()))

def handle_heartbeat_acknowledge(self, _):
self.log.debug('[{}] Received WS HEARTBEAT_ACK'.format(self.channel_id))
Expand Down Expand Up @@ -266,11 +287,20 @@ def send(self, op, data):
self.log.debug('[{}] dropping because WS is closed OP {} (data = {})'.format(self.channel_id, op, data))

def on_voice_client_disconnect(self, data):
user_id = int(data['user_id'])
for ssrc in self.audio_ssrcs.keys():
if self.audio_ssrcs[ssrc] == int(data['user_id']):
if self.audio_ssrcs[ssrc] == user_id:
del self.audio_ssrcs[ssrc]
break

payload = VoiceUser(
user_id=user_id,
flags=None,
platform=None,
)

self.client.events.emit('VoiceUserLeave', payload)

def on_voice_codecs(self, data):
self.audio_codec = data['audio_codec']
self.video_codec = data['video_codec']
Expand All @@ -290,11 +320,13 @@ def on_voice_ready(self, data):
self.set_state(VoiceState.CONNECTING)
self.ssrc = data['ssrc']
self.audio_ssrcs[self.ssrc] = self.client.state.me.id
if self.video_enabled:
self.video_ssrcs[self.ssrc + 1] = self.client.state.me.id
self.rtx_ssrcs[self.ssrc + 2] = self.client.state.me.id
self.ip = data['ip']
self.port = data['port']
self.enc_modes = data['modes']
self.experiments = data['experiments']
self.streams = data['streams']
self._identified = True

for mode in self.enc_modes:
Expand All @@ -320,11 +352,25 @@ def on_voice_ready(self, data):
for idx, codec in enumerate(AudioCodecs):
codecs.append({
'name': codec,
'type': 'audio',
'priority': (idx + 1) * 1000,
'payload_type': RTPPayloadTypes.get(codec).value,
'priority': idx,
'type': 'audio',
})

if self.video_enabled:
for idx, codec in enumerate(VideoCodecs):
ptype = RTPPayloadTypes.get(codec.lower())
if ptype:
codecs.append({
'decode': True,
'encode': False,
'name': codec,
'payload_type': ptype.value,
'priority': idx,
'rtxPayloadType': ptype.value + 1,
'type': 'video',
})

self.log.debug('[{}] IP discovery completed ({}:{}), sending SELECT_PROTOCOL'.format(self.channel_id, ip, port))
self.send(VoiceOPCode.SELECT_PROTOCOL, {
'protocol': 'udp',
Expand Down Expand Up @@ -354,13 +400,17 @@ def on_voice_sdp(self, sdp):

self.mode = sdp['mode'] # UDP-only, does not apply to webRTC
self.audio_codec = sdp['audio_codec']
self.video_codec = sdp['video_codec']
if self.video_enabled:
self.video_codec = sdp['video_codec']
self.transport_id = sdp['media_session_id'] # analytics
self.secure_frames_version = sdp['secure_frames_version']
# self.sdp = sdp['sdp'] # webRTC only
# self.keyframe_interval = sdp['keyframe_interval']

# Set the UDP's RTP Audio Header's Payload Type
self.udp.set_audio_codec(sdp['audio_codec'])
if self.video_enabled:
self.udp.set_video_codec(sdp['video_codec'])

# Create a secret box for encryption/decryption
self.udp.setup_encryption(bytes(bytearray(sdp['secret_key']))) # UDP only
Expand Down Expand Up @@ -394,6 +444,58 @@ def on_voice_speaking(self, data):

self.client.events.emit('VoiceSpeaking', payload)

def on_client_connect(self, data):
    """
    Announce a user joining the voice connection.

    Builds a `VoiceUser` from the packet's `user_id` (coerced to int) and
    `flags`, leaves `platform` unset, and emits it as `VoiceUserJoin`.
    """
    joined = VoiceUser(
        user_id=int(data['user_id']),
        flags=data['flags'],
        platform=None,
    )
    self.client.events.emit('VoiceUserJoin', joined)

def on_platform(self, data):
    """
    Announce the platform reported for a user on the voice connection.

    Builds a `VoiceUser` from the packet's `user_id` (coerced to int) and
    `platform`, leaves `flags` unset, and emits it as `VoiceUserPlatform`.
    """
    update = VoiceUser(
        user_id=int(data['user_id']),
        flags=None,
        platform=data['platform'],
    )
    self.client.events.emit('VoiceUserPlatform', update)

def on_video(self, data):
    """
    Track a user's video SSRCs and emit the matching video stream event.

    `data` is the decoded VIDEO packet; this handler reads its `user_id`,
    `video_ssrc`, `audio_ssrc` and `streams` keys. A truthy `video_ssrc`
    registers the stream and emits `VideoStreamStart`; a falsy one only
    clears the user's mappings and emits `VideoStreamEnd`.
    """
    user_id = int(data['user_id'])
    video_ssrc = data['video_ssrc']

    # Drop whatever was previously mapped for this user before (maybe)
    # registering the new SSRC. Without this, a packet carrying a different
    # non-zero SSRC would leave the old entries behind, so the maps would
    # grow and stale SSRCs would keep resolving to this user.
    for mapping in (self.video_ssrcs, self.rtx_ssrcs):
        stale = [ssrc for ssrc, uid in mapping.items() if uid == user_id]
        for ssrc in stale:
            del mapping[ssrc]

    if video_ssrc:
        self.video_ssrcs[video_ssrc] = user_id
        # NOTE(review): the retransmission SSRC is assumed to be
        # video_ssrc + 1; confirm against the packet's 'streams' entries.
        self.rtx_ssrcs[video_ssrc + 1] = user_id

    payload = VideoStream(
        client=self,
        user_id=user_id,
        streams=data['streams'],
        video_ssrc=video_ssrc,
        audio_ssrc=data['audio_ssrc'],
    )

    event = 'VideoStreamStart' if video_ssrc else 'VideoStreamEnd'
    self.client.events.emit(event, payload)

def on_message(self, msg):
try:
data = self.encoder.decode(msg)
Expand Down
72 changes: 57 additions & 15 deletions disco/voice/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,16 @@
from disco.util.logging import LoggingClass

AudioCodecs = ('opus',)

RTPPayloadTypes = Enum(OPUS=0x78)
VideoCodecs = ('AV1X', 'H265', 'H264', 'VP8', 'VP9')

# RTP payload type number for each codec. The voice client advertises
# `value + 1` as the retransmission (RTX) payload type of every video codec,
# so video entries are spaced two apart.
RTPPayloadTypes = Enum(
    OPUS=0x78,  # 120
    # AV1X: payload type unknown, deliberately left unassigned.
    H265=0x65,  # 101
    H264=0x67,  # 103
    VP8=0x69,  # 105
    VP9=0x6B,  # 107 (was 0x71, which is 113 and contradicted this comment)
)

RTCPPayloadTypes = Enum(
SENDER_REPORT=200,
Expand Down Expand Up @@ -71,6 +79,15 @@
'data',
])

# Event payload for one received non-opus RTP packet. Built in the UDP
# receive loop and emitted as 'VideoData'; the data is a raw RTP stream that
# still needs conversion to be useful.
VideoData = namedtuple('VideoData', [
'client',  # the voice client that owns the UDP connection
'user_id',  # looked up from the client's video_ssrcs by RTP SSRC; None if unknown
'payload_type',  # name of the matched RTP payload type
'rtp',  # the parsed RTP header for this packet
'nonce',  # nonce associated with the packet, as handled by the receive loop
'data',  # packet payload as left by the receive loop
])


class UDPVoiceClient(LoggingClass):
def __init__(self, vc):
Expand All @@ -95,15 +112,25 @@ def __init__(self, vc):

# RTP Header
self._rtp_audio_header = bytearray(12)
self._rtp_video_header = bytearray(12)
self._rtp_audio_header[0] = RTP_HEADER_VERSION
self._rtp_video_header[0] = RTP_HEADER_VERSION

def set_audio_codec(self, codec):
if codec not in AudioCodecs:
raise Exception('Unsupported audio codec received, {}'.format(codec))

ptype = RTPPayloadTypes.get(codec)
self._rtp_audio_header[1] = ptype.value
self.log.debug('[{}] Set UDP\'s Audio Codec to {}, RTP payload type {}'.format(self.vc.channel_id, ptype.name, ptype.value))
self.log.debug('[{}] Set UDP\'s Audio Codec to {}, RTP payload type {}'.format(self.vc.channel_id, ptype.name.upper(), ptype.value))

def set_video_codec(self, codec):
    """
    Set the payload type byte of the RTP video header for `codec`.

    Raises an `Exception` when `codec` is not listed in `VideoCodecs`, or
    when it is listed but has no entry in `RTPPayloadTypes`.
    """
    if codec not in VideoCodecs:
        raise Exception(f'Unsupported video codec received, {codec}')

    ptype = RTPPayloadTypes.get(codec.lower())
    # A codec can be listed in VideoCodecs without a payload type (AV1X has
    # none). Fail with a clear message instead of an AttributeError on the
    # missing lookup result.
    if not ptype:
        raise Exception(f'No RTP payload type known for video codec, {codec}')

    self._rtp_video_header[1] = ptype.value
    self.log.debug('[{}] Set UDP\'s Video Codec to {}, RTP payload type {}'.format(self.vc.channel_id, ptype.name.upper(), ptype.value))

def increment_timestamp(self, by):
self.timestamp += by
Expand Down Expand Up @@ -170,7 +197,7 @@ def send_frame(self, frame, sequence=None, timestamp=None, incr_timestamp=None):

def run(self):
while True:
data, addr = self.conn.recvfrom(4096)
data, addr = self.conn.recvfrom(1500) # Max RTP packet length

# Data cannot be less than the bare minimum, just ignore
if len(data) <= 12:
Expand Down Expand Up @@ -311,20 +338,35 @@ def run(self):

# RFC3550 Section 5.3: Profile-Specific Modifications to the RTP Header
# clients send it sometimes, definitely on fresh connects to a server, dunno what to do here
if rtp.marker:
# RFC6184: Marker bits are used to signify the last packet of a frame
if rtp.marker and payload_type.name == 'opus':
self.log.debug('[{}] [VoiceData] Received RTP data with the marker set, skipping'.format(self.vc.channel_id))
continue

payload = VoiceData(
client=self.vc,
user_id=self.vc.audio_ssrcs.get(rtp.ssrc),
payload_type=payload_type.name,
rtp=rtp,
nonce=nonce,
data=data,
)

self.vc.client.events.emit('VoiceData', payload)
if payload_type.name == 'opus':
payload = VoiceData(
client=self.vc,
user_id=self.vc.audio_ssrcs.get(rtp.ssrc),
payload_type=payload_type.name,
rtp=rtp,
nonce=nonce,
data=data,
)

# Raw RTP stream data, still needs conversion to be useful
self.vc.client.events.emit('VoiceData', payload)
else:
payload = VideoData(
client=self.vc,
user_id=self.vc.video_ssrcs.get(rtp.ssrc),
payload_type=payload_type.name,
rtp=rtp,
nonce=nonce,
data=data,
)

# Raw RTP stream data, still needs conversion to be useful
self.vc.client.events.emit('VideoData', payload)

def send(self, data):
self.conn.sendto(data, (self.ip, self.port))
Expand Down
Loading

0 comments on commit 5cb4c5f

Please sign in to comment.