Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(be): API Test for Datasource #285

Merged
merged 3 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions source/constructs/api/common/enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class MessageEnum(Enum):
SOURCE_SUBNET_NOT_PRIVATE = {1242: "Subnet for jdbc connection is not private subnet"}
SOURCE_SUBNET_NOT_CONTAIN_NAT = {1243: "Subnet for jdbc connection not contain NAT Gateway"}
SOURCE_JDBC_NO_SCHEMA = {1244: "There is no user created schema found in the jdbc connection"}
SOURCE_JDBC_CREATE_FAIL = {1245: "JDBC connection create failed"}
# label
LABEL_EXIST_FAILED = {1611: "Cannot create duplicated label"}

Expand Down Expand Up @@ -327,3 +328,8 @@ class DataSourceType(str, Enum):
glue_database = "glue_database"
jdbc = "jdbc"
all = "all"

@unique
class JDBCCreateType(Enum):
    """Creation type of a JDBC source, with unique integer values.

    NOTE(review): presumably the value stored in a JDBC instance's
    ``create_type`` field -- confirm against the callers.
    """

    # Member order is kept as declared so iteration order is unchanged.
    ADD = 1
    IMPORT = 0
57 changes: 34 additions & 23 deletions source/constructs/api/data_source/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ def list_glue_database(condition: QueryCondition):
account_ids.append(account.account_id)
instances = get_session().query(SourceGlueDatabase).filter(
SourceGlueDatabase.account_id.in_(account_ids))
instances = query_with_condition(instances, condition)
return instances
return query_with_condition(instances, condition)


