Skip to content

Commit

Permalink
enable USE_FLOW_CONTROL on aggregator, rewriter and relay
Browse files Browse the repository at this point in the history
  • Loading branch information
bucko909 committed Aug 21, 2024
1 parent 819bff7 commit 4e105a0
Showing 1 changed file with 12 additions and 0 deletions.
12 changes: 12 additions & 0 deletions lib/carbon/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,21 @@ def setupAggregatorProcessor(root_service, settings):
"aggregation processor: file does not exist {0}".format(aggregation_rules_path))
RuleManager.read_from(aggregation_rules_path)

if settings.USE_FLOW_CONTROL:
events.cacheFull.addHandler(events.pauseReceivingMetrics)
events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)


def setupRewriterProcessor(root_service, settings):
from carbon.rewrite import RewriteRuleManager

rewrite_rules_path = settings["rewrite-rules"]
RewriteRuleManager.read_from(rewrite_rules_path)

if settings.USE_FLOW_CONTROL:
events.cacheFull.addHandler(events.pauseReceivingMetrics)
events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)


def setupRelayProcessor(root_service, settings):
from carbon.routers import DatapointRouter
Expand All @@ -191,6 +199,10 @@ def setupRelayProcessor(root_service, settings):
for destination in util.parseDestinations(settings.DESTINATIONS):
state.client_manager.startClient(destination)

if settings.USE_FLOW_CONTROL:
events.cacheFull.addHandler(events.pauseReceivingMetrics)
events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)


def setupWriterProcessor(root_service, settings):
from carbon import cache # NOQA Register CacheFeedingProcessor
Expand Down

0 comments on commit 4e105a0

Please sign in to comment.