Feature #4151551: TFS plugin refactor streamer.py file #278

Merged · 20 commits · Nov 27, 2024
Changes from 1 commit
35 changes: 3 additions & 32 deletions plugins/fluentd_telemetry_plugin/src/streamer.py
@@ -33,7 +33,7 @@
 # pylint: disable=no-name-in-module,import-error
 from utils.utils import Utils
 from utils.args_parser import ArgsParser
-from utils.logger import Logger, LOG_LEVELS
+from utils.logger import Logger
 from utils.singleton import Singleton

 #pylint: disable=too-many-instance-attributes
@@ -54,7 +54,8 @@ def __init__(self, conf_parser):
         self.telem_parser = TelemetryParser(self.config_parser, self.streaming_metrics_mgr,
                                             self.last_streamed_data_sample_per_endpoint,
                                             self.attributes_mngr)
-        self.init_streaming_attributes()
+        self.attributes_mngr.init_streaming_attributes(self.telem_parser,
+                                                       self.ufm_telemetry_endpoints, self.config_parser)

     @property
     def ufm_telemetry_host(self):
@@ -162,36 +163,6 @@ def fluent_sender(self):
                                               use_c=_use_c)
         return self._fluent_sender

-    def init_streaming_attributes(self):  # pylint: disable=too-many-locals
-        Logger.log_message('Updating The streaming attributes', LOG_LEVELS.DEBUG)
-        # load the saved attributes
-        self.attributes_mngr.get_saved_streaming_attributes()
-        telemetry_endpoints = self.ufm_telemetry_endpoints
-        processed_endpoints = {}
-        for endpoint in telemetry_endpoints:  # pylint: disable=too-many-nested-blocks
-            _host = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_HOST)
-            _port = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_PORT)
-            _url = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_URL)
-            _msg_tag = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME)
-            # the ID of the endpoint is the full URL without filters like the shading,etc...
-            endpoint_id = f'{_host}:{_port}:{_url.split("?")[0]}'
-            is_processed = processed_endpoints.get(endpoint_id)
-            if not is_processed:
-                telemetry_data = self.telem_parser.get_metrics(_host, _port, _url, _msg_tag)
-                if telemetry_data:
-
-                    # CSV format
-                    rows = telemetry_data.split("\n")
-                    if len(rows):
-                        headers = rows[0].split(",")
-                        for attribute in headers:
-                            self.attributes_mngr.add_streaming_attribute(attribute)
-
-            processed_endpoints[endpoint_id] = True
-        # update the streaming attributes files
-        self.attributes_mngr.update_saved_streaming_attributes()
-        Logger.log_message('The streaming attributes were updated successfully')
-

     def _stream_data_to_fluentd(self, data_to_stream, fluentd_msg_tag=''):
         logging.info('Streaming to Fluentd IP: %s port: %s timeout: %s',
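Reviewer note: the method removed above reappears in the attributes-manager diff below with the same endpoint-deduplication logic. As a quick standalone illustration of that logic, here is a minimal Python sketch; the host/port/URL values are made up, and only the endpoint_id expression and the processed_endpoints check are taken from the diff.

    # Standalone sketch: two endpoint configs that differ only in query-string
    # filters collapse to one endpoint_id, so metrics are fetched once per endpoint.
    endpoints = [
        {"host": "127.0.0.1", "port": "9001", "url": "/csv/metrics?shading=1"},
        {"host": "127.0.0.1", "port": "9001", "url": "/csv/metrics"},
    ]
    processed_endpoints = {}
    for endpoint in endpoints:
        endpoint_id = f'{endpoint["host"]}:{endpoint["port"]}:{endpoint["url"].split("?")[0]}'
        if not processed_endpoints.get(endpoint_id):
            print(f"fetching metrics for {endpoint_id}")  # printed once for both entries
            processed_endpoints[endpoint_id] = True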
@@ -1,5 +1,6 @@
-# pylint: disable=no-name-in-module,import-error
 import logging
+
+# pylint: disable=no-name-in-module,import-error
 from utils.config_parser import ConfigParser

 class UFMTelemetryStreamingConfigParser(ConfigParser):
@@ -13,9 +13,11 @@
 @author: Miryam Schwartz
 @date: Nov 13, 2024
 """
-# pylint: disable=no-name-in-module,import-error
 import os
+
+# pylint: disable=no-name-in-module,import-error
 from utils.utils import Utils
+from utils.logger import Logger, LOG_LEVELS

 class TelemetryAttributesManager:
     """"
@@ -27,7 +29,7 @@ def __init__(self):
         self.streaming_attributes_file = "/config/tfs_streaming_attributes.json" # this path on the docker
         self.streaming_attributes = {}

-    def get_saved_streaming_attributes(self):
+    def _get_saved_streaming_attributes(self):
         attr = {}
         if os.path.exists(self.streaming_attributes_file):
             attr = Utils.read_json_from_file(self.streaming_attributes_file)
@@ -37,7 +39,7 @@ def get_saved_streaming_attributes(self):
     def update_saved_streaming_attributes(self):
         Utils.write_json_to_file(self.streaming_attributes_file, self.streaming_attributes)

-    def add_streaming_attribute(self, attribute):
+    def _add_streaming_attribute(self, attribute):
         if self.streaming_attributes.get(attribute, None) is None:
             # if the attribute is new and wasn't set before --> set default values for the new attribute
             self.streaming_attributes[attribute] = {
@@ -47,3 +49,33 @@ def add_streaming_attribute(self, attribute):

     def get_attr_obj(self, key):
         return self.streaming_attributes.get(key)
+
+
+    def init_streaming_attributes(self, telemetry_parser, telemetry_endpoints, config_parser):  # pylint: disable=too-many-locals
+        Logger.log_message('Updating The streaming attributes', LOG_LEVELS.DEBUG)
+        # load the saved attributes
+        self._get_saved_streaming_attributes()
+        processed_endpoints = {}
+        for endpoint in telemetry_endpoints:  # pylint: disable=too-many-nested-blocks
+            _host = endpoint.get(config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_HOST)
+            _port = endpoint.get(config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_PORT)
+            _url = endpoint.get(config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_URL)
+            _msg_tag = endpoint.get(config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME)
+            # the ID of the endpoint is the full URL without filters like the shading,etc...
+            endpoint_id = f'{_host}:{_port}:{_url.split("?")[0]}'
+            is_processed = processed_endpoints.get(endpoint_id)
+            if not is_processed:
+                telemetry_data = telemetry_parser.get_metrics(_host, _port, _url, _msg_tag)
+                if telemetry_data:
+
+                    # CSV format
+                    rows = telemetry_data.split("\n")
+                    if len(rows):
+                        headers = rows[0].split(",")
+                        for attribute in headers:
+                            self._add_streaming_attribute(attribute)
+
+            processed_endpoints[endpoint_id] = True
+        # update the streaming attributes files
+        self.update_saved_streaming_attributes()
+        Logger.log_message('The streaming attributes were updated successfully')
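Reviewer note: a hedged usage sketch of the relocated init_streaming_attributes, called with duck-typed stand-ins for the telemetry parser and config parser. The import path, the stub classes, and the sample CSV are assumptions for illustration only; actually running this needs the plugin's utils package on the path and a writable /config directory, as in the plugin container.

    # Illustrative sketch (assumed module and stub names, not part of this PR):
    from telemetry_attributes_manager import TelemetryAttributesManager  # assumed import path

    class StubConfigParser:
        # Only the section-key constants read by init_streaming_attributes.
        UFM_TELEMETRY_ENDPOINT_SECTION_HOST = "host"
        UFM_TELEMETRY_ENDPOINT_SECTION_PORT = "port"
        UFM_TELEMETRY_ENDPOINT_SECTION_URL = "url"
        UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME = "message_tag_name"

    class StubTelemetryParser:
        def get_metrics(self, host, port, url, msg_tag):
            # First CSV row is the header line; its columns become streaming attributes.
            return "timestamp,Device_ID,port_guid\n1732700000,sw1,0x1"

    endpoints = [{"host": "127.0.0.1", "port": "9001",
                  "url": "csv/metrics", "message_tag_name": "tfs"}]
    mgr = TelemetryAttributesManager()
    mgr.init_streaming_attributes(StubTelemetryParser(), endpoints, StubConfigParser())
    print(mgr.get_attr_obj("Device_ID"))  # default attribute entry created from the CSV header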
57 changes: 0 additions & 57 deletions plugins/fluentd_telemetry_plugin/src/telemetry_constants.py
@@ -18,62 +18,5 @@ class UFMTelemetryConstants:
     """UFMTelemetryConstants Class"""

     PLUGIN_NAME = "UFM_Telemetry_Streaming"
-
-    args_list = [
-        {
-            "name": '--ufm_telemetry_host',
-            "help": "Host or IP of UFM Telemetry endpoint"
-        },{
-            "name": '--ufm_telemetry_port',
-            "help": "Port of UFM Telemetry endpoint"
-        },{
-            "name": '--ufm_telemetry_url',
-            "help": "URL of UFM Telemetry endpoint"
-        },{
-            "name": '--ufm_telemetry_xdr_mode',
-            "help": "Telemetry XDR mode flag, "
-                    "i.e., if True, the enabled ports types in `xdr_ports_types` "
-                    "will be collected from the telemetry and streamed to fluentd"
-        },{
-            "name": '--ufm_telemetry_xdr_ports_types',
-            "help": "Telemetry XDR ports types, "
-                    "i.e., List of XDR ports types that should be collected and streamed, "
-                    "separated by `;`. For example legacy;aggregated;plane"
-        },{
-            "name": '--streaming_interval',
-            "help": "Interval for telemetry streaming in seconds"
-        },{
-            "name": '--bulk_streaming',
-            "help": "Bulk streaming flag, i.e. if True all telemetry rows will be streamed in one message; "
-                    "otherwise, each row will be streamed in a separated message"
-        },{
-            "name": '--compressed_streaming',
-            "help": "Compressed streaming flag, i.e. if True the streamed data will be sent gzipped json; "
-                    "otherwise, will be sent plain text as json"
-        },{
-            "name": '--c_fluent_streamer',
-            "help": "C Fluent Streamer flag, i.e. if True the C fluent streamer will be used; "
-                    "otherwise, the native python streamer will be used"
-        },{
-            "name": '--enable_streaming',
-            "help": "If true, the streaming will be started once the required configurations have been set"
-        },{
-            "name": '--stream_only_new_samples',
-            "help": "If True, the data will be streamed only in case new samples were pulled from the telemetry"
-        },{
-            "name": '--fluentd_host',
-            "help": "Host name or IP of fluentd endpoint"
-        },{
-            "name": '--fluentd_port',
-            "help": "Port of fluentd endpoint"
-        },{
-            "name": '--fluentd_timeout',
-            "help": "Fluentd timeout in seconds"
-        },{
-            "name": '--fluentd_message_tag_name',
-            "help": "Tag name of fluentd endpoint message"
-        }
-    ]
-
     CSV_LINE_SEPARATOR = "\n"
     CSV_ROW_ATTRS_SEPARATOR = ","
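Reviewer note: the constants kept here mirror the CSV shape parsed in init_streaming_attributes above (which still splits on the literal "\n" and ","). A minimal standalone sketch of that parsing, using locally defined copies of the separators and a made-up payload:

    # Standalone sketch: splitting a telemetry CSV payload into header attributes.
    CSV_LINE_SEPARATOR = "\n"       # same value as UFMTelemetryConstants.CSV_LINE_SEPARATOR
    CSV_ROW_ATTRS_SEPARATOR = ","   # same value as UFMTelemetryConstants.CSV_ROW_ATTRS_SEPARATOR

    telemetry_data = "timestamp,Device_ID,port_guid\n1732700000,sw1,0x1"  # sample payload
    rows = telemetry_data.split(CSV_LINE_SEPARATOR)
    headers = rows[0].split(CSV_ROW_ATTRS_SEPARATOR) if rows else []
    print(headers)  # ['timestamp', 'Device_ID', 'port_guid']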