def list_rds_instance_source(condition: QueryCondition):
Expand Down Expand Up @@ -160,6 +159,19 @@ def set_jdbc_connection_glue_state(provider_id: int, account_id: str, region: st
else:
return None

def set_glue_database_glue_state(provider_id: int, account_id: str, region: str, instance_id: str, state: str):
    """Persist a new ``glue_state`` on the latest matching Glue database row.

    The row is selected from ``SourceGlueDatabase`` by provider id, instance
    id, region and account id; when several rows match, the one with the
    greatest ``detection_history_id`` is used.

    Args:
        provider_id: Compared against ``account_provider_id``.
        account_id: Compared against ``account_id``.
        region: Compared against ``region``.
        instance_id: Compared against ``instance_id``.
        state: Value written to the row's ``glue_state``.

    Returns:
        None in every case; when no row matches nothing is written.
    """
    db = get_session()
    criteria = (
        SourceGlueDatabase.account_provider_id == provider_id,
        SourceGlueDatabase.instance_id == instance_id,
        SourceGlueDatabase.region == region,
        SourceGlueDatabase.account_id == account_id,
    )
    latest = (
        db.query(SourceGlueDatabase)
        .filter(*criteria)
        .order_by(desc(SourceGlueDatabase.detection_history_id))
        .first()
    )
    # Nothing to update when the lookup comes back empty.
    if latest is None:
        return None
    latest.glue_state = state
    db.merge(latest)
    db.commit()

def set_rds_instance_source_glue_state(account: str, region: str, instance_id: str, state: str):
session = get_session()
Expand Down Expand Up @@ -338,7 +350,6 @@ def update_s3_bucket_count(account: str, region: str):
session.merge(account)
session.commit()


def update_glue_database_count(account: str, region: str):
session = get_session()
total = session.query(SourceGlueDatabase).filter(SourceGlueDatabase.region == region,
Expand Down Expand Up @@ -666,18 +677,19 @@ def get_source_rds_account_region():
.all()
)

def add_glue_database(glue_database_param: schemas.SourceGlueDatabase):

def add_glue_database(glueDatabase: schemas.SourceGlueDatabase):
session = get_session()

glue_database = SourceGlueDatabase()
glue_database.glue_database_name = glueDatabase.glue_database_name
glue_database.glue_database_location_uri = glueDatabase.glue_database_location_uri
glue_database.glue_database_description = glueDatabase.glue_database_description
glue_database.glue_database_create_time = glueDatabase.glue_database_create_time
glue_database.glue_database_catalog_id = glueDatabase.glue_database_catalog_id
glue_database.region = glueDatabase.region
glue_database.account_id = glueDatabase.account_id
glue_database.glue_database_name = glue_database_param.glue_database_name
glue_database.glue_database_location_uri = glue_database_param.glue_database_location_uri
glue_database.glue_database_description = glue_database_param.glue_database_description
glue_database.glue_database_create_time = glue_database_param.glue_database_create_time
glue_database.glue_database_catalog_id = glue_database_param.glue_database_catalog_id
glue_database.data_lake_principal_identifier = glue_database_param.data_lake_principal_identifier
glue_database.permissions = glue_database_param.permissions
glue_database.region = glue_database_param.region
glue_database.account_id = glue_database_param.account_id

session.add(glue_database)
session.commit()
Expand All @@ -704,18 +716,18 @@ def add_jdbc_conn(jdbcConn: schemas.JDBCInstanceSource):
jdbc_instance_source.network_sg_id = jdbcConn.network_sg_id
jdbc_instance_source.jdbc_driver_class_name = jdbcConn.jdbc_driver_class_name
jdbc_instance_source.jdbc_driver_jar_uri = jdbcConn.jdbc_driver_jar_uri
jdbc_instance_source.instance_class = jdbcConn.instance_class
jdbc_instance_source.instance_status = jdbcConn.instance_status
jdbc_instance_source.account_provider_id = jdbcConn.account_provider
# jdbc_instance_source.instance_class = jdbcConn.instance_class
# jdbc_instance_source.instance_status = jdbcConn.instance_status
jdbc_instance_source.account_provider_id = jdbcConn.account_provider_id
jdbc_instance_source.account_id = jdbcConn.account_id
jdbc_instance_source.region = jdbcConn.region
jdbc_instance_source.data_source_id = jdbcConn.data_source_id
jdbc_instance_source.detection_history_id = jdbcConn.detection_history_id
jdbc_instance_source.glue_database = jdbcConn.glue_database
jdbc_instance_source.glue_crawler = jdbcConn.glue_crawler
jdbc_instance_source.glue_connection = jdbcConn.glue_connection
jdbc_instance_source.glue_vpc_endpoint = jdbcConn.glue_vpc_endpoint
jdbc_instance_source.glue_state = jdbcConn.glue_state
# jdbc_instance_source.data_source_id = jdbcConn.data_source_id
# jdbc_instance_source.detection_history_id = jdbcConn.detection_history_id
# jdbc_instance_source.glue_database = jdbcConn.glue_database
# jdbc_instance_source.glue_crawler = jdbcConn.glue_crawler
# jdbc_instance_source.glue_connection = jdbcConn.glue_connection
# jdbc_instance_source.glue_vpc_endpoint = jdbcConn.glue_vpc_endpoint
# jdbc_instance_source.glue_state = jdbcConn.glue_state
jdbc_instance_source.create_type = jdbcConn.create_type
# jdbc_instance_source.

Expand Down Expand Up @@ -775,4 +787,3 @@ def get_total_jdbc_instances_count(provider_id):
def get_connected_jdbc_instances_count(provider_id):
    """Count the provider's JDBC instances whose Glue state is ACTIVE.

    Args:
        provider_id: Passed through to ``list_jdbc_instance_source``.

    Returns:
        0 when ``list_jdbc_instance_source`` returns None; otherwise the
        number of rows whose ``glue_state`` equals
        ``ConnectionState.ACTIVE.value``.
    """
    # Named ``instances`` rather than ``list`` so the builtin is not shadowed.
    instances = list_jdbc_instance_source(provider_id)
    if instances is None:
        return 0
    return instances.filter(JDBCInstanceSource.glue_state == ConnectionState.ACTIVE.value).count()

26 changes: 16 additions & 10 deletions source/constructs/api/data_source/glue_database_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ async def detect_glue_database_connection(session: Session, aws_account_id: str)
regions = crud.get_account_agent_regions(aws_account_id)
glue_database_list = []
refresh_list = []
logger.info("=================")
logger.info(regions)
for region in regions:
client = boto3.client(
'glue',
Expand All @@ -42,18 +44,22 @@ async def detect_glue_database_connection(session: Session, aws_account_id: str)
glue_database_list.append(client.get_databases()['DatabaseList'])
logger.info("detect_glue_database")
# list exist rds not exist insert
for glue_database in glue_database_list:
for glue_database_item in glue_database_list:
glue_database = glue_database_item[0]
refresh_list.append(glue_database["Name"])
if not crud.list_glue_database_by_name(glue_database["Name"]):
glue_database = schemas.SourceGlueDatabase
glue_database.glue_database_name = glue_database["Name"]
glue_database.glue_database_location_uri = glue_database["LocationUri"]
glue_database.glue_database_description = glue_database["Description"]
glue_database.glue_database_create_time = glue_database["CreateTime"]
glue_database.glue_database_catalog_id = glue_database["CatalogId"]
glue_database.region = ''
glue_database.account_id = aws_account_id
crud.add_glue_database(glue_database)
source_glue_database = schemas.SourceGlueDatabase
source_glue_database.glue_database_name = glue_database["Name"]
source_glue_database.glue_database_location_uri = glue_database["LocationUri"]
source_glue_database.glue_database_description = glue_database["Description"]
source_glue_database.glue_database_create_time = glue_database["CreateTime"]
source_glue_database.glue_database_catalog_id = glue_database["CatalogId"]
sub_info = glue_database["CreateTableDefaultPermissions"][0]
source_glue_database.data_lake_principal_identifier = sub_info["Principal"]["DataLakePrincipalIdentifier"]
source_glue_database.permissions = "|".join(sub_info["Permissions"])
source_glue_database.region = ''
source_glue_database.account_id = aws_account_id
crud.add_glue_database(source_glue_database)
# list not exist rds exist delete
crud.delete_not_exist_glue_database(refresh_list)
crud.update_glue_database_count(account=aws_account_id, region=admin_account_region)
Expand Down
15 changes: 5 additions & 10 deletions source/constructs/api/data_source/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def list_rds_instances(condition: QueryCondition):
page=condition.page,
))

