From ddd070ad2776caedf354987a5a44c18f160753fc Mon Sep 17 00:00:00 2001 From: cuihubin <530051970@qq.com> Date: Wed, 27 Sep 2023 14:45:11 +0800 Subject: [PATCH 1/2] update code for sync --- source/constructs/api/common/enum.py | 6 + source/constructs/api/data_source/crud.py | 55 +- .../api/data_source/glue_database_detector.py | 26 +- source/constructs/api/data_source/main.py | 15 +- source/constructs/api/data_source/schemas.py | 76 +-- source/constructs/api/data_source/service.py | 612 ++++++++++-------- .../constructs/api/db/models_data_source.py | 2 + .../api/lambda/sync_crawler_results.py | 4 +- source/constructs/api/main.py | 2 + .../admin/database/1.0.x-1.1.0/20_update.sql | 2 + .../admin/database/whole/10_data_source.sql | 6 +- 11 files changed, 446 insertions(+), 360 deletions(-) diff --git a/source/constructs/api/common/enum.py b/source/constructs/api/common/enum.py index c761963f..1ff0748f 100644 --- a/source/constructs/api/common/enum.py +++ b/source/constructs/api/common/enum.py @@ -105,6 +105,7 @@ class MessageEnum(Enum): SOURCE_SUBNET_NOT_PRIVATE = {1242: "Subnet for jdbc connection is not private subnet"} SOURCE_SUBNET_NOT_CONTAIN_NAT = {1243: "Subnet for jdbc connection not contain NAT Gateway"} SOURCE_JDBC_NO_SCHEMA = {1244: "There is no user created schema found in the jdbc connection"} + SOURCE_JDBC_CREATE_FAIL = {1245: "JDBC connection create failed"} # label LABEL_EXIST_FAILED = {1611: "Cannot create duplicated label"} @@ -317,3 +318,8 @@ class DataSourceType(str, Enum): glue_database = "glue_database" jdbc = "jdbc" all = "all" + +@unique +class JDBCCreateType(Enum): + ADD = 1 + IMPORT = 0 diff --git a/source/constructs/api/data_source/crud.py b/source/constructs/api/data_source/crud.py index a79dba42..3a35859c 100644 --- a/source/constructs/api/data_source/crud.py +++ b/source/constructs/api/data_source/crud.py @@ -103,8 +103,7 @@ def list_glue_database(condition: QueryCondition): account_ids.append(account.account_id) instances = get_session().query(SourceGlueDatabase).filter( SourceGlueDatabase.account_id.in_(account_ids)) - instances = query_with_condition(instances, condition) - return instances + return query_with_condition(instances, condition) def list_rds_instance_source(condition: QueryCondition): @@ -150,6 +149,20 @@ def set_jdbc_connection_glue_state(provider_id: int, account_id: str, region: st else: return None +def set_glue_database_glue_state(provider_id: int, account_id: str, region: str, instance_id: str, state: str): + session = get_session() + glue_database_source = session.query(SourceGlueDatabase).filter(SourceGlueDatabase.account_provider_id == provider_id, + SourceGlueDatabase.instance_id == instance_id, + SourceGlueDatabase.region == region, + SourceGlueDatabase.account_id == account_id).order_by( + desc(SourceGlueDatabase.detection_history_id)).first() + if glue_database_source is not None: + glue_database_source.glue_state = state + session.merge(glue_database_source) + session.commit() + else: + return None + def set_rds_instance_source_glue_state(account: str, region: str, instance_id: str, state: str): session = get_session() rds_instance_source = session.query(RdsInstanceSource).filter(RdsInstanceSource.instance_id == instance_id, @@ -638,17 +651,19 @@ def get_source_rds_account_region(): .all() ) -def add_glue_database(glueDatabase: schemas.SourceGlueDatabase): +def add_glue_database(glue_database_param: schemas.SourceGlueDatabase): session = get_session() glue_database = SourceGlueDatabase() - glue_database.glue_database_name = glueDatabase.glue_database_name - glue_database.glue_database_location_uri = glueDatabase.glue_database_location_uri - glue_database.glue_database_description = glueDatabase.glue_database_description - glue_database.glue_database_create_time = glueDatabase.glue_database_create_time - glue_database.glue_database_catalog_id = glueDatabase.glue_database_catalog_id - glue_database.region = glueDatabase.region - glue_database.account_id = glueDatabase.account_id + glue_database.glue_database_name = glue_database_param.glue_database_name + glue_database.glue_database_location_uri = glue_database_param.glue_database_location_uri + glue_database.glue_database_description = glue_database_param.glue_database_description + glue_database.glue_database_create_time = glue_database_param.glue_database_create_time + glue_database.glue_database_catalog_id = glue_database_param.glue_database_catalog_id + glue_database.data_lake_principal_identifier = glue_database_param.data_lake_principal_identifier + glue_database.permissions = glue_database_param.permissions + glue_database.region = glue_database_param.region + glue_database.account_id = glue_database_param.account_id session.add(glue_database) session.commit() @@ -674,18 +689,18 @@ def add_jdbc_conn(jdbcConn: schemas.JDBCInstanceSource): jdbc_instance_source.network_sg_id = jdbcConn.network_sg_id jdbc_instance_source.jdbc_driver_class_name = jdbcConn.jdbc_driver_class_name jdbc_instance_source.jdbc_driver_jar_uri = jdbcConn.jdbc_driver_jar_uri - jdbc_instance_source.instance_class = jdbcConn.instance_class - jdbc_instance_source.instance_status = jdbcConn.instance_status - jdbc_instance_source.account_provider_id = jdbcConn.account_provider + # jdbc_instance_source.instance_class = jdbcConn.instance_class + # jdbc_instance_source.instance_status = jdbcConn.instance_status + jdbc_instance_source.account_provider_id = jdbcConn.account_provider_id jdbc_instance_source.account_id = jdbcConn.account_id jdbc_instance_source.region = jdbcConn.region - jdbc_instance_source.data_source_id = jdbcConn.data_source_id - jdbc_instance_source.detection_history_id = jdbcConn.detection_history_id - jdbc_instance_source.glue_database = jdbcConn.glue_database - jdbc_instance_source.glue_crawler = jdbcConn.glue_crawler - jdbc_instance_source.glue_connection = jdbcConn.glue_connection - jdbc_instance_source.glue_vpc_endpoint = jdbcConn.glue_vpc_endpoint - jdbc_instance_source.glue_state = jdbcConn.glue_state + # jdbc_instance_source.data_source_id = jdbcConn.data_source_id + # jdbc_instance_source.detection_history_id = jdbcConn.detection_history_id + # jdbc_instance_source.glue_database = jdbcConn.glue_database + # jdbc_instance_source.glue_crawler = jdbcConn.glue_crawler + # jdbc_instance_source.glue_connection = jdbcConn.glue_connection + # jdbc_instance_source.glue_vpc_endpoint = jdbcConn.glue_vpc_endpoint + # jdbc_instance_source.glue_state = jdbcConn.glue_state jdbc_instance_source.create_type = jdbcConn.create_type # jdbc_instance_source. diff --git a/source/constructs/api/data_source/glue_database_detector.py b/source/constructs/api/data_source/glue_database_detector.py index 54c6e2ab..b3270f04 100644 --- a/source/constructs/api/data_source/glue_database_detector.py +++ b/source/constructs/api/data_source/glue_database_detector.py @@ -31,6 +31,8 @@ async def detect_glue_database_connection(session: Session, aws_account_id: str) regions = crud.get_account_agent_regions(aws_account_id) glue_database_list = [] refresh_list = [] + logger.info("=================") + logger.info(regions) for region in regions: client = boto3.client( 'glue', @@ -42,18 +44,22 @@ async def detect_glue_database_connection(session: Session, aws_account_id: str) glue_database_list.append(client.get_databases()['DatabaseList']) logger.info("detect_glue_database") # list exist rds not exist insert - for glue_database in glue_database_list: + for glue_database_item in glue_database_list: + glue_database = glue_database_item[0] refresh_list.append(glue_database["Name"]) if not crud.list_glue_database_by_name(glue_database["Name"]): - glue_database = schemas.SourceGlueDatabase - glue_database.glue_database_name = glue_database["Name"] - glue_database.glue_database_location_uri = glue_database["LocationUri"] - glue_database.glue_database_description = glue_database["Description"] - glue_database.glue_database_create_time = glue_database["CreateTime"] - glue_database.glue_database_catalog_id = glue_database["CatalogId"] - glue_database.region = '' - glue_database.account_id = aws_account_id - crud.add_glue_database(glue_database) + source_glue_database = schemas.SourceGlueDatabase + source_glue_database.glue_database_name = glue_database["Name"] + source_glue_database.glue_database_location_uri = glue_database["LocationUri"] + source_glue_database.glue_database_description = glue_database["Description"] + source_glue_database.glue_database_create_time = glue_database["CreateTime"] + source_glue_database.glue_database_catalog_id = glue_database["CatalogId"] + sub_info = glue_database["CreateTableDefaultPermissions"][0] + source_glue_database.data_lake_principal_identifier = sub_info["Principal"]["DataLakePrincipalIdentifier"] + source_glue_database.permissions = "|".join(sub_info["Permissions"]) + source_glue_database.region = '' + source_glue_database.account_id = aws_account_id + crud.add_glue_database(source_glue_database) # list not exist rds exist delete crud.delete_not_exist_glue_database(refresh_list) crud.update_glue_database_count(account=aws_account_id, region=admin_account_region) diff --git a/source/constructs/api/data_source/main.py b/source/constructs/api/data_source/main.py index 0d439ca9..df1b8c21 100644 --- a/source/constructs/api/data_source/main.py +++ b/source/constructs/api/data_source/main.py @@ -33,7 +33,7 @@ def list_rds_instances(condition: QueryCondition): page=condition.page, )) -@router.post("/list-glue-database", response_model=BaseResponse[Page[schemas.JDBCInstanceSource]]) +@router.post("/list-glue-database", response_model=BaseResponse[Page[schemas.SourceGlueDatabaseFullInfo]]) @inject_session def list_glue_databases(condition: QueryCondition): instances = crud.list_glue_database(condition) @@ -44,10 +44,10 @@ def list_glue_databases(condition: QueryCondition): page=condition.page, )) -@router.post("/list-jdbc", response_model=BaseResponse[Page[schemas.JDBCInstanceSource]]) +@router.post("/list-jdbc", response_model=BaseResponse[Page[schemas.JDBCInstanceSourceFullInfo]]) @inject_session -def list_jdbc_instances(condition: QueryCondition): - instances = crud.list_jdbc_instance_source(condition) +def list_jdbc_instances(provider_id: int, condition: QueryCondition): + instances = crud.list_jdbc_instance_source(provider_id) if instances is None: return None return paginate(instances, Params( @@ -142,7 +142,7 @@ def sync_jdbc_connection(jdbc: schemas.SourceJDBCConnection): @inject_session def refresh_data_source(type: schemas.NewDataSource): service.refresh_data_source( - type.provider, + type.provider_id, type.accounts, type.type ) @@ -241,8 +241,3 @@ def query_full_provider_infos(): @inject_session def list_providers(): return service.list_providers() - -@router.post("/list-jdbc-schemas", response_model=BaseResponse) -@inject_session -def list_jdbc_schema(account_id: str): - return service.list_jdbc_schema(account_id) diff --git a/source/constructs/api/data_source/schemas.py b/source/constructs/api/data_source/schemas.py index 42364bf1..95fd17a8 100644 --- a/source/constructs/api/data_source/schemas.py +++ b/source/constructs/api/data_source/schemas.py @@ -92,22 +92,23 @@ class Config: orm_mode = True class SourceGlueDatabase(BaseModel): - - id: int glue_database_name: Optional[str] glue_database_description: Optional[str] glue_database_location_uri: Optional[str] glue_database_create_time: Optional[str] glue_database_catalog_id: Optional[str] + data_lake_principal_identifier: Optional[str] + permissions: Optional[str] + account_id: Optional[str] + region: Optional[str] + +class SourceGlueDatabaseFullInfo(SourceGlueDatabase): glue_state: Optional[str] account_id: Optional[str] region: Optional[str] - detection_history_id: Optional[int] - version: Optional[int] - create_by: Optional[str] - create_time: Optional[datetime.datetime] - modify_by: Optional[str] - modify_time: Optional[datetime.datetime] + + class Config: + orm_mode = True class RdsInstanceSource(BaseModel): id: int @@ -138,19 +139,30 @@ class Config: class JDBCInstanceSource(BaseModel): instance_id: Optional[str] description: Optional[str] - jdbc_connection_url: Optional[str] # "jdbc:mysql://81.70.179.114:9000/" - jdbc_enforce_ssl: Optional[str] # "false" Require SSL connection 如果连不上connection会报错 - kafka_ssl_enabled: Optional[str] # "false" + jdbc_connection_url: Optional[str] + jdbc_enforce_ssl: Optional[str] + kafka_ssl_enabled: Optional[str] master_username: Optional[str] password: Optional[str] - skip_custom_jdbc_cert_validation: Optional[str] # "false" - custom_jdbc_cert: Optional[str] # SSL证书地址 - custom_jdbc_cert_string: Optional[str] # For Oracle Database this maps to SSL_SERVER_CERT_DN, and for SQL Server it maps to hostNameInCertificate. + skip_custom_jdbc_cert_validation: Optional[str] + custom_jdbc_cert: Optional[str] + custom_jdbc_cert_string: Optional[str] network_availability_zone: Optional[str] network_subnet_id: Optional[str] network_sg_id: Optional[str] + creation_time: Optional[str] + last_updated_time: Optional[str] jdbc_driver_class_name: Optional[str] - jdbc_driver_jar_uri: Optional[str] # s3://mysql-connector-2023/mysql-connector-j-8.1.0.jar 必须校验为jar文件 + jdbc_driver_jar_uri: Optional[str] + create_type: Optional[int] + account_provider_id: Optional[int] + account_id: Optional[str] + region: Optional[str] + + class Config: + orm_mode = True + +class JDBCInstanceSourceFullInfo(JDBCInstanceSource): data_source_id: Optional[int] detection_history_id: Optional[int] glue_database: Optional[str] @@ -158,17 +170,12 @@ class JDBCInstanceSource(BaseModel): glue_vpc_endpoint: Optional[str] glue_crawler: Optional[str] glue_state: Optional[str] - create_type: int instance_class: Optional[str] instance_status: Optional[str] - account_provider: Optional[str] - account_id: Optional[str] - region: Optional[str] class Config: orm_mode = True - class Region(BaseModel): id: Optional[int] region: Optional[str] @@ -206,28 +213,23 @@ class SourceRdsConnection(BaseModel): rds_secret: Optional[str] class SourceJDBCConnection(BaseModel): - account_provider: int - account_id: str - region: str - instance: str - engine: Optional[str] - address: Optional[str] - port: Optional[int] - username: Optional[str] - password: Optional[str] + account_provider: Optional[int] + account_id: Optional[str] + region: Optional[str] + instance: Optional[str] secret: Optional[str] network_availability_zone: Optional[str] network_subnet_id: Optional[str] network_sg_id: Optional[str] - jdbc_connection_url: Optional[str] # "jdbc:mysql://81.70.179.114:9000/" - jdbc_enforce_ssl: Optional[str] # "false" Require SSL connection 如果连不上connection会报错 - kafka_ssl_enabled: Optional[str] # "false" + jdbc_connection_url: Optional[str] + jdbc_enforce_ssl: Optional[str] + kafka_ssl_enabled: Optional[str] master_username: Optional[str] password: Optional[str] - skip_custom_jdbc_cert_validation: Optional[str] # "false" - custom_jdbc_cert: Optional[str] # SSL证书地址 - custom_jdbc_cert_string: Optional[str] # For Oracle Database this maps to SSL_SERVER_CERT_DN, and for SQL Server it maps to hostNameInCertificate. - + skip_custom_jdbc_cert_validation: Optional[str] + custom_jdbc_cert: Optional[str] + custom_jdbc_cert_string: Optional[str] + class SourceDeteteGlueDatabase(BaseModel): account_provider: int @@ -260,7 +262,7 @@ class SourceOrgAccount(BaseModel): organization_management_account_id: str class NewDataSource(BaseModel): - provider: Optional[str] + provider_id: Optional[int] accounts: List[str] type: DataSourceType = DataSourceType.all diff --git a/source/constructs/api/data_source/service.py b/source/constructs/api/data_source/service.py index 4303605a..1b192ca1 100644 --- a/source/constructs/api/data_source/service.py +++ b/source/constructs/api/data_source/service.py @@ -3,11 +3,8 @@ import os import traceback from time import sleep -import psycopg2 -import cx_Oracle import boto3 -import pymysql from catalog.service import delete_catalog_by_account_region as delete_catalog_by_account from catalog.service import delete_catalog_by_database_region as delete_catalog_by_database_region @@ -16,7 +13,8 @@ ConnectionState, Provider, DataSourceType, - DatabaseType) + DatabaseType, + JDBCCreateType) from common.exception_handler import BizException from common.query_condition import QueryCondition from discovery_job.service import delete_account as delete_job_by_account @@ -346,11 +344,11 @@ def sync_glue_database(account_id, region, glue_database_name): def sync_jdbc_connection(jdbc: SourceJDBCConnection): pre_sync(jdbc) sync(jdbc) - # post_sync() + def pre_sync(jdbc: SourceJDBCConnection): - ec2_client = boto3.client('ec2', region_name=jdbc.region) - if not jdbc.username: + ec2_client = boto3.client('ec2', region_name=_admin_account_region) + if not jdbc.master_username: raise BizException(MessageEnum.SOURCE_JDBC_NO_CREDENTIAL.get_code(), MessageEnum.SOURCE_JDBC_NO_CREDENTIAL.get_msg()) @@ -371,7 +369,7 @@ def pre_sync(jdbc: SourceJDBCConnection): MessageEnum.SOURCE_CONNECTION_CRAWLING.get_msg()) credentials = None try: - iam_role_name = crud.get_iam_role(jdbc.account_id) + iam_role_name = crud.get_iam_role(_admin_account_id) assumed_role = sts.assume_role( RoleArn=f"{iam_role_name}", @@ -388,9 +386,9 @@ def pre_sync(jdbc: SourceJDBCConnection): if not security_groups: raise BizException(MessageEnum.SOURCE_SECURITY_GROUP_NOT_EXISTS.get_code(), MessageEnum.SOURCE_SECURITY_GROUP_NOT_EXISTS.get_msg()) - security_group = filter(lambda sg: sg["GroupName"] == const.SECURITY_GROUP_JDBC, security_groups)[0] + security_group = list(filter(lambda sg: sg["GroupName"] == const.SECURITY_GROUP_JDBC, security_groups))[0] inbound_route = security_group["IpPermissions"][0] - if inbound_route["IpProtocol"] != "tcp" or not inbound_route["FromPort"] != 0 or not inbound_route["ToPort"] != 65535 \ + if inbound_route["IpProtocol"] != "tcp" or inbound_route["FromPort"] != 0 or inbound_route["ToPort"] != 65535 \ or not inbound_route["IpRanges"] or inbound_route["IpRanges"][0]["CidrIp"] != "0.0.0.0/0": raise BizException(MessageEnum.SOURCE_SG_INBOUND_ROUTE_NOT_VALID.get_code(), MessageEnum.SOURCE_SG_INBOUND_ROUTE_NOT_VALID.get_msg()) @@ -405,216 +403,219 @@ def pre_sync(jdbc: SourceJDBCConnection): if subnet[0]["MapPublicIpOnLaunch"]: raise BizException(MessageEnum.SOURCE_SUBNET_NOT_PRIVATE.get_code(), MessageEnum.SOURCE_SUBNET_NOT_PRIVATE.get_msg()) - if not ec2_client.describe_nat_gateways(Filters=[{'Name': 'subnet-id', 'Values': [jdbc.network_subnet_id]}])['NatGateways']: - raise BizException(MessageEnum.SOURCE_SUBNET_NOT_CONTAIN_NAT.get_code(), - MessageEnum.SOURCE_SUBNET_NOT_CONTAIN_NAT.get_msg()) + # if not ec2_client.describe_nat_gateways(Filters=[{'Name': 'subnet-id', 'Values': [jdbc.network_subnet_id]}])['NatGateways']: + # raise BizException(MessageEnum.SOURCE_SUBNET_NOT_CONTAIN_NAT.get_code(), + # MessageEnum.SOURCE_SUBNET_NOT_CONTAIN_NAT.get_msg()) if not ec2_client.describe_availability_zones(Filters=[{'Name': 'zone-name', 'Values': [jdbc.network_availability_zone]}])["AvailabilityZones"]: raise BizException(MessageEnum.SOURCE_AVAILABILITY_ZONE_NOT_EXISTS.get_code(), MessageEnum.SOURCE_AVAILABILITY_ZONE_NOT_EXISTS.get_msg()) def sync(jdbc: SourceJDBCConnection): - glue_connection_name = f"jdbc-{jdbc.account_provider}-{jdbc.instance}-connection" - glue_database_name = f"jdbc-{jdbc.account_provider}-{jdbc.instance}-database" - crawler_name = f"jdbc-{jdbc.account_provider}-{jdbc.instance}-crawler" + logger.info("START SYNC ...") + provider_str = gen_jdbc_provider_str(jdbc.account_provider) + glue_connection_name = jdbc.instance + glue_database_name = f"{provider_str}-{jdbc.instance}-database" + crawler_name = f"{provider_str}-{jdbc.instance}-crawler" state = crud.get_jdbc_connection_glue_state(jdbc.account_provider, jdbc.account_id, jdbc.region, jdbc.instance) - crawler_role_arn = __gen_role_arn(account_id=jdbc.account_id, - region=jdbc.region, + crawler_role_arn = __gen_role_arn(account_id=_admin_account_id, + region=_admin_account_region, role_name='GlueDetectionJobRole') crud.set_jdbc_connection_glue_state(jdbc.account_provider, jdbc.account_id, jdbc.region, jdbc.instance, ConnectionState.PENDING.value) + credentials = gen_credentials(_admin_account_id) + # grant_lake_formation_permission(credentials, crawler_name, glue_database_name) + try: - if jdbc.secret is not None: - credentials = gen_credentials(jdbc.account_id) - secretsmanager = boto3.client('secretsmanager', - aws_access_key_id=credentials['AccessKeyId'], - aws_secret_access_key=credentials['SecretAccessKey'], - aws_session_token=credentials['SessionToken'], - region_name=jdbc.region - ) - """ :type : pyboto3.secretsmanager """ - secret_value = secretsmanager.get_secret_value( - SecretId=jdbc.secret - ) - secret_values = json.loads(secret_value['SecretString']) + schema = get_schema_from_url(jdbc.jdbc_connection_url) + if schema: + glue = boto3.client('glue', + aws_access_key_id=credentials['AccessKeyId'], + aws_secret_access_key=credentials['SecretAccessKey'], + aws_session_token=credentials['SessionToken'], + region_name=_admin_account_region + ) + """ :type : pyboto3.glue """ + try: + conn = glue.get_connection(Name=glue_connection_name) + logger.info(f"connection exists! is{conn}") + except Exception as e: + logger.info("sync_jdbc_connection get_connection error and create:") + logger.info(str(e)) + if jdbc.secret is None: + response = glue.create_connection( + ConnectionInput={ + 'Name': glue_connection_name, + 'Description': glue_connection_name, + 'ConnectionType': 'JDBC', + 'ConnectionProperties': { + 'USERNAME': jdbc.master_username, + 'PASSWORD': jdbc.password, + 'JDBC_CONNECTION_URL': jdbc.jdbc_connection_url, + 'JDBC_ENFORCE_SSL': 'false', + }, - payload = { - "engine": jdbc.engine, - "host": jdbc.address, - "port": jdbc.port, - "username": secret_values['username'], - "password": secret_values['password'], - } - schema_list = __list_rds_schema(jdbc.account_id, - jdbc.region, - credentials, - jdbc.instance, - payload, - [jdbc.network_sg_id], - jdbc.network_subnet_id) - # schema_list = list_jdbc_schema(jdbc) - - logger.info("sync_jdbc_connection schema_list :") - logger.info(schema_list) - if len(schema_list) > 0: - glue = boto3.client('glue', - aws_access_key_id=credentials['AccessKeyId'], - aws_secret_access_key=credentials['SecretAccessKey'], - aws_session_token=credentials['SessionToken'], - region_name=jdbc.region - ) - """ :type : pyboto3.glue """ - - # jdbc_url = __create_jdbc_url(engine=engine, host=host, port=port) - try: - glue.get_connection(Name=glue_connection_name) - except Exception as e: - logger.info("sync_rds_connection get_connection error and create:") - logger.info(str(e)) - if jdbc.secret is None: - response = glue.create_connection( - ConnectionInput={ - 'Name': glue_connection_name, - 'Description': glue_connection_name, - 'ConnectionType': 'JDBC', - 'ConnectionProperties': { - 'USERNAME': jdbc.username, - 'PASSWORD': jdbc.password, - 'JDBC_CONNECTION_URL': jdbc.jdbc_connection_url, - 'JDBC_ENFORCE_SSL': 'false', - }, - - 'PhysicalConnectionRequirements': { - 'SubnetId': jdbc.network_subnet_id, - 'AvailabilityZone': jdbc.network_availability_zone, - 'SecurityGroupIdList': [jdbc.network_sg_id] - } - } - ) - logger.info(response) - else: - response = glue.create_connection( - ConnectionInput={ - 'Name': glue_connection_name, - 'Description': glue_connection_name, - 'ConnectionType': 'JDBC', - 'ConnectionProperties': { - 'SECRET_ID': jdbc.secret, - 'JDBC_CONNECTION_URL': jdbc.jdbc_connection_url, - 'JDBC_ENFORCE_SSL': 'false', - }, - - 'PhysicalConnectionRequirements': { - 'SubnetId': jdbc.network_subnet_id, - 'AvailabilityZone': jdbc.network_availability_zone, - 'SecurityGroupIdList': [jdbc.network_sg_id] - } + 'PhysicalConnectionRequirements': { + 'SubnetId': jdbc.network_subnet_id, + 'AvailabilityZone': jdbc.network_availability_zone, + 'SecurityGroupIdList': [jdbc.network_sg_id] } - ) - logger.info(response) - try: - glue.get_database(Name=glue_database_name) - except Exception as e: - logger.info("sync_rds_connection get_database error and create:") - logger.info(str(e)) - response = glue.create_database(DatabaseInput={'Name': glue_database_name}) + } + ) logger.info(response) - lakeformation = boto3.client('lakeformation', - aws_access_key_id=credentials['AccessKeyId'], - aws_secret_access_key=credentials['SecretAccessKey'], - aws_session_token=credentials['SessionToken'], - region_name=jdbc.region) - """ :type : pyboto3.lakeformation """ - # retry for grant permissions - num_retries = GRANT_PERMISSIONS_RETRIES - while num_retries > 0: - try: - response = lakeformation.grant_permissions( - Principal={ - 'DataLakePrincipalIdentifier': f"{crawler_role_arn}" - }, - Resource={ - 'Database': { - 'Name': glue_database_name - } - }, - Permissions=['ALL'], - PermissionsWithGrantOption=['ALL'] - ) - except Exception as e: - sleep(SLEEP_MIN_TIME) - num_retries -= 1 - else: - break else: - raise Exception('UNCONNECTED') - jdbc_targets = [] - for schema in schema_list: - jdbc_targets.append( - { - 'ConnectionName': glue_connection_name, - 'Path': f"{schema}/%", + response = glue.create_connection( + ConnectionInput={ + 'Name': glue_connection_name, + 'Description': glue_connection_name, + 'ConnectionType': 'JDBC', + 'ConnectionProperties': { + 'SECRET_ID': jdbc.secret, + 'JDBC_CONNECTION_URL': jdbc.jdbc_connection_url, + 'JDBC_ENFORCE_SSL': 'false', + }, + + 'PhysicalConnectionRequirements': { + 'SubnetId': jdbc.network_subnet_id, + 'AvailabilityZone': jdbc.network_availability_zone, + 'SecurityGroupIdList': [jdbc.network_sg_id] + } } ) - logger.info("sync_rds_connection jdbc_targets:") - logger.info(jdbc_targets) + try: + glue.get_database(Name=glue_database_name) + except Exception as e: + logger.info("sync_jdbc_connection get_database error and create:") + logger.info(str(e)) + response = glue.create_database(DatabaseInput={'Name': glue_database_name}) + logger.info("creat response is:") + logger.info(response) + # grant_lake_formation_permission(credentials, crawler_name, glue_database_name) + lakeformation = boto3.client('lakeformation', + aws_access_key_id=credentials['AccessKeyId'], + aws_secret_access_key=credentials['SecretAccessKey'], + aws_session_token=credentials['SessionToken'], + region_name=_admin_account_region) + """ :type : pyboto3.lakeformation """ + # retry for grant permissions + num_retries = GRANT_PERMISSIONS_RETRIES + while num_retries > 0: try: - response = glue.get_crawler(Name=crawler_name) - logger.info("sync_rds_connection get_crawler:") - logger.info(response) - try: - if state == ConnectionState.ACTIVE.value or state == ConnectionState.UNSUPPORTED.value \ - or state == ConnectionState.ERROR.value or state == ConnectionState.STOPPING.value: - up_cr_response = glue.update_crawler( - Name=crawler_name, - Role=crawler_role_arn, - DatabaseName=glue_database_name, - Targets={ - 'JdbcTargets': jdbc_targets, - }, - SchemaChangePolicy={ - 'UpdateBehavior': 'UPDATE_IN_DATABASE', - 'DeleteBehavior': 'DELETE_FROM_DATABASE' - } - ) - logger.info("update rds crawler:") - logger.info(up_cr_response) - except Exception as e: - logger.info("update_crawler error") - logger.info(str(e)) - st_cr_response = glue.start_crawler( - Name=crawler_name + response = lakeformation.grant_permissions( + Principal={ + 'DataLakePrincipalIdentifier': f"{crawler_role_arn}" + }, + Resource={ + 'Database': { + 'Name': glue_database_name + } + }, + Permissions=['ALL'], + PermissionsWithGrantOption=['ALL'] ) - logger.info(st_cr_response) except Exception as e: - logger.info("sync_rds_connection get_crawler and create:") - logger.info(str(e)) - response = glue.create_crawler( + sleep(SLEEP_MIN_TIME) + num_retries -= 1 + else: + break + else: + raise Exception('UNCONNECTED') + # lakeformation = boto3.client('lakeformation', + # aws_access_key_id=credentials['AccessKeyId'], + # aws_secret_access_key=credentials['SecretAccessKey'], + # aws_session_token=credentials['SessionToken'], + # region_name=_admin_account_region) + # """ :type : pyboto3.lakeformation """ + # # retry for grant permissions + # num_retries = GRANT_PERMISSIONS_RETRIES + # while num_retries > 0: + # try: + # response = lakeformation.grant_permissions( + # Principal={ + # 'DataLakePrincipalIdentifier': f"{crawler_role_arn}" + # }, + # Resource={ + # 'Database': { + # 'Name': glue_database_name + # } + # }, + # Permissions=['ALL'], + # PermissionsWithGrantOption=['ALL'] + # ) + # except Exception as e: + # sleep(SLEEP_MIN_TIME) + # num_retries -= 1 + # else: + # break + # else: + # raise Exception('UNCONNECTED') + jdbc_targets = [] + # for schema in schema_list: + jdbc_targets.append( + { + 'ConnectionName': glue_connection_name, + 'Path': f"{schema}/%", + } + ) + logger.info("sync_jdbc_connection jdbc_targets:") + logger.info(jdbc_targets) + try: + response = glue.get_crawler(Name=crawler_name) + logger.info("sync_jdbc_connection get_crawler:") + logger.info(response) + try: + if state == ConnectionState.ACTIVE.value or state == ConnectionState.UNSUPPORTED.value \ + or state == ConnectionState.ERROR.value or state == ConnectionState.STOPPING.value: + up_cr_response = glue.update_crawler( Name=crawler_name, Role=crawler_role_arn, DatabaseName=glue_database_name, Targets={ 'JdbcTargets': jdbc_targets, }, - Tags={ - 'AdminAccountId': _admin_account_id + SchemaChangePolicy={ + 'UpdateBehavior': 'UPDATE_IN_DATABASE', + 'DeleteBehavior': 'DELETE_FROM_DATABASE' } ) - logger.info(response) - start_response = glue.start_crawler( - Name=crawler_name - ) - logger.info(start_response) - crud.create_jdbc_connection(jdbc.account_provider, - jdbc.account_id, - jdbc.region, - jdbc.instance, - glue_connection_name, - glue_database_name, - None, - crawler_name) - else: - crud.set_jdbc_connection_glue_state(jdbc.account_provider, jdbc.account_id, jdbc.region, jdbc.instance, - MessageEnum.SOURCE_JDBC_NO_SCHEMA.get_msg()) - raise BizException(MessageEnum.SOURCE_JDBC_NO_SCHEMA.get_code(), MessageEnum.SOURCE_JDBC_NO_SCHEMA.get_msg()) + logger.info("update jdbc crawler:") + logger.info(up_cr_response) + except Exception as e: + logger.info("update_crawler error") + logger.info(str(e)) + st_cr_response = glue.start_crawler( + Name=crawler_name + ) + logger.info(st_cr_response) + except Exception as e: + logger.info("sync_jdbc_connection get_crawler and create:") + logger.info(str(e)) + response = glue.create_crawler( + Name=crawler_name, + Role=crawler_role_arn, + DatabaseName=glue_database_name, + Targets={ + 'JdbcTargets': jdbc_targets, + }, + Tags={ + 'AdminAccountId': _admin_account_id + } + ) + logger.info(response) + start_response = glue.start_crawler( + Name=crawler_name + ) + logger.info(start_response) + crud.create_jdbc_connection(jdbc.account_provider, + jdbc.account_id, + jdbc.region, + jdbc.instance, + glue_connection_name, + glue_database_name, + None, + crawler_name) + else: + crud.set_jdbc_connection_glue_state(jdbc.account_provider, jdbc.account_id, jdbc.region, jdbc.instance, + MessageEnum.SOURCE_JDBC_NO_SCHEMA.get_msg()) + raise BizException(MessageEnum.SOURCE_JDBC_NO_SCHEMA.get_code(), MessageEnum.SOURCE_JDBC_NO_SCHEMA.get_msg()) except Exception as err: crud.set_jdbc_connection_glue_state(jdbc.account_provider, jdbc.account_id, jdbc.region, jdbc.instance, str(err)) glue = boto3.client('glue', @@ -641,59 +642,59 @@ def sync(jdbc: SourceJDBCConnection): raise BizException(MessageEnum.SOURCE_CONNECTION_FAILED.get_code(), str(err)) -def list_jdbc_schema(account_id: str): - schemas = [] - # postgreSQL - # conn = psycopg2.connect(database="数据库名", - # user="数据库账号", - # password="数据库密码", - # host="xx.xx.xx.xx", - # port="端口号") - # mysql - conn = psycopg2.connect(host='81.70.179.114', - port=9000, - user='root', - password='Temp123456!') - # oracle - # username="用户名" - # userpwd="用户名密码" - # host="主机IP" - # port=1521 - # dbname="数据库名称" - # dsn=cx_Oracle.makedsn(host, port) - # connection=cx_Oracle.connect(username, userpwd, dsn) - # cursor = connection.cursor() - - try: - # 创建一个新的游标 - cursor = conn.cursor() - # 执行SQL查询 - cursor.execute("SHOW DATABASES") - # 获取所有的行 - rows = cursor.fetchall() - for row in rows: - print(row[0]) - schemas.append(row[0]) - finally: - # 关闭连接 - conn.close() - # credentials = gen_credentials(account_id) - # logger.info(credentials) - # glue_client = boto3.client('glue', - # aws_access_key_id=credentials['AccessKeyId'], - # aws_secret_access_key=credentials['SecretAccessKey'], - # aws_session_token=credentials['SessionToken'], - # region_name="cn-northwest-1") - # connection_name = 'glue-tencent-host-mysql-5.7' - # # connection_name = jdbc.instance - # schemas = None - # response = glue_client.get_connection( - # Name=connection_name - # ) - # connection_properties = response['Connection']['ConnectionProperties'] - # if 'JDBC_SCHEMAS' in connection_properties: - # schemas = connection_properties['JDBC_SCHEMAS'].split(',') - return schemas +# def list_jdbc_schema(account_id: str): +# schemas = [] +# # postgreSQL +# # conn = psycopg2.connect(database="数据库名", +# # user="数据库账号", +# # password="数据库密码", +# # host="xx.xx.xx.xx", +# # port="端口号") +# # mysql +# # conn = pymysql.connect(host='81.70.179.114', +# # port=9000, +# # user='root', +# # password='Temp123456!') +# # oracle +# # username="用户名" +# # userpwd="用户名密码" +# # host="主机IP" +# # port=1521 +# # dbname="数据库名称" +# # dsn=cx_Oracle.makedsn(host, port) +# # connection=cx_Oracle.connect(username, userpwd, dsn) +# # cursor = connection.cursor() + +# try: +# # 创建一个新的游标 +# cursor = conn.cursor() +# # 执行SQL查询 +# cursor.execute("SHOW DATABASES") +# # 获取所有的行 +# rows = cursor.fetchall() +# for row in rows: +# print(row[0]) +# schemas.append(row[0]) +# finally: +# # 关闭连接 +# conn.close() +# # credentials = gen_credentials(account_id) +# # logger.info(credentials) +# # glue_client = boto3.client('glue', +# # aws_access_key_id=credentials['AccessKeyId'], +# # aws_secret_access_key=credentials['SecretAccessKey'], +# # aws_session_token=credentials['SessionToken'], +# # region_name="cn-northwest-1") +# # connection_name = 'glue-tencent-host-mysql-5.7' +# # # connection_name = jdbc.instance +# # schemas = None +# # response = glue_client.get_connection( +# # Name=connection_name +# # ) +# # connection_properties = response['Connection']['ConnectionProperties'] +# # if 'JDBC_SCHEMAS' in connection_properties: +# # schemas = connection_properties['JDBC_SCHEMAS'].split(',') +# return schemas def before_delete_glue_database(provider, account, region, name): @@ -830,7 +831,7 @@ def delete_jdbc_connection(provider_id: int, account: str, region: str, instance def gen_credentials(account: str): try: iam_role_name = crud.get_iam_role(account) - + logger.info(f"+++++++++++++++++:{iam_role_name}") assumed_role = sts.assume_role( RoleArn=f"{iam_role_name}", RoleSessionName="glue-rds-connection" @@ -1317,9 +1318,9 @@ def delete_glue_connection(account: str, region: str, glue_crawler: str, return True -def refresh_data_source(provider: str, accounts: list[str], type: str): - tmp_provider = int(provider) - if tmp_provider == Provider.AWS_CLOUD.value: +def refresh_data_source(provider_id: int, accounts: list[str], type: str): + # tmp_provider = int(provider) + if provider_id == Provider.AWS_CLOUD.value: refresh_aws_data_source(accounts, type) else: pass @@ -1594,35 +1595,39 @@ def add_jdbc_conn(jdbcConn: JDBCInstanceSource): if list: raise BizException(MessageEnum.SOURCE_JDBC_ALREADY_EXISTS.get_code(), MessageEnum.SOURCE_JDBC_ALREADY_EXISTS.get_msg()) - response = boto3.client('glue', region_name=jdbcConn.region).create_connection( - CatalogId=jdbcConn.account_id, - ConnectionInput={ - 'Name': jdbcConn.instance_id, - 'Description': jdbcConn.description, - 'ConnectionType': 'JDBC', - 'ConnectionProperties': { - # 'CUSTOM_JDBC_CERT': jdbcConn.custom_jdbc_cert, - # 'CUSTOM_JDBC_CERT_STRING': jdbcConn.custom_jdbc_cert_string, - 'JDBC_CONNECTION_URL': jdbcConn.jdbc_connection_url, - 'JDBC_ENFORCE_SSL': jdbcConn.jdbc_enforce_ssl, - # 'KAFKA_SSL_ENABLED': jdbcConn.kafka_ssl_enabled, - # 'SKIP_CUSTOM_JDBC_CERT_VALIDATION': jdbcConn.skip_custom_jdbc_cert_validation, - 'USERNAME': jdbcConn.master_username, - 'PASSWORD': jdbcConn.password, - # 'JDBC_DRIVER_CLASS_NAME': jdbcConn.jdbc_driver_class_name, - # 'JDBC_DRIVER_JAR_URI': jdbcConn.jdbc_driver_jar_uri - }, - 'PhysicalConnectionRequirements': { - 'SubnetId': jdbcConn.network_subnet_id, - 'SecurityGroupIdList': [ - jdbcConn.network_sg_id - ], - 'AvailabilityZone': jdbcConn.network_availability_zone + if jdbcConn.create_type == JDBCCreateType.ADD.value: + admin_account = get_admin_account_info() + response = boto3.client('glue', region_name=admin_account.region).create_connection( + CatalogId=admin_account.account_id, + ConnectionInput={ + 'Name': jdbcConn.instance_id, + 'Description': jdbcConn.description, + 'ConnectionType': 'JDBC', + 'ConnectionProperties': { + # 'CUSTOM_JDBC_CERT': jdbcConn.custom_jdbc_cert, + # 'CUSTOM_JDBC_CERT_STRING': jdbcConn.custom_jdbc_cert_string, + 'JDBC_CONNECTION_URL': jdbcConn.jdbc_connection_url, + 'JDBC_ENFORCE_SSL': jdbcConn.jdbc_enforce_ssl, + # 'KAFKA_SSL_ENABLED': jdbcConn.kafka_ssl_enabled, + # 'SKIP_CUSTOM_JDBC_CERT_VALIDATION': jdbcConn.skip_custom_jdbc_cert_validation, + 'USERNAME': jdbcConn.master_username, + 'PASSWORD': jdbcConn.password, + # 'JDBC_DRIVER_CLASS_NAME': jdbcConn.jdbc_driver_class_name, + # 'JDBC_DRIVER_JAR_URI': jdbcConn.jdbc_driver_jar_uri + }, + 'PhysicalConnectionRequirements': { + 'SubnetId': jdbcConn.network_subnet_id, + 'SecurityGroupIdList': [ + jdbcConn.network_sg_id + ], + 'AvailabilityZone': jdbcConn.network_availability_zone + } } - } - ) - if response['ResponseMetadata']['HTTPStatusCode'] == 200: - crud.add_jdbc_conn(jdbcConn) + ) + if response['ResponseMetadata']['HTTPStatusCode'] != 200: + raise BizException(MessageEnum.SOURCE_JDBC_CREATE_FAIL.get_code(), + MessageEnum.SOURCE_JDBC_CREATE_FAIL.get_msg()) + crud.add_jdbc_conn(jdbcConn) def __glue(account: str, region: str): iam_role_name = crud.get_iam_role(account) @@ -2035,3 +2040,52 @@ def query_full_provider_resource_infos(): def list_providers(): return crud.query_provider_list() + + +def get_schema_from_url(url): + res = '' + # jdbc:mysql://81.70.179.114:9000/sdps-glue + if url.startswith("jdbc:mysql://"): + res = url[url.rindex("/") + 1:] + return res + + +def gen_jdbc_provider_str(provider: int): + if (provider == 2): + return DatabaseType.JDBC_TENCENT.value + elif (provider == 3): + return DatabaseType.JDBC_ALIYUN.value + else: + return DatabaseType.JDBC_AWS.value + + +def grant_lake_formation_permission(credentials, crawler_role_arn, glue_database_name): + lakeformation = boto3.client('lakeformation', + aws_access_key_id=credentials['AccessKeyId'], + aws_secret_access_key=credentials['SecretAccessKey'], + aws_session_token=credentials['SessionToken'], + region_name=_admin_account_region) + """ :type : pyboto3.lakeformation """ + # retry for grant permissions + num_retries = GRANT_PERMISSIONS_RETRIES + while num_retries > 0: + try: + response = lakeformation.grant_permissions( + Principal={ + 'DataLakePrincipalIdentifier': f"{crawler_role_arn}" + }, + Resource={ + 'Database': { + 'Name': glue_database_name + } + }, + Permissions=['ALL'], + PermissionsWithGrantOption=['ALL'] + ) + except Exception as e: + sleep(SLEEP_MIN_TIME) + num_retries -= 1 + else: + break + else: + raise Exception('UNCONNECTED') diff --git a/source/constructs/api/db/models_data_source.py b/source/constructs/api/db/models_data_source.py index 16f4b77e..48d676a0 100644 --- a/source/constructs/api/db/models_data_source.py +++ b/source/constructs/api/db/models_data_source.py @@ -270,6 +270,8 @@ class SourceGlueDatabase(Base): glue_database_location_uri = sa.Column(sa.String(255)) glue_database_create_time = sa.Column(sa.String(255)) glue_database_catalog_id = sa.Column(sa.String(255)) + data_lake_principal_identifier = sa.Column(sa.String(255)) + permissions = sa.Column(sa.String(255)) glue_state = sa.Column(sa.String(255), info={'searchable': True}) account_id = sa.Column(sa.String(255)) region = sa.Column(sa.String(255)) diff --git a/source/constructs/api/lambda/sync_crawler_results.py b/source/constructs/api/lambda/sync_crawler_results.py index 737f25dd..86f5e0dc 100644 --- a/source/constructs/api/lambda/sync_crawler_results.py +++ b/source/constructs/api/lambda/sync_crawler_results.py @@ -65,11 +65,11 @@ def sync_result(input_event): state=state ) elif database_type == DatabaseType.GLUE.value: - data_source_crud.update_custom_glue_database_count( + data_source_crud.update_glue_database_count( account=input_event['detail']['accountId'], region=input_event['region'], ) - data_source_crud.set_custom_glue_database_glue_state( + data_source_crud.set_glue_database_glue_state( account=input_event['detail']['accountId'], region=input_event['region'], database=database_name, diff --git a/source/constructs/api/main.py b/source/constructs/api/main.py index c5ba47b1..0230a440 100644 --- a/source/constructs/api/main.py +++ b/source/constructs/api/main.py @@ -95,6 +95,8 @@ async def validate(request: Request, call_next): pass else: return resp_err(MessageEnum.BIZ_INVALID_TOKEN.get_code(), MessageEnum.BIZ_INVALID_TOKEN.get_msg()) + logger.info("%%%%%%%%%%%%%%%%%%%%%%") + logger.info(request) response = await call_next(request) if os.getenv(const.MODE) == const.MODE_DEV: process_time = time.time() - start_time diff --git a/source/constructs/lib/admin/database/1.0.x-1.1.0/20_update.sql b/source/constructs/lib/admin/database/1.0.x-1.1.0/20_update.sql index 565ddb6e..689ac051 100644 --- a/source/constructs/lib/admin/database/1.0.x-1.1.0/20_update.sql +++ b/source/constructs/lib/admin/database/1.0.x-1.1.0/20_update.sql @@ -60,6 +60,8 @@ create table source_glue_database glue_database_location_uri varchar(255) null, glue_database_create_time varchar(255) null, glue_database_catalog_id varchar(255) null, + data_lake_principal_identifier varchar(255) null, + permissions varchar(255) null, glue_state varchar(255) null, account_id varchar(255) null, region varchar(255) null, diff --git a/source/constructs/lib/admin/database/whole/10_data_source.sql b/source/constructs/lib/admin/database/whole/10_data_source.sql index fadf7b75..d6184f96 100644 --- a/source/constructs/lib/admin/database/whole/10_data_source.sql +++ b/source/constructs/lib/admin/database/whole/10_data_source.sql @@ -208,10 +208,12 @@ create table source_glue_database glue_database_location_uri varchar(255) null, glue_database_create_time varchar(255) null, glue_database_catalog_id varchar(255) null, - -- glue_state varchar(255) null, + data_lake_principal_identifier varchar(255) null, + permissions varchar(255) null, + glue_state varchar(255) null, account_id varchar(255) null, region varchar(255) null, - -- detection_history_id int null, + detection_history_id int null, version int null, create_by varchar(255) null, create_time timestamp null, From 90d375b4e8dc69d11ffc1024558a37055a5282ec Mon Sep 17 00:00:00 2001 From: cuihubin <530051970@qq.com> Date: Wed, 27 Sep 2023 14:52:51 +0800 Subject: [PATCH 2/2] reslove conflict --- source/constructs/api/data_source/service.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/source/constructs/api/data_source/service.py b/source/constructs/api/data_source/service.py index 2d380716..98ddef38 100644 --- a/source/constructs/api/data_source/service.py +++ b/source/constructs/api/data_source/service.py @@ -3,13 +3,7 @@ import os import traceback from time import sleep -<<<<<<< HEAD - -======= -# import psycopg2 -# import cx_Oracle ->>>>>>> 084a5db0172f30110de58ba4b959f021566329a1 import boto3 import common.enum