Skip to content
This repository has been archived by the owner on Oct 5, 2021. It is now read-only.

Commit

Permalink
PCA - Message broker Ha callback implementation, agent graceful termi…
Browse files Browse the repository at this point in the history
…nation and test improvements
  • Loading branch information
chamilad committed Dec 15, 2015
1 parent c67aea5 commit bd865fd
Show file tree
Hide file tree
Showing 19 changed files with 466 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
# specific language governing permissions and limitations
# under the License.

import os
import time

from threading import Thread

import publisher
from logpublisher import *
from modules.event.application.signup.events import *
Expand Down Expand Up @@ -43,8 +48,6 @@ def __init__(self):
self.__topology_event_subscriber = EventSubscriber(constants.TOPOLOGY_TOPIC, mb_urls, mb_uname, mb_pwd)

def run_agent(self):
self.__log.info("Starting Cartridge Agent...")

# Start topology event receiver thread
self.register_topology_event_listeners()

Expand Down Expand Up @@ -192,7 +195,7 @@ def register_application_signup_event_listeners(self):
time.sleep(1)

def wait_for_complete_topology(self):
while not TopologyContext.topology.initialized:
while not TopologyContext.initialized:
self.__log.info("Waiting for complete topology event...")
time.sleep(5)
self.__log.info("Complete topology event received")
Expand Down Expand Up @@ -239,15 +242,15 @@ def on_member_initialized(msg):
Handlers.__log.debug("Member initialized event received: %r" % msg.payload)
event_obj = MemberInitializedEvent.create_from_json(msg.payload)

if not TopologyContext.topology.initialized:
if not TopologyContext.initialized:
return

event_handler.on_member_initialized_event(event_obj)

@staticmethod
def on_member_activated(msg):
Handlers.__log.debug("Member activated event received: %r" % msg.payload)
if not TopologyContext.topology.initialized:
if not TopologyContext.initialized:
return

event_obj = MemberActivatedEvent.create_from_json(msg.payload)
Expand All @@ -256,7 +259,7 @@ def on_member_activated(msg):
@staticmethod
def on_member_terminated(msg):
Handlers.__log.debug("Member terminated event received: %r" % msg.payload)
if not TopologyContext.topology.initialized:
if not TopologyContext.initialized:
return

event_obj = MemberTerminatedEvent.create_from_json(msg.payload)
Expand All @@ -265,7 +268,7 @@ def on_member_terminated(msg):
@staticmethod
def on_member_suspended(msg):
Handlers.__log.debug("Member suspended event received: %r" % msg.payload)
if not TopologyContext.topology.initialized:
if not TopologyContext.initialized:
return

event_obj = MemberSuspendedEvent.create_from_json(msg.payload)
Expand All @@ -275,17 +278,17 @@ def on_member_suspended(msg):
def on_complete_topology(msg):
event_obj = CompleteTopologyEvent.create_from_json(msg.payload)
TopologyContext.update(event_obj.topology)
if not TopologyContext.topology.initialized:
if not TopologyContext.initialized:
TopologyContext.initialized = True
Handlers.__log.info("Topology initialized from complete topology event")
TopologyContext.topology.initialized = True
event_handler.on_complete_topology_event(event_obj)

Handlers.__log.debug("Topology context updated with [topology] %r" % event_obj.topology.json_str)

@staticmethod
def on_member_started(msg):
Handlers.__log.debug("Member started event received: %r" % msg.payload)
if not TopologyContext.topology.initialized:
if not TopologyContext.initialized:
return

event_obj = MemberStartedEvent.create_from_json(msg.payload)
Expand Down Expand Up @@ -327,12 +330,31 @@ def on_application_signup_removed(msg):
event_handler.on_application_signup_removed_event(event_obj)


def check_termination(agent_obj):
terminate = False
terminator_file_path = os.path.abspath(os.path.dirname(__file__)) + "/terminator.txt"
while not terminate:
time.sleep(60)
try:
with open(terminator_file_path, 'r') as f:
file_output = f.read()
terminate = True if "true" in file_output else False
except IOError:
pass