@router.post("/list-glue-database", response_model=BaseResponse[Page[schemas.JDBCInstanceSource]])
@router.post("/list-glue-database", response_model=BaseResponse[Page[schemas.SourceGlueDatabaseFullInfo]])
@inject_session
def list_glue_databases(condition: QueryCondition):
instances = crud.list_glue_database(condition)
Expand All @@ -44,10 +44,10 @@ def list_glue_databases(condition: QueryCondition):
page=condition.page,
))

@router.post("/list-jdbc", response_model=BaseResponse[Page[schemas.JDBCInstanceSource]])
@router.post("/list-jdbc", response_model=BaseResponse[Page[schemas.JDBCInstanceSourceFullInfo]])
@inject_session
def list_jdbc_instances(condition: QueryCondition):
instances = crud.list_jdbc_instance_source(condition)
def list_jdbc_instances(provider_id: int, condition: QueryCondition):
instances = crud.list_jdbc_instance_source(provider_id)
if instances is None:
return None
return paginate(instances, Params(
Expand Down Expand Up @@ -142,7 +142,7 @@ def sync_jdbc_connection(jdbc: schemas.SourceJDBCConnection):
@inject_session
def refresh_data_source(type: schemas.NewDataSource):
service.refresh_data_source(
type.provider,
type.provider_id,
type.accounts,
type.type
)
Expand Down Expand Up @@ -261,8 +261,3 @@ def query_full_provider_infos():
@inject_session
def list_providers():
return service.list_providers()

@router.post("/list-jdbc-schemas", response_model=BaseResponse)
@inject_session
def list_jdbc_schema(account_id: str):
return service.list_jdbc_schema(account_id)
76 changes: 39 additions & 37 deletions source/constructs/api/data_source/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,23 @@ class Config:
orm_mode = True

class SourceGlueDatabase(BaseModel):

id: int
glue_database_name: Optional[str]
glue_database_description: Optional[str]
glue_database_location_uri: Optional[str]
glue_database_create_time: Optional[str]
glue_database_catalog_id: Optional[str]
data_lake_principal_identifier: Optional[str]
permissions: Optional[str]
account_id: Optional[str]
region: Optional[str]

class SourceGlueDatabaseFullInfo(SourceGlueDatabase):
glue_state: Optional[str]
account_id: Optional[str]
region: Optional[str]
detection_history_id: Optional[int]
version: Optional[int]
create_by: Optional[str]
create_time: Optional[datetime.datetime]
modify_by: Optional[str]
modify_time: Optional[datetime.datetime]

class Config:
orm_mode = True

class RdsInstanceSource(BaseModel):
id: int
Expand Down Expand Up @@ -138,37 +139,43 @@ class Config:
class JDBCInstanceSource(BaseModel):
instance_id: Optional[str]
description: Optional[str]
jdbc_connection_url: Optional[str] # "jdbc:mysql://81.70.179.114:9000/"
jdbc_enforce_ssl: Optional[str] # "false" Require SSL connection 如果连不上connection会报错
kafka_ssl_enabled: Optional[str] # "false"
jdbc_connection_url: Optional[str]
jdbc_enforce_ssl: Optional[str]
kafka_ssl_enabled: Optional[str]
master_username: Optional[str]
password: Optional[str]
skip_custom_jdbc_cert_validation: Optional[str] # "false"
custom_jdbc_cert: Optional[str] # SSL证书地址
custom_jdbc_cert_string: Optional[str] # For Oracle Database this maps to SSL_SERVER_CERT_DN, and for SQL Server it maps to hostNameInCertificate.
skip_custom_jdbc_cert_validation: Optional[str]
custom_jdbc_cert: Optional[str]
custom_jdbc_cert_string: Optional[str]
network_availability_zone: Optional[str]
network_subnet_id: Optional[str]
network_sg_id: Optional[str]
creation_time: Optional[str]
last_updated_time: Optional[str]
jdbc_driver_class_name: Optional[str]
jdbc_driver_jar_uri: Optional[str] # s3://mysql-connector-2023/mysql-connector-j-8.1.0.jar 必须校验为jar文件
jdbc_driver_jar_uri: Optional[str]
create_type: Optional[int]
account_provider_id: Optional[int]
account_id: Optional[str]
region: Optional[str]

class Config:
orm_mode = True

class JDBCInstanceSourceFullInfo(JDBCInstanceSource):
data_source_id: Optional[int]
detection_history_id: Optional[int]
glue_database: Optional[str]
glue_connection: Optional[str]
glue_vpc_endpoint: Optional[str]
glue_crawler: Optional[str]
glue_state: Optional[str]
create_type: int
instance_class: Optional[str]
instance_status: Optional[str]
account_provider: Optional[str]
account_id: Optional[str]
region: Optional[str]

class Config:
orm_mode = True


class Region(BaseModel):
id: Optional[int]
region: Optional[str]
Expand Down Expand Up @@ -206,28 +213,23 @@ class SourceRdsConnection(BaseModel):
rds_secret: Optional[str]

class SourceJDBCConnection(BaseModel):
account_provider: int
account_id: str
region: str
instance: str
engine: Optional[str]
address: Optional[str]
port: Optional[int]
username: Optional[str]
password: Optional[str]
account_provider: Optional[int]
account_id: Optional[str]
region: Optional[str]
instance: Optional[str]
secret: Optional[str]
network_availability_zone: Optional[str]
network_subnet_id: Optional[str]
network_sg_id: Optional[str]
jdbc_connection_url: Optional[str] # "jdbc:mysql://81.70.179.114:9000/"
jdbc_enforce_ssl: Optional[str] # "false" Require SSL connection 如果连不上connection会报错
kafka_ssl_enabled: Optional[str] # "false"
jdbc_connection_url: Optional[str]
jdbc_enforce_ssl: Optional[str]
kafka_ssl_enabled: Optional[str]
master_username: Optional[str]
password: Optional[str]
skip_custom_jdbc_cert_validation: Optional[str] # "false"
custom_jdbc_cert: Optional[str] # SSL证书地址
custom_jdbc_cert_string: Optional[str] # For Oracle Database this maps to SSL_SERVER_CERT_DN, and for SQL Server it maps to hostNameInCertificate.
skip_custom_jdbc_cert_validation: Optional[str]
custom_jdbc_cert: Optional[str]
custom_jdbc_cert_string: Optional[str]


class SourceDeteteGlueDatabase(BaseModel):
account_provider: int
Expand Down Expand Up @@ -260,7 +262,7 @@ class SourceOrgAccount(BaseModel):
organization_management_account_id: str

class NewDataSource(BaseModel):
provider: Optional[str]
provider_id: Optional[int]
accounts: List[str]
type: DataSourceType = DataSourceType.all

Expand Down
Loading