diff --git a/database/connection.py b/database/connection.py
index 04e5688..c529aa4 100644
--- a/database/connection.py
+++ b/database/connection.py
@@ -2,6 +2,7 @@
 from pymongo import MongoClient
 from config.loader import Configuration
 import platform
+import logging
 
 def load_configuration():
     if platform.system() == 'Windows':
@@ -21,4 +22,6 @@ def get_collection(tenant_name: str, collection_name: str):
     mongo_tenant_details = BASE_TENANT_STRING % tenant_name
     database = mongo_client[mongo_tenant_details]
     generic_collection = database[collection_name]
+    # logging.debug(f"got database {database}")
+    # logging.debug(f"info of database {mongo_tenant_details}")
     return generic_collection
\ No newline at end of file
diff --git a/database/watcher/watcher.py b/database/watcher/watcher.py
index 889dce8..d6775b8 100644
--- a/database/watcher/watcher.py
+++ b/database/watcher/watcher.py
@@ -196,7 +196,6 @@ def process_new_sync_data_for_recon(tenant, new_document):
     redis_key = f"recon_{tenant}_{rcId}"
     logging.debug(f"Redis key found: {redis_key}")
 
-    recon_report_metadata_id = redis_client.hget(redis_key, 'reconReportMetadataId')
     reconciliationId = redis_client.hget(redis_key, "reconciliationId")
     #batch_id = redis_client.hget(key, 'batchId')
     appId = redis_client.hget(redis_key, 'appId')
@@ -259,7 +258,8 @@ def process_new_sync_data_for_recon(tenant, new_document):
 
         returnval = True
         login_break_count = getBreakCount(tenant,reconciliationId,login)
-
+
+        present_in_target_only = []
         if login not in all_cymmetri_logins_redis_data:
             logging.debug(f"User login {login} found in target app only.")
             record = {
@@ -273,10 +273,18 @@ def process_new_sync_data_for_recon(tenant, new_document):
                 "login":login,
                 "break_count":login_break_count
             }
-            shReconBreakRecords.insert_one(record)
-
+            #shReconBreakRecords.insert_one(record)
+            present_in_target_only.append(record)
+            try:
+                shReconBreakRecords.insert_many(present_in_target_only)
+                logging.debug("Data Inserted for present_in_target_only into shReconBreakRecords successfully.")
+            except Exception as e:
+                logging.error(f"Failed to insert data for present_in_target_only into shReconBreakRecords collection: {e}")
+
+
+        app_overdue = []
         if login in final_app_overdue_redis_data:
             logging.debug(f'user {login} should have been disabled but is still in target app')
             record = {
@@ -290,7 +298,14 @@ def process_new_sync_data_for_recon(tenant, new_document):
                 "login":login,
                 "break_count": login_break_count
             }
-            shReconBreakRecords.insert_one(record)
+            #shReconBreakRecords.insert_one(record)
+            app_overdue.append(record)
+            try:
+                shReconBreakRecords.insert_many(app_overdue)
+                logging.debug("Data Inserted for app_overdue into shReconBreakRecords successfully.")
+            except Exception as e:
+                logging.error(f"Failed to insert data for app_overdue into shReconBreakRecords collection: {e}")
+
     logging.debug("Process for syncdata for recon insert data is completed.")
     return returnval
@@ -302,6 +317,8 @@ def getBreakCount(tenant, reconciliationId, login):
 
 def execute_final_code(tenant, reconciliationId):
+    login_data = []
+
     logging.info(f"Starting final execution for tenant: {tenant}, reconciliationId: {reconciliationId}")
 
     shReconBreakRecords = get_collection(tenant, 'shReconBreakRecords')
@@ -328,7 +345,6 @@ def execute_final_code(tenant, reconciliationId):
 
     logging.debug(f"Retrieved from Redis - appId: {appId}, appName: {appName}, reconReportMetadataId: {rrMetaId}")
 
-
    for i in full_set_of_target_logins:
        try:
            full_set_of_cymmetri_logins.remove(i)
@@ -339,7 +355,7 @@ def execute_final_code(tenant, reconciliationId):
 
     #only_present_in_Cymmetri = full_set_of_cymmetri_logins - full_set_of_target_logins
     #print(full_set_of_cymmetri_logins)
-
+
     for login in full_set_of_cymmetri_logins:
         #print(f'{login} only found in cymmetri')
         login_break_count = getBreakCount(tenant, reconciliationId,login)
@@ -354,8 +370,17 @@ def execute_final_code(tenant, reconciliationId):
             "login":login,
             "break_count": login_break_count
         }
-        shReconBreakRecords.insert_one(record)
-        #logging.debug(f"Inserted break record for login {login}: {record.inserted_id}")
+        #shReconBreakRecords.insert_one(record)
+
+        login_data.append(record)
+        try:
+            shReconBreakRecords.insert_many(login_data)
+            logging.debug("Data Inserted into shReconBreakRecords successfully.")
+        except Exception as e:
+            logging.error(f"Failed to insert data into shReconBreakRecords collection: {e}")
+
+
     break_types = ["present_in_target_only", "present_in_Cymmetri_only", "app_overdue"]
diff --git a/modal2.py b/modal2.py
index 4481af8..c0efbc2 100644
--- a/modal2.py
+++ b/modal2.py
@@ -4,7 +4,7 @@
 from pymongo import MongoClient,DESCENDING
 import logging
 from logging.config import dictConfig
-from fastapi import FastAPI,HTTPException,Header, Request
+from fastapi import FastAPI,HTTPException,Header, Request, status
 from database.connection import get_collection
 from bson import ObjectId
 from pydantic import BaseModel
@@ -40,6 +40,32 @@ def load_configuration():
         format='[%(asctime)s] %(levelname)s in %(filename)s on %(lineno)d: %(message)s',
     )
 
+def create_bad_request_response(response_val):
+    return Response(
+        content=json.dumps(response_val),
+        status_code=status.HTTP_400_BAD_REQUEST,
+        headers={'Content-Type': 'application/json'}
+    )
+
+def ResponseModel(data, message, code=200, errorCode=None):
+    return {
+        "success": True,
+        "data": data,
+        "code": code,
+        "message": message,
+        "errorCode": errorCode
+    }
+
+def ErrorResponseModel(error, code, message, errorCode):
+    return {
+        "data": None,
+        "success": False,
+        "code": code,
+        "message": message,
+        "errorCode": errorCode,
+        "error": error
+    }
+
 #print(f'Log level is {log_level_from_data}')
 # dictConfig({
 #     "version": 1,
@@ -78,11 +104,20 @@ class Config:
 
 # Pydantic model for request body validation
 class ModalRequestSchema(BaseModel):
-    Id: str
+    reconReportMetadataID: str
+    class Config:
+        schema_extra = {
+            "example": {
+                "reconReportMetadataID": "6126126126166af11",
+            }
+        }
+
+class ModalRequestSchema_get_data(BaseModel):
+    reconReportMetadataID: str
     class Config:
         schema_extra = {
             "example": {
-                "reconReportMetadata": "6126126126166af11",
+                "reconReportMetadataID": "6126126126166af11",
             }
         }
 
@@ -113,7 +148,7 @@ class Config:
 # }
 
 class BreakRecordWithSearchAndSortRequestSchema(BaseModel):
-    Id: str
+    reconReportMetadataID: str
     pageNumber: int
     pageSize: int
     filters: Dict = {}  # Allow both string and integer values for filters
@@ -150,14 +185,24 @@ def breakReportMetadata(tenant: str):
 # API endpoint to fetch data by ObjectId
 # API endpoint to fetch data by ObjectId
 @app.post('/reconsrvc/get_data')
-async def get_data(request: ModalRequestSchema, tenant: str = Header(...)):
+async def get_data(request: ModalRequestSchema_get_data, tenant: str = Header(...)):
     logging.info("get_data API Started")
-    logging.info(f"Fetching data for tenant: {tenant} and request ID: {request.Id}")
-
+    logging.info(f"Fetching data for tenant: {tenant} and request ID: {request.reconReportMetadataID}")
+    reconReportMetadataID = request.reconReportMetadataID
+    if not reconReportMetadataID:
+        response_val = {
+            "data": None,
"success": False, + "errorCode": "RECONMETADATAID_MISSING_ERROR", + "message": "Missing reconReportMetadataID in request" + } + return create_bad_request_response(response_val) + + breakReportMetadata_collection = breakReportMetadata(tenant) logging.debug("Attempting to retrieve document from the database.") - data = breakReportMetadata_collection.find_one({'reconReportMetadata': request.Id}) + data = breakReportMetadata_collection.find_one({'reconReportMetadata': reconReportMetadataID}) if data: logging.info("Document found, processing data.") @@ -169,83 +214,99 @@ async def get_data(request: ModalRequestSchema, tenant: str = Header(...)): data['startTime'] = start_date data['endTime'] = last_date logging.debug(f"Data processed with updated times: Start - {start_date}, End - {last_date}") - return data + return ResponseModel(data=data, message="get data excecuted succesfully") + else: - error_message = f"No records found matching Id: {request.Id}" + error_message = f"No records found matching Id: {reconReportMetadataID}" logging.error(error_message) - raise HTTPException(status_code=404, detail=error_message) - -# Ensure that appropriate logging configuration is set up to capture and store logs. -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + @app.post("/reconsrvc/matching_records") async def get_matching_records(request: MatchingRecordRequestSchema, tenant: str = Header(...)): # Log the start of the operation with tenant information - logging.info("matching_record API satrted") + logging.info("matching_record API started") logging.info(f"Starting 'get_matching_records' for tenant: {tenant} at {datetime.datetime.now()}") - # Initialize the collection references - reconciliationPull_collection = reconciliationPull(tenant) # Presumably initializes the collection based on tenant - shReconReport_collection = get_collection(tenant, 'shReconReport') # Gets the 'shReconReport' collection for the tenant - - # Step 2: Create a pipeline to aggregate data for the specified appId and calculate the total number of unique reconciliationIds - pipeline_for_count = [ - {"$match": {"appId": request.appId, "createdAt": {"$exists": True}}}, - {"$group": {"_id": "$reconciliationId"}}, - {"$group": {"_id": None, "count": {"$sum": 1}}} - ] - - # Run the aggregation pipeline for counting documents - count_Result = shReconReport_collection.aggregate(pipeline_for_count) - count = 0 - for doc in count_Result: - count = doc['count'] # Extract the total count of unique documents - logging.debug(f"Total records found: {count}") - - # Step 3: Define the pipeline to retrieve the latest document per reconciliationId and paginate the results - pipeline = [ - {"$match": {"appId": request.appId, "createdAt": {"$exists": True}}}, - {"$group": {"_id": "$reconciliationId", "maxStartTime": {"$max": "$createdAt"}, "latestDocument": {"$last": "$$ROOT"}}}, - {"$sort": {"_id": 1}}, - {"$skip": request.pageNumber * request.pageSize}, - {"$limit": request.pageSize}, - {"$replaceRoot": {"newRoot": "$latestDocument"}} - ] - - # Execute the main data retrieval pipeline - result = shReconReport_collection.aggregate(pipeline) - response = [] - - # Iterate through the results, fetch additional data, and prepare the response - for doc in result: - recon_id = doc['reconciliationId'] - last_date = doc['createdAt'] + datetime.timedelta(hours=5, minutes=30) # Adjust time based on timezone - logging.debug(f"Processing document for reconciliationId: {recon_id}") - - recon_data = 
reconciliationPull_collection.find_one({'_id': ObjectId(recon_id)}) - if recon_data: - response.append({ - "Id": str(doc['reconReportMetadata']), - "name": recon_data.get('name'), - "mode": recon_data.get('reconMode'), - "type": recon_data.get('type'), - "recon_type": recon_data.get('reconType'), - "last_run_datetime": last_date - }) + try: + # Initialize the collection references + reconciliationPull_collection = reconciliationPull(tenant) # Presumably initializes the collection based on tenant + shReconReport_collection = get_collection(tenant, 'shReconReport') # Gets the 'shReconReport' collection for the tenant + logging.info("Collections successfully initialized.") + + # Step 2: Create a pipeline to aggregate data for the specified appId and calculate the total number of unique reconciliationIds + pipeline_for_count = [ + {"$match": {"appId": request.appId, "createdAt": {"$exists": True}}}, + {"$group": {"_id": "$reconciliationId"}}, + {"$group": {"_id": None, "count": {"$sum": 1}}} + ] + + # Run the aggregation pipeline for counting documents + count_Result = shReconReport_collection.aggregate(pipeline_for_count) + count = 0 + for doc in count_Result: + count = doc['count'] # Extract the total count of unique documents + logging.debug(f"Total records found: {count}") + + if not count: + response_val = { + "data": None, + "success": False, + "errorCode": "DATA_MISSING_ERROR", + "message": "Missing data for requested appId" + } + return create_bad_request_response(response_val) + + # Step 3: Define the pipeline to retrieve the latest document per reconciliationId and paginate the results + pipeline = [ + {"$match": {"appId": request.appId, "createdAt": {"$exists": True}}}, + {"$group": {"_id": "$reconciliationId", "maxStartTime": {"$max": "$createdAt"}, "latestDocument": {"$last": "$$ROOT"}}}, + {"$sort": {"maxStartTime": -1}}, + {"$skip": request.pageNumber * request.pageSize}, + {"$limit": request.pageSize}, + {"$replaceRoot": {"newRoot": "$latestDocument"}} + ] + + # Execute the main data retrieval pipeline + result = shReconReport_collection.aggregate(pipeline) + response = [] + + # Iterate through the results, fetch additional data, and prepare the response + for doc in result: + recon_id = doc['reconciliationId'] + last_date = doc['createdAt'] + datetime.timedelta(hours=5, minutes=30) # Adjust time based on timezone + #logging.debug(f"Processing document for reconciliationId: {recon_id}") + + recon_data = reconciliationPull_collection.find_one({'_id': ObjectId(recon_id)}) + if recon_data: + response.append({ + "Id": str(doc['reconReportMetadata']), + "name": recon_data.get('name'), + "mode": recon_data.get('reconMode'), + "type": recon_data.get('type'), + "recon_type": recon_data.get('reconType'), + "last_run_datetime": last_date + }) + + # Construct the return value + return_val = { + 'success': True, + 'data': {'content': response}, + 'totalRecords': count, + 'totalPages': math.ceil(count / request.pageSize) + } - # Construct the return value - return_val = { - 'success': True, - 'data': {'content': response}, - 'totalRecords': count, - 'totalPages': math.ceil(count / request.pageSize) - } + # Log the successful completion of the operation + logging.info(f"Ending 'get_matching_records' for tenant: {tenant} at {datetime.datetime.now()}") - # Log the successful completion of the operation - logging.info(f"Ending 'get_matching_records' for tenant: {tenant} at {datetime.datetime.now()}") + return ResponseModel(data=return_val, message="Matching Record executed successfully") + + + 
+    except Exception as e:
+        logging.error(f"An error occurred: {str(e)}")
+        return ErrorResponseModel(error=str(e), code=500, message="Exception Occurred.", errorCode="Invalid")
 
-    return return_val
 
 @app.post('/reconsrvc/modal')
 async def populating_data(request: ModalRequestSchema, tenant: str = Header(...)) -> Response:
@@ -254,10 +315,22 @@ async def populating_data(request: ModalRequestSchema, tenant: str = Header(...)
     syncDataForRecon = get_collection(tenant, "syncDataForRecon")
     user = get_collection(tenant, "user")
 
+
+    try:
         # Convert _id from string to ObjectId
-        object_id = bson.ObjectId(request.Id)
-        logging.debug(f"Converted {request.Id} to ObjectId: {object_id}")
+        object_id = bson.ObjectId(request.reconReportMetadataID)
+        logging.debug(f"Converted {request.reconReportMetadataID} to ObjectId: {object_id}")
+        print("object_id",object_id)
+        if not object_id:
+            response_val = {
+                "data": None,
+                "success": False,
+                "errorCode": "RECONREPORTMETADATAID_MISSING_ERROR",
+                "message": "Missing reconReportMetadataID in request"
+            }
+            return create_bad_request_response(response_val)
+
         # Retrieve the break record
         breakReconRecord = shReconBreakRecords.find_one({"_id": object_id})
@@ -317,181 +390,212 @@ async def populating_data(request: ModalRequestSchema, tenant: str = Header(...)
                 'breakCount': shReconBreakRecords.count_documents(query_breakCount_Cymm)
             }
         }
-        logging.debug(f"Return value prepared: {return_val}")
-        return return_val
-
+        data_response = return_val
+        return ResponseModel(data=data_response, message="Policy mapping generated successfully")
+
     except Exception as e:
         logging.error(f"An error occurred: {str(e)}")
-        raise HTTPException(status_code=500, detail=str(e))
+        return ErrorResponseModel(error=str(e), code=500, message="Exception Occurred.", errorCode="Invalid")
+
 
 @app.post('/reconsrvc/break_data')
 async def break_data_table_records(request: BreakRecordWithSearchAndSortRequestSchema, tenant: str = Header(...)) -> Dict:
     logging.info(f"Processing break data for tenant: {tenant} with request: {request}")
+    try:
 
-    shReconBreakRecords_collection = get_collection(tenant, 'shReconBreakRecords')
+        shReconBreakRecords_collection = get_collection(tenant, 'shReconBreakRecords')
 
-    search_keyword = request.keyword
-    sortOn_field = ""
-    sortDirection_value = ""
-    if request.sortOn == "":
-        sortOn_field = request.sort
-    else:
-        sortOn_field = request.sortOn
+        search_keyword = request.keyword
+        sortOn_field = ""
+        sortDirection_value = ""
+        if request.sortOn == "":
+            sortOn_field = request.sort
+        else:
+            sortOn_field = request.sortOn
 
-    if request.sortDirection == "":
-        sortDirection_value = request.direction
-    else:
-        sortDirection_value = request.sortDirection
+        if request.sortDirection == "":
+            sortDirection_value = request.direction
+        else:
+            sortDirection_value = request.sortDirection
 
-
-    #logging.debug(f'found keyword {search_keyword}')
-    reconReportMetadataId = request.Id
-    #logging.debug(f"reconReportMetadataId: {reconReportMetadataId}")
-    search_filters_dict = request.filters
-
-    '''if search_filters_dict['break_count_min']:
-        search_filters_dict['break_count_min'] = {'$gte': search_filters_dict['break_count_min']}
-        del search_filters_dict['break_count_min']
-    if search_filters_dict['break_count_max']:
-        search_filters_dict['break_count_max'] = {'$lte': search_filters_dict['break_count_max']}
-        del search_filters_dict['break_count_max']
-    '''
-    # Constructing the $and conditions to match reconReportMetadataId, login, and break_type
-    and_conditions = [{"reconReportMetadataId": reconReportMetadataId}]
-
-    if "login" in search_filters_dict and search_filters_dict["login"]:
-        and_conditions.append({"$or": [
-            {"cymmetri_login": search_filters_dict["login"]},
-            {"target_app_login": search_filters_dict["login"]},
-            {"break_type": search_filters_dict["login"]},
-        ]})
-
-    if "break_type" in search_filters_dict and search_filters_dict["break_type"]:
-        and_conditions.append({"break_type": search_filters_dict["break_type"]})
-
-    if "break_count_min" in search_filters_dict and search_filters_dict["break_count_min"]:
-        and_conditions.append({"break_count": {"$gte": search_filters_dict["break_count_min"]}})
-
-    if "break_count_max" in search_filters_dict and search_filters_dict["break_count_max"]:
-        and_conditions.append({"break_count": {"$lte": search_filters_dict["break_count_max"]}})
-
-    if search_keyword:
+
-        # Construct a regex pattern to match the provided keyword in a continuous manner as a whole word
-        #keyword_regex =""
-        # Add a new condition to match any field that contains the keyword
-        or_conditions = []
-        for field in ["cymmetri_login", "target_app_login", "break_type"]:
-            value = f'.*{search_keyword}.*'
-            logging.debug(f'value is {value}')
-            or_conditions.append({field: {"$regex": value}})
-        and_conditions.append({"$or": or_conditions})
-
-    #logging.debug(f'And conditions are {and_conditions}')
-
-    if sortDirection_value == "ASC":
-        sortDirection_value = 1
-    else:
-        sortDirection_value = -1
-
-    pipeline = [
-        {"$match": {"$and": and_conditions}},
-        {"$sort": {sortOn_field: sortDirection_value}},
-        {"$skip": request.pageNumber * request.pageSize},
-        {"$limit": request.pageSize}
-    ]
-
-    logging.debug(f"Constructed MongoDB query pipeline: {pipeline}")
-
-    result = shReconBreakRecords_collection.aggregate(pipeline)
-    #logging.debug(f"Query executed successfully")
-    response = []
-
-    #counts = {}
-
-    for doc in result:
-        id_str = str(doc.get('_id'))
-        #counts[id_str] = await get_count_of_doc(shReconBreakRecords_collection, id_str)
-
-        #count_a = doc.get('count')
-        #doc = doc.get('latestRecord')
-        response.append({
-            "Id": id_str,
-            "target_app_login": doc.get('target_app_login'),
-            "source_app_login": doc.get('cymmetri_login'),
-            "cymmetri_login": doc.get('cymmetri_login'),
-            "break_type": doc.get('break_type'),
-            "break_count": doc.get('break_count')
-        })
+        reconReportMetadataId = request.reconReportMetadataID
+
+        if not reconReportMetadataId:
+            response_val = {
+                "data": None,
+                "success": False,
+                "errorCode": "RECONMETADATAID_MISSING_ERROR",
+                "message": "Missing reconReportMetadataId in request"
+            }
+            return create_bad_request_response(response_val)
+
+
+        #logging.debug(f"reconReportMetadataId: {reconReportMetadataId}")
+        search_filters_dict = request.filters
+
+        '''if search_filters_dict['break_count_min']:
+            search_filters_dict['break_count_min'] = {'$gte': search_filters_dict['break_count_min']}
+            del search_filters_dict['break_count_min']
+        if search_filters_dict['break_count_max']:
+            search_filters_dict['break_count_max'] = {'$lte': search_filters_dict['break_count_max']}
+            del search_filters_dict['break_count_max']
+        '''
+        # Constructing the $and conditions to match reconReportMetadataId, login, and break_type
+        and_conditions = [{"reconReportMetadataId": reconReportMetadataId}]
+
+        if "login" in search_filters_dict and search_filters_dict["login"]:
+            and_conditions.append({"$or": [
+                {"cymmetri_login": search_filters_dict["login"]},
+                {"target_app_login": search_filters_dict["login"]},
+                {"break_type": search_filters_dict["login"]},
+            ]})
+
+        if "break_type" in search_filters_dict and search_filters_dict["break_type"]:
+            and_conditions.append({"break_type": search_filters_dict["break_type"]})
+
+        if "break_count_min" in search_filters_dict and search_filters_dict["break_count_min"]:
+            and_conditions.append({"break_count": {"$gte": search_filters_dict["break_count_min"]}})
+
+        if "break_count_max" in search_filters_dict and search_filters_dict["break_count_max"]:
+            and_conditions.append({"break_count": {"$lte": search_filters_dict["break_count_max"]}})
+
+        if search_keyword:
+            #logging.debug(f'found keyword {search_keyword}')
+            # Construct a regex pattern to match the provided keyword in a continuous manner as a whole word
+            #keyword_regex =""
+            # Add a new condition to match any field that contains the keyword
+            or_conditions = []
+            for field in ["cymmetri_login", "target_app_login", "break_type"]:
+                value = f'.*{search_keyword}.*'
+                logging.debug(f'value is {value}')
+                or_conditions.append({field: {"$regex": value}})
+            and_conditions.append({"$or": or_conditions})
+
+        #logging.debug(f'And conditions are {and_conditions}')
+
+        if sortDirection_value == "ASC":
+            sortDirection_value = 1
+        else:
+            sortDirection_value = -1
 
-    count_pipeline = pipeline.copy()  # Creating a copy to avoid modifying the original pipeline
-    count_pipeline[-2] = {"$count": "count"}  # Replacing the last stage to get count directly
-    count_pipeline.pop()
-    count_result = list(shReconBreakRecords_collection.aggregate(count_pipeline))
-    total_records = count_result[0]["count"] if count_result else 0
-    logging.debug(f"Total records found: {total_records}")
+        pipeline = [
+            {"$match": {"$and": and_conditions}},
+            {"$sort": {sortOn_field: sortDirection_value}},
+            {"$skip": request.pageNumber * request.pageSize},
+            {"$limit": request.pageSize}
+        ]
 
-    total_pages = math.ceil(total_records / request.pageSize)
-    logging.debug("Returning processed data to client.")
+        logging.debug(f"Constructed MongoDB query pipeline: {pipeline}")
 
-    return {
-        "success": True,
-        "data": {"content": response},
-        "totalRecords": total_records,
-        "totalPages": total_pages
-    }
+        result = shReconBreakRecords_collection.aggregate(pipeline)
+        #logging.debug(f"Query executed successfully")
+        response = []
+
+        #counts = {}
+
+        for doc in result:
+            id_str = str(doc.get('_id'))
+            #counts[id_str] = await get_count_of_doc(shReconBreakRecords_collection, id_str)
+
+            #count_a = doc.get('count')
+            #doc = doc.get('latestRecord')
+            response.append({
+                "Id": id_str,
+                "target_app_login": doc.get('target_app_login'),
+                "source_app_login": doc.get('cymmetri_login'),
+                "cymmetri_login": doc.get('cymmetri_login'),
+                "break_type": doc.get('break_type'),
+                "break_count": doc.get('break_count')
+            })
+
+        count_pipeline = pipeline.copy()  # Creating a copy to avoid modifying the original pipeline
+        count_pipeline[-2] = {"$count": "count"}  # Replacing the last stage to get count directly
+        count_pipeline.pop()
+        count_result = list(shReconBreakRecords_collection.aggregate(count_pipeline))
+        total_records = count_result[0]["count"] if count_result else 0
+        logging.debug(f"Total records found: {total_records}")
+
+        total_pages = math.ceil(total_records / request.pageSize)
+        logging.debug("Returning processed data to client.")
+
+
+        data = {
+            "success": True,
+            "data": {"content": response},
+            "totalRecords": total_records,
+            "totalPages": total_pages
+        }
+        return ResponseModel(data=data, message="Break Data Executed successfully.")
+
+    except Exception as e:
+        return ErrorResponseModel(error=str(e), code=500, message="Exception Occurred.", errorCode="Invalid")
 
 @app.post('/reconsrvc/start_recon')
 async def start_recon(request: StartReconRequestSchema, fullRequest: Request) -> Response:
     logging.debug("Start of 360 degree recon for reconciliationId: %s", request.reconciliationId)
-
-    # Fetch job history collection
-    job_history_collection = get_collection(fullRequest.headers.get('tenant'), 'recon_360_job_history')
-
-    # Check for existing jobs
-    count = job_history_collection.count_documents({'reconciliationId': request.reconciliationId, 'completed': False})
-    logging.debug("Existing job count for reconciliationId %s: %d", request.reconciliationId, count)
-
-    if count > 0:
-        response_val = {}
-        response_val['success'] = 'false'
-        response_val['error'] = 'JOB_RUNNING'
-        response_val['errorMessage'] = '360 degree reconciliation is already running for given reconciliationId'
-        raise HTTPException(status_code=400, detail=response_val)
-
-    # Call to external service
-    baseUrl_for_java_srvc = data['RECON_PROV_SRVC']
-    url = baseUrl_for_java_srvc + '/reconciliation/userReconciliation'
-    payload = json.dumps({
-        "reconciliationId": request.reconciliationId,
-        "reconType": request.reconType
-    })
-
-    logging.info("Calling external service at URL %s with payload %s", url, payload)
-    response = requests.post(url, headers=fullRequest.headers, data=payload)
-    logging.info("Received response status %s from external service for reconciliationId %s", response.status_code, request.reconciliationId)
-
-    if response.status_code == 200:
-        job_history_collection.insert_one({
+    try:
+
+        reconciliationId = request.reconciliationId
+
+        # Fetch job history collection
+        job_history_collection = get_collection(fullRequest.headers.get('tenant'), 'recon_360_job_history')
+
+        if not reconciliationId:
+            response_val = {
+                "data": None,
+                "success": False,
+                "errorCode": "RECONCILIATIONID_MISSING_ERROR",
+                "message": "Missing reconciliationID in request"
+            }
+            return create_bad_request_response(response_val)
+
+        # Check for existing jobs
+        count = job_history_collection.count_documents({'reconciliationId': reconciliationId, 'completed': False})
+        logging.debug("Existing job count for reconciliationId %s: %d", reconciliationId, count)
+
+        if count > 0:
+            response_val = {}
+            response_val['success'] = 'false'
+            response_val['error'] = 'JOB_RUNNING'
+            response_val['errorMessage'] = '360 degree reconciliation is already running for given reconciliationId'
+            raise HTTPException(status_code=400, detail=response_val)
+
+        # Call to external service
+        baseUrl_for_java_srvc = data['RECON_PROV_SRVC']
+        url = baseUrl_for_java_srvc + '/reconciliation/userReconciliation'
+        payload = json.dumps({
             "reconciliationId": request.reconciliationId,
-            "completed": False,
-            "startTime": datetime.datetime.now()
+            "reconType": request.reconType
         })
-        logging.info("Inserted new job history record for reconciliationId %s", request.reconciliationId)
-
-        # Start background task
-        logging.debug("Starting background task for tenant %s and reconciliationId %s", fullRequest.headers.get('tenant'), request.reconciliationId)
-        create_task(printTenant(tenant=fullRequest.headers.get('tenant'), reconciliationId=request.reconciliationId))
+        logging.info("Calling external service at URL %s with payload %s", url, payload)
+        response = requests.post(url, headers=fullRequest.headers, data=payload)
+        logging.info("Received response status %s from external service for reconciliationId %s", response.status_code, request.reconciliationId)
-    print("res",response)
-
-    return response.json()
+        if response.status_code == 200:
+            job_history_collection.insert_one({
+                "reconciliationId": request.reconciliationId,
+                "completed": False,
+                "startTime": datetime.datetime.now()
+            })
+            logging.info("Inserted new job history record for reconciliationId %s", request.reconciliationId)
+
+            # Start background task
+            logging.debug("Starting background task for tenant %s and reconciliationId %s", fullRequest.headers.get('tenant'), request.reconciliationId)
+            create_task(printTenant(tenant=fullRequest.headers.get('tenant'), reconciliationId=request.reconciliationId))
+
+        data_response = response.json()
+        return ResponseModel(data=data_response, message="360 Recon Started successfully")
+    except Exception as e:
+        return ErrorResponseModel(error=str(e), code=500, message="Exception while starting 360 Recon.", errorCode="Invalid")
 
 if __name__ == "__main__":
     import uvicorn
-    uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
+    uvicorn.run(app, host="127.0.0.1", port=8000)
\ No newline at end of file
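
Note on the batched writes introduced in watcher.py: as patched, insert_many is still invoked inside the per-login loops, so each iteration issues its own write with the list accumulated so far. A minimal standalone sketch of an alternative is shown below: collect the break records first and flush them in a single call after the loop. The MongoDB URI, tenant database name, sample logins, and the flush_break_records helper are illustrative placeholders, not names from this patch; only the shReconBreakRecords collection name is taken from it.

# Illustrative sketch only: accumulate break records and write them with one
# insert_many() call after the loop, logging failures without aborting the run.
import logging

from pymongo import MongoClient
from pymongo.errors import PyMongoError


def flush_break_records(collection, records):
    """Insert the accumulated break records in one call; return how many were written."""
    if not records:
        return 0
    try:
        result = collection.insert_many(records, ordered=False)
        logging.debug("Inserted %d break records into %s", len(result.inserted_ids), collection.name)
        return len(result.inserted_ids)
    except PyMongoError as e:
        logging.error("Failed to insert break records: %s", e)
        return 0


if __name__ == "__main__":
    client = MongoClient("mongodb://localhost:27017")  # placeholder URI
    shReconBreakRecords = client["example_tenant"]["shReconBreakRecords"]

    pending = []
    for login in ["user.a", "user.b"]:  # stand-in logins
        pending.append({"login": login, "break_type": "present_in_target_only"})

    flush_break_records(shReconBreakRecords, pending)  # one round trip for the whole batch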