diff --git a/worker/include/RTC/DataProducer.hpp b/worker/include/RTC/DataProducer.hpp index 952cc0df9f..5aad94d6b7 100644 --- a/worker/include/RTC/DataProducer.hpp +++ b/worker/include/RTC/DataProducer.hpp @@ -8,6 +8,7 @@ #include "RTC/RTCP/Packet.hpp" #include "RTC/SctpDictionaries.hpp" #include "RTC/Shared.hpp" +#include "Utils.hpp" #include #include @@ -69,6 +70,8 @@ namespace RTC // Passed by argument. const std::string id; + Utils::Event Data; + private: // Passed by argument. RTC::Shared* shared{ nullptr }; diff --git a/worker/include/RTC/Plugin.hpp b/worker/include/RTC/Plugin.hpp new file mode 100644 index 0000000000..cc3cd23400 --- /dev/null +++ b/worker/include/RTC/Plugin.hpp @@ -0,0 +1,15 @@ +#ifndef MS_RTC_PLUGIN_HPP +#define MS_RTC_PLUGIN_HPP + +class Worker; + +namespace RTC +{ + class Plugin + { + public: + virtual ~Plugin() = default; + }; +} // namespace RTC + +#endif diff --git a/worker/include/RTC/Router.hpp b/worker/include/RTC/Router.hpp index 4ab7a8e6b2..9e22506381 100644 --- a/worker/include/RTC/Router.hpp +++ b/worker/include/RTC/Router.hpp @@ -15,6 +15,7 @@ #include "RTC/Shared.hpp" #include "RTC/Transport.hpp" #include "RTC/WebRtcServer.hpp" +#include "Utils.hpp" #include #include #include @@ -115,6 +116,8 @@ namespace RTC public: // Passed by argument. const std::string id; + Utils::Event TransportCreated; + Utils::Event TransportClosed; private: // Passed by argument. @@ -124,6 +127,9 @@ namespace RTC absl::flat_hash_map mapTransports; absl::flat_hash_map mapRtpObservers; // Others. + + // TODO: Replace with smart pointers + absl::flat_hash_map> mapProducerConsumers; absl::flat_hash_map mapConsumerProducer; absl::flat_hash_map> mapProducerRtpObservers; diff --git a/worker/include/RTC/Transport.hpp b/worker/include/RTC/Transport.hpp index 08450ae58a..7a51fe40a3 100644 --- a/worker/include/RTC/Transport.hpp +++ b/worker/include/RTC/Transport.hpp @@ -297,6 +297,15 @@ namespace RTC // Passed by argument. const std::string id; + Utils::Event ProducerCreated; + Utils::Event ProducerClosed; + Utils::Event ConsumerCreated; + Utils::Event ConsumerClosed; + Utils::Event DataProducerCreated; + Utils::Event DataProducerClosed; + Utils::Event DataConsumerCreated; + Utils::Event DataConsumerClosed; + protected: RTC::Shared* shared{ nullptr }; size_t maxMessageSize{ 262144u }; diff --git a/worker/include/Utils.hpp b/worker/include/Utils.hpp index d41ecb6551..3302e4c5fb 100644 --- a/worker/include/Utils.hpp +++ b/worker/include/Utils.hpp @@ -375,6 +375,32 @@ namespace Utils } } }; + + template + class Event + { + public: + // Typedef for the function signature. + typedef std::function Listener; + void operator()(Args... args) const + { + for (auto& listener : this->listeners) + { + listener(args...); + } + } + void operator+=(const Listener& listener) + { + this->listeners.push_back(listener); + } + void operator-=(const Listener& listener) + { + std::remove(listeners.begin(), listeners.end(), listener); + } + private: + std::vector listeners; + }; + } // namespace Utils #endif diff --git a/worker/include/Worker.hpp b/worker/include/Worker.hpp index 48d993bf7a..5421d9e803 100644 --- a/worker/include/Worker.hpp +++ b/worker/include/Worker.hpp @@ -7,13 +7,16 @@ #include "PayloadChannel/PayloadChannelNotification.hpp" #include "PayloadChannel/PayloadChannelRequest.hpp" #include "PayloadChannel/PayloadChannelSocket.hpp" +#include "RTC/Plugin.hpp" #include "RTC/Router.hpp" #include "RTC/Shared.hpp" #include "RTC/WebRtcServer.hpp" +#include "Utils.hpp" #include "handles/SignalsHandler.hpp" #include #include #include +#include using json = nlohmann::json; @@ -25,6 +28,8 @@ class Worker : public Channel::ChannelSocket::Listener, public: explicit Worker(Channel::ChannelSocket* channel, PayloadChannel::PayloadChannelSocket* payloadChannel); ~Worker(); + + void RunInContext(std::function func); private: void Close(); @@ -63,6 +68,10 @@ class Worker : public Channel::ChannelSocket::Listener, public: RTC::WebRtcServer* OnRouterNeedWebRtcServer(RTC::Router* router, std::string& webRtcServerId) override; +public: + Utils::Event RouterCreated; + Utils::Event RouterClosed; + private: // Passed by argument. Channel::ChannelSocket* channel{ nullptr }; @@ -72,6 +81,7 @@ class Worker : public Channel::ChannelSocket::Listener, RTC::Shared* shared{ nullptr }; absl::flat_hash_map mapWebRtcServers; absl::flat_hash_map mapRouters; + std::vector plugins; // Others. bool closed{ false }; }; diff --git a/worker/meson.build b/worker/meson.build index 9aaff0da55..0eb44f9664 100644 --- a/worker/meson.build +++ b/worker/meson.build @@ -165,6 +165,7 @@ common_sources = [ 'src/RTC/RTCP/XR.cpp', 'src/RTC/RTCP/XrDelaySinceLastRr.cpp', 'src/RTC/RTCP/XrReceiverReferenceTime.cpp', + 'plugins/DataPing/Plugin.cpp' ] cpp = meson.get_compiler('cpp') @@ -264,7 +265,7 @@ libmediasoup_worker = library( install_tag: 'libmediasoup-worker', dependencies: dependencies, sources: common_sources, - include_directories: include_directories('include'), + include_directories: include_directories('include', 'plugins'), cpp_args: cpp_args, link_whole: link_whole, ) @@ -276,7 +277,7 @@ executable( install_tag: 'mediasoup-worker', dependencies: dependencies, sources: common_sources + ['src/main.cpp'], - include_directories: include_directories('include'), + include_directories: include_directories('include', 'plugins'), cpp_args: cpp_args + ['-DMS_EXECUTABLE'], ) diff --git a/worker/plugins/DataPing/Plugin.cpp b/worker/plugins/DataPing/Plugin.cpp new file mode 100644 index 0000000000..f860d07921 --- /dev/null +++ b/worker/plugins/DataPing/Plugin.cpp @@ -0,0 +1,61 @@ +#include "Plugin.hpp" + +#include + +#include "Worker.hpp" +#include "RTC/Router.hpp" +#include "RTC/DataProducer.hpp" +#include "RTC/DataConsumer.hpp" + +DataPingPlugin::DataPingPlugin(Worker* worker) +{ + worker->RouterCreated += [&](RTC::Router* router) + { + std::cerr << "RouterCreated" << std::endl; + + router->TransportCreated += [&](RTC::Transport* transport) + { + std::cerr << "TransportCreated" << std::endl; + + transport->DataProducerCreated += [&](RTC::DataProducer* producer) + { + producer->Data += [&](uint32_t id, const uint8_t* data, size_t len) + { + auto isPing = len == 4 && std::string((const char*)data, len) == "ping"; + if (!isPing) + { + return; + } + for (auto [ consumerTransport, consumer ] : this->consumers) + { + if (consumerTransport == transport && + consumer->GetSctpStreamParameters().streamId == producer->GetSctpStreamParameters().streamId) + { + consumer->SendMessage(id, data, len); + } + } + // TBD: Implement support to stop processing of this message. + // return false; + }; + }; + + transport->DataConsumerCreated += [&](RTC::DataConsumer* dataConsumer) + { + this->consumers.push_back({ transport, dataConsumer }); + }; + + transport->DataConsumerClosed += [&](RTC::DataConsumer* dataConsumer) + { + for (auto it = this->consumers.begin(); it != this->consumers.end(); ++it) + { + if (it->dataConsumer == dataConsumer) + { + this->consumers.erase(it); + break; + } + } + }; + + }; + }; +} diff --git a/worker/plugins/DataPing/Plugin.hpp b/worker/plugins/DataPing/Plugin.hpp new file mode 100644 index 0000000000..b7c595d04c --- /dev/null +++ b/worker/plugins/DataPing/Plugin.hpp @@ -0,0 +1,23 @@ +#pragma once + +#include "RTC/Plugin.hpp" +#include "RTC/Transport.hpp" +#include "RTC/DataConsumer.hpp" + +#include + +class DataPingPlugin: public RTC::Plugin +{ +public: + DataPingPlugin(Worker* worker); + +private: + struct ConsumerInfo + { + RTC::Transport* transport; + RTC::DataConsumer* dataConsumer; + }; + std::vector consumers; +}; + +// RTC::Plugin* CreatePlugin(Worker* worker); diff --git a/worker/src/RTC/DataProducer.cpp b/worker/src/RTC/DataProducer.cpp index f0888c83ae..ad6ee86f0d 100644 --- a/worker/src/RTC/DataProducer.cpp +++ b/worker/src/RTC/DataProducer.cpp @@ -216,5 +216,7 @@ namespace RTC this->bytesReceived += len; this->listener->OnDataProducerMessageReceived(this, ppid, msg, len); + + this->Data(ppid, msg, len); } } // namespace RTC diff --git a/worker/src/RTC/Router.cpp b/worker/src/RTC/Router.cpp index 61b46a6dd8..04ccdb712c 100644 --- a/worker/src/RTC/Router.cpp +++ b/worker/src/RTC/Router.cpp @@ -41,6 +41,8 @@ namespace RTC auto* transport = kv.second; delete transport; + + TransportClosed(transport); } this->mapTransports.clear(); @@ -297,6 +299,8 @@ namespace RTC request->Accept(data); + TransportCreated(webRtcTransport); + break; } @@ -321,6 +325,8 @@ namespace RTC request->Accept(data); + TransportCreated(plainTransport); + break; } @@ -343,6 +349,8 @@ namespace RTC pipeTransport->FillJson(data); request->Accept(data); + + TransportCreated(pipeTransport); break; } @@ -367,6 +375,8 @@ namespace RTC directTransport->FillJson(data); request->Accept(data); + + TransportCreated(directTransport); break; } @@ -430,6 +440,8 @@ namespace RTC request->Accept(); + TransportClosed(transport); + break; } @@ -718,17 +730,22 @@ namespace RTC // Cloned ref-counted packet that RtpStreamSend will store for as long as // needed avoiding multiple allocations unless absolutely necessary. // Clone only happens if needed. - std::shared_ptr sharedPacket; + std::shared_ptr sharedPacket(packet->Clone()); for (auto* consumer : consumers) { - // Update MID RTP extension value. - const auto& mid = consumer->GetRtpParameters().mid; - - if (!mid.empty()) - packet->UpdateMid(mid); - - consumer->SendRtpPacket(packet, sharedPacket); + // Possible optimizations + // if consumer.worker == producer.worker + // group consumers by worker and call run per worker + consumer->RunInContext([consumer, sharedPacket]() mutable { + // Update MID RTP extension value. + const auto& mid = consumer->GetRtpParameters().mid; + + if (!mid.empty()) + sharedPacket->UpdateMid(mid); + + consumer->SendRtpPacket(sharedPacket.get(), sharedPacket); + }); } } diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index a99915b71c..e7df332658 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -190,6 +190,8 @@ namespace RTC auto* producer = kv.second; delete producer; + + ProducerClosed(producer); } this->mapProducers.clear(); @@ -199,6 +201,8 @@ namespace RTC auto* consumer = kv.second; delete consumer; + + ConsumerClosed(consumer); } this->mapConsumers.clear(); this->mapSsrcConsumer.clear(); @@ -895,6 +899,8 @@ namespace RTC } } + ProducerCreated(producer); + break; } @@ -1169,6 +1175,8 @@ namespace RTC consumer->TransportConnected(); } + ConsumerCreated(consumer); + break; } @@ -1266,6 +1274,8 @@ namespace RTC request->Accept(data); + DataProducerCreated(dataProducer); + break; } @@ -1375,6 +1385,8 @@ namespace RTC this->sctpAssociation->HandleDataConsumer(dataConsumer); } + DataConsumerCreated(dataConsumer); + break; } diff --git a/worker/src/Worker.cpp b/worker/src/Worker.cpp index 9914efb9c2..86bfe28ae4 100644 --- a/worker/src/Worker.cpp +++ b/worker/src/Worker.cpp @@ -10,6 +10,7 @@ #include "Settings.hpp" #include "Channel/ChannelNotifier.hpp" #include "PayloadChannel/PayloadChannelNotifier.hpp" +#include "DataPing/Plugin.hpp" /* Instance methods. */ @@ -47,6 +48,8 @@ Worker::Worker(::Channel::ChannelSocket* channel, PayloadChannel::PayloadChannel // Tell the Node process that we are running. this->shared->channelNotifier->Emit(Logger::pid, "running"); + this->plugins.push_back(new DataPingPlugin(this)); + MS_DEBUG_DEV("starting libuv loop"); DepLibUV::RunLoop(); MS_DEBUG_DEV("libuv loop ended"); @@ -78,6 +81,8 @@ void Worker::Close() auto* router = kv.second; delete router; + + RouterClosed(router); } this->mapRouters.clear(); @@ -394,6 +399,8 @@ inline void Worker::HandleRequest(Channel::ChannelRequest* request) request->Accept(); + RouterCreated(router); + break; } @@ -419,6 +426,8 @@ inline void Worker::HandleRequest(Channel::ChannelRequest* request) request->Accept(); + RouterClosed(router); + break; }