log.info("Shutting down Stratos cartridge agent...")
agent_obj.terminate()


if __name__ == "__main__":
log = LogFactory().get_log(__name__)
cartridge_agent = CartridgeAgent()
try:
log.info("Starting Stratos cartridge agent...")
cartridge_agent = CartridgeAgent()
task_thread = Thread(target=check_termination, args=(cartridge_agent,))
task_thread.start()
cartridge_agent.run_agent()
except Exception as e:
log.exception("Cartridge Agent Exception: %r" % e)
# cartridge_agent.terminate()
log.info("Terminating Stratos cartridge agent...")
cartridge_agent.terminate()
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ class TopologyContext:
Handles and maintains a model of the topology provided by the Cloud Controller
"""
topology = Topology()
initialized = False

@staticmethod
def get_topology():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,17 @@ def __init__(self, publish_interval):
self.publish_interval = publish_interval
""":type : int"""
self.terminated = False
self.setDaemon(True)
self.setName("HealthStatPublisherManagerThread")
self.publisher = HealthStatisticsPublisher()
""":type : HealthStatisticsPublisher"""

""":type : IHealthStatReaderPlugin"""
self.stats_reader = Config.health_stat_plugin
self.log.debug("Created a HealthStatisticsPublisherManager thread")

def run(self):
self.log.debug("Starting the HealthStatisticsPublisherManager thread")
while not self.terminated:
time.sleep(self.publish_interval)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ def __init__(self, file_path, stream_definition, tenant_id, alias, date_time, me
self.member_id = member_id

self.terminated = False
self.setName("LogPublisherThread")
self.setDaemon(True)
self.log.debug("Created a LogPublisher thread")

def run(self):
self.log.debug("Starting the LogPublisher threads")
if os.path.isfile(self.file_path) and os.access(self.file_path, os.R_OK):
self.log.info("Starting log publisher for file: " + self.file_path + ", thread: " + str(current_thread()))
# open file and keep reading for new entries
Expand Down Expand Up @@ -132,6 +136,7 @@ def define_stream(tenant_id, alias, date_time):

def __init__(self, logfile_paths):
Thread.__init__(self)
self.setDaemon(True)

self.log = LogFactory().get_log(__name__)

Expand All @@ -158,8 +163,11 @@ def __init__(self, logfile_paths):
self.date_time = LogPublisherManager.get_current_date()

self.stream_definition = self.define_stream(self.tenant_id, self.alias, self.date_time)
self.setName("LogPublisherManagerThread")
self.log.debug("Created a LogPublisherManager thread")

def run(self):
self.log.debug("Starting the LogPublisherManager thread")
if self.logfile_paths is not None and len(self.logfile_paths):
for log_path in self.logfile_paths:
# thread for each log file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,12 @@ def schedule_artifact_update_task(repo_info, auto_checkout, auto_commit, update_
return

if git_repo.scheduled_update_task is None:
AgentGitHandler.log.info(
"ADC configuration: [auto-commit] %s, [auto-checkout] %s, [interval] %s",
auto_commit, auto_checkout, update_interval)
artifact_update_task = ArtifactUpdateTask(repo_info, auto_checkout, auto_commit)
async_task = ScheduledExecutor(update_interval, artifact_update_task)
AgentGitHandler.log.info("Starting a Scheduled Executor thread for Git polling task")

git_repo.scheduled_update_task = async_task
async_task.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,14 @@ def on_artifact_updated_event(artifacts_updated_event):
update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, True)
auto_commit = Config.is_commits_enabled
auto_checkout = Config.is_checkout_enabled
log.info("ADC configuration: [update_artifacts] %s, [auto-commit] %s, [auto-checkout] %s"
% (update_artifacts, auto_commit, auto_checkout))

if update_artifacts:
try:
update_interval = int(Config.artifact_update_interval)
except ValueError:
log.exception("Invalid artifact sync interval specified: %s" % ValueError)
log.debug("Invalid artifact sync interval specified: %s, defaulting to 10 seconds" % ValueError)
update_interval = 10

log.info("Artifact updating task enabled, update interval: %s seconds" % update_interval)

log.info("Auto Commit is turned %s " % ("on" if auto_commit else "off"))
log.info("Auto Checkout is turned %s " % ("on" if auto_checkout else "off"))

AgentGitHandler.schedule_artifact_update_task(
repo_info,
auto_checkout,
Expand Down Expand Up @@ -178,7 +171,7 @@ def on_member_activated_event(member_activated_event):
member_activated_event.member_id)

if not member_initialized:
log.error("Member has not initialized, failed to execute member activated event")
log.debug("Member has not initialized, failed to execute member activated event")
return

execute_event_extendables(constants.MEMBER_ACTIVATED_EVENT, {})
Expand All @@ -203,6 +196,8 @@ def on_complete_topology_event(complete_topology_event):
log.info(
"Member initialized [member id] %s, [cluster-id] %s, [service] %s"
% (member_id_in_payload, cluster_id_in_payload, service_name_in_payload))
else:
log.info("Member not initialized in topology.......")

topology = complete_topology_event.get_topology()
service = topology.get_service(service_name_in_payload)
Expand Down Expand Up @@ -273,7 +268,7 @@ def on_member_terminated_event(member_terminated_event):
)

if not member_initialized:
log.error("Member has not initialized, failed to execute member terminated event")
log.debug("Member has not initialized, failed to execute member terminated event")
return

execute_event_extendables(constants.MEMBER_TERMINATED_EVENT, {})
Expand All @@ -291,7 +286,7 @@ def on_member_suspended_event(member_suspended_event):
)

if not member_initialized:
log.error("Member has not initialized, failed to execute member suspended event")
log.debug("Member has not initialized, failed to execute member suspended event")
return

execute_event_extendables(constants.MEMBER_SUSPENDED_EVENT, {})
Expand All @@ -309,7 +304,7 @@ def on_member_started_event(member_started_event):
)

if not member_initialized:
log.error("Member has not initialized, failed to execute member started event")
log.debug("Member has not initialized, failed to execute member started event")
return

execute_event_extendables(constants.MEMBER_STARTED_EVENT, {})
Expand All @@ -324,7 +319,7 @@ def start_server_extension():
service_name_in_payload, cluster_id_in_payload, member_id_in_payload)

if not member_initialized:
log.error("Member has not initialized, failed to execute start server event")
log.debug("Member has not initialized, failed to execute start server event")
return

execute_event_extendables("StartServers", {})
Expand Down Expand Up @@ -416,6 +411,7 @@ def execute_event_extendables(event, input_values):
input_values = add_common_input_values(input_values)
except Exception as e:
log.error("Error while adding common input values for event extendables: %s" % e)

input_values["EVENT"] = event
log.debug("Executing extensions for [event] %s with [input values] %s" % (event, input_values))
# Execute the extension
Expand All @@ -436,6 +432,8 @@ def execute_plugins_for_event(event, input_values):
for plugin_info in plugins_for_event:
log.debug("Executing plugin %s for event %s" % (plugin_info.name, event))
plugin_thread = PluginExecutor(plugin_info, input_values)
plugin_thread.setName("PluginExecutorThreadForPlugin%s" % plugin_info.name)
log.debug("Starting a PluginExecutor Thread for event %s" % event.__class__.__name__)
plugin_thread.start()

# block till plugin run completes.
Expand All @@ -456,6 +454,8 @@ def execute_extension_for_event(event, extension_values):
if Config.extension_executor is not None:
log.debug("Executing extension for event [%s]" % event)
extension_thread = PluginExecutor(Config.extension_executor, extension_values)
extension_thread.setName("ExtensionExecutorThreadForExtension%s" % event)
log.debug("Starting a PluginExecutor Thread for event extension %s" % event.__class__.__name__)
extension_thread.start()

# block till plugin run completes.
Expand Down Expand Up @@ -535,9 +535,11 @@ def is_member_initialized_in_topology(service_name, cluster_id, member_id):
if member is None:
raise Exception("Member id not found in topology [member] %s" % member_id)

log.info("Found member: " + member.to_json())
log.debug("Found member: " + member.to_json())
if member.status == MemberStatus.Initialized:
return True

log.debug("Member doesn't exist in topology")
return False


Expand All @@ -551,9 +553,9 @@ def member_exists_in_topology(service_name, cluster_id, member_id):
if cluster is None:
raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)

activated_member = cluster.get_member(member_id)
if activated_member is None:
log.error("Member id not found in topology [member] %s" % member_id)
member = cluster.get_member(member_id)
if member is None:
log.debug("Member id not found in topology [member] %s" % member_id)
return False

return True
Expand Down Expand Up @@ -589,8 +591,6 @@ def add_common_input_values(plugin_values):

# Values for the plugins to use in case they want to connect to the MB.
plugin_values["MB_IP"] = Config.mb_ip
plugin_values["MB_PORT"] = Config.mb_port
plugin_values["MB_URLS"] = Config.mb_urls

plugin_values["APPLICATION_PATH"] = Config.app_path
plugin_values["PARAM_FILE_PATH"] = Config.read_property(constants.PARAM_FILE_PATH, False)
Expand Down Expand Up @@ -687,11 +687,13 @@ class PluginExecutor(Thread):

def __init__(self, plugin_info, values):
Thread.__init__(self)
self.setDaemon(True)
self.__plugin_info = plugin_info
self.__values = values
self.__log = LogFactory().get_log(__name__)

def run(self):
self.__log.debug("Starting the PluginExecutor thread")
try:
self.__plugin_info.plugin_object.run_plugin(self.__values)
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

import time
from threading import Thread
from log import LogFactory


log = LogFactory().get_log(__name__)


class AbstractAsyncScheduledTask:
Expand Down Expand Up @@ -53,6 +57,9 @@ def __init__(self, delay, task):
""" :type : AbstractAsyncScheduledTask """
self.terminated = False
""" :type : bool """
self.setName("ScheduledExecutorForTask%s" % self.task.__class__.__name__)
self.setDaemon(True)
log.debug("Created a ScheduledExecutor thread for task %s" % self.task.__class__.__name__)

def run(self):
"""
Expand All @@ -62,6 +69,9 @@ def run(self):
while not self.terminated:
time.sleep(self.delay)
task_thread = Thread(target=self.task.execute_task)
task_thread.setName("WorkerThreadForTask%s" % self.task.__class__.__name__)
task_thread.setDaemon(True)
log.debug("Starting a worker thread for the Scheduled Executor for task %s" % self.task.__class__.__name__)
task_thread.start()

def terminate(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ def publish_instance_activated_event():
publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_ACTIVATED_EVENT)
publisher.publish(instance_activated_event)

log.info("Starting health statistics notifier")
health_stat_publishing_enabled = Config.read_property(constants.CEP_PUBLISHER_ENABLED, True)

if health_stat_publishing_enabled:
Expand Down Expand Up @@ -231,6 +230,9 @@ def publish(self, event):

# start a thread to execute publish event
publisher_thread = Thread(target=self.__publish_event, args=(event, mb_ip, mb_port, auth, payload))
publisher_thread.setDaemon(True)
publisher_thread.setName("MBEventPublisherThreadForEvent%s" % event.__class__.__name__)
self.__log.debug("Starting a publisher thread for event %s " % event.__class__.__name__)
publisher_thread.start()

# give sometime for the thread to complete
Expand Down
Loading

0 comments on commit bd865fd

Please sign in to comment.