Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(be): sync api update #288

Merged
merged 9 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/constructs/api/catalog/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def sync_crawler_result(
rds_engine_type = rds_database.engine

if database_type.startswith(DatabaseType.JDBC.value):
from data_source.service import convert_database_type_provider
from common.abilities import convert_database_type_provider
provider_id = convert_database_type_provider(database_type)
jdbc_database = data_source_crud.get_jdbc_instance_source(
provider_id, account_id, region, database_name
Expand Down
2 changes: 1 addition & 1 deletion source/constructs/api/catalog/service_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def agg_catalog_summary_by_attr(database_type: str, agg_attribute: str, need_me
def get_catalog_summay_by_provider_region(provider_id: int, region: str):
summy = crud.get_catalog_summay_by_provider_region(region)
logger.info(summy)
from data_source.service import convert_database_type_provider
from common.abilities import convert_database_type_provider
database_type_list = []
for member in DatabaseType.__members__.values():
if convert_database_type_provider(member.value) == provider_id:
Expand Down
18 changes: 18 additions & 0 deletions source/constructs/api/common/abilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from common.enum import (Provider,
DatabaseType)

def convert_database_type_provider(database_type: str) -> int:
530051970 marked this conversation as resolved.
Show resolved Hide resolved
if database_type == DatabaseType.JDBC_ALIYUN.value:
return Provider.ALI_CLOUD.value
elif database_type == DatabaseType.JDBC_TENCENT.value:
return Provider.TENCENT_CLOUD.value
else:
return Provider.AWS_CLOUD.value

def convert_provider_id_str(provider: int) -> str:
if provider == Provider.TENCENT_CLOUD.value:
return DatabaseType.JDBC_TENCENT.value
elif Provider.ALI_CLOUD.value:
return DatabaseType.JDBC_ALIYUN.value
else:
return DatabaseType.JDBC_AWS.value
8 changes: 4 additions & 4 deletions source/constructs/api/common/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __setattr__(self, name, value):
const.EXPORT_CSV_MAX_LINES = 60000
const.EXPORT_S3_MARK_STR = "Amazon_S3"
const.EXPORT_RDS_MARK_STR = "Amazon_RDS"
const.SECURITY_GROUP_JDBC = "glue-jdbc-con"
const.AWS_PID = 1
const.ALI_PID = 3
const.TENCENT_PID = 2
const.SECURITY_GROUP_JDBC = "SDPS-CustomDB"
# const.AWS_PID = 1
# const.ALI_PID = 3
# const.TENCENT_PID = 2
11 changes: 8 additions & 3 deletions source/constructs/api/common/enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,15 @@ class MessageEnum(Enum):
SOURCE_SUBNET_JDBC_NOT_EXISTS = {1240: "Subnet for jdbc connection is not existed"}
SOURCE_AVAILABILITY_ZONE_NOT_EXISTS = {1241: "AZ for jdbc connection is not existed"}
SOURCE_SUBNET_NOT_PRIVATE = {1242: "Subnet for jdbc connection is not private subnet"}
SOURCE_SUBNET_NOT_CONTAIN_NAT = {1243: "Subnet for jdbc connection not contain NAT Gateway"}
SOURCE_VPC_NOT_CONTAIN_NAT = {1243: "Vpc for jdbc connection not contain NAT Gateway"}
SOURCE_JDBC_NO_SCHEMA = {1244: "There is no user created schema found in the jdbc connection"}
SOURCE_JDBC_CREATE_FAIL = {1245: "JDBC connection create failed"}
SOURCE_NOT_AWS_ACCOUNT = {1246: "Could not import connection from non-aws account"}
SOURCE_CONNECTION_NOT_FOUND = {1247: "Connection not found"}
SOURCE_SECURITYGROUP_NOT_FOUND = {1248: "SecurityGroup not found"}
SOURCE_JDBC_CONNECTION_NOT_EXIST = {1249: "JDBC connection not exist"}
SOURCE_SUBNET_NOT_EXIST = {1250: "Subnet for JDBC connection not exist in target account"}
SOURCE_UNCONNECTED = {1251: "UNCONNECTED"}
# label
LABEL_EXIST_FAILED = {1611: "Cannot create duplicated label"}

Expand Down Expand Up @@ -170,8 +176,6 @@ class DatabaseType(Enum):
GLUE = "glue"
DDB = "ddb"
EMR = "emr"
# GLUE_DATABASE = "glue_database"
# JDBC is a virtual type
JDBC = "jdbc"
JDBC_AWS = "jdbc-aws"
JDBC_TENCENT = "jdbc-tencent"
Expand Down Expand Up @@ -294,6 +298,7 @@ class Provider(Enum):
AWS_CLOUD = 1
TENCENT_CLOUD = 2
GOOGLE_CLOUD = 3
ALI_CLOUD = 4

@unique
class SourceCreateType(Enum):
Expand Down
265 changes: 182 additions & 83 deletions source/constructs/api/data_source/crud.py

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions source/constructs/api/data_source/glue_database_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from db.database import get_session
from db.models_data_source import DetectionHistory, RdsInstanceSource, Account
from . import crud, schemas
from . import service
from catalog.service import delete_catalog_by_database_region
from sqlalchemy.orm import Session
import asyncio
Expand All @@ -31,8 +30,6 @@ async def detect_glue_database_connection(session: Session, aws_account_id: str)
regions = crud.get_account_agent_regions(aws_account_id)
glue_database_list = []
refresh_list = []
logger.info("=================")
logger.info(regions)
for region in regions:
client = boto3.client(
'glue',
Expand All @@ -43,11 +40,13 @@ async def detect_glue_database_connection(session: Session, aws_account_id: str)
)
glue_database_list.append(client.get_databases()['DatabaseList'])
logger.info("detect_glue_database")
db_glue_list = crud.list_glue_database_ar(account_id=aws_account_id, region=admin_account_region)
glue_database_name_list = [item.glue_database_name for item in db_glue_list]
# list exist rds not exist insert
for glue_database_item in glue_database_list:
glue_database = glue_database_item[0]
refresh_list.append(glue_database["Name"])
if not crud.list_glue_database_by_name(glue_database["Name"]):
if glue_database["Name"] not in glue_database_name_list:
source_glue_database = schemas.SourceGlueDatabase
source_glue_database.glue_database_name = glue_database["Name"]
source_glue_database.glue_database_location_uri = glue_database["LocationUri"]
Expand All @@ -59,7 +58,7 @@ async def detect_glue_database_connection(session: Session, aws_account_id: str)
source_glue_database.permissions = "|".join(sub_info["Permissions"])
source_glue_database.region = ''
source_glue_database.account_id = aws_account_id
crud.add_glue_database(source_glue_database)
crud.import_glue_database(source_glue_database)
# list not exist rds exist delete
crud.delete_not_exist_glue_database(refresh_list)
crud.update_glue_database_count(account=aws_account_id, region=admin_account_region)
Expand Down
22 changes: 16 additions & 6 deletions source/constructs/api/data_source/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def delete_glue_database(glueDatabase: schemas.SourceDeteteGlueDatabase):

@router.post("/sync-glue-database", response_model=BaseResponse)
@inject_session
def sync_glue_database(glueDatabase: schemas.SourceGlueDatabase):
def sync_glue_database(glueDatabase: schemas.SourceGlueDatabaseBase):
return service.sync_glue_database(
glueDatabase.account_id,
glueDatabase.region,
Expand All @@ -135,7 +135,7 @@ def delete_jdbc_connection(jdbc: schemas.SourceDeteteJDBCConnection):

@router.post("/sync-jdbc", response_model=BaseResponse)
@inject_session
def sync_jdbc_connection(jdbc: schemas.SourceJDBCConnection):
def sync_jdbc_connection(jdbc: schemas.JDBCInstanceSourceBase):
return service.sync_jdbc_connection(jdbc)

@router.post("/refresh", response_model=BaseResponse)
Expand Down Expand Up @@ -196,16 +196,26 @@ def get_secrets(account: str, region: str):
def get_admin_account_info():
return service.get_admin_account_info()

@router.post("/add-glue-database", response_model=BaseResponse)
@router.post("/import-glue-database", response_model=BaseResponse)
@inject_session
def add_glue_database(glueDataBase: schemas.SourceGlueDatabase):
return service.add_glue_database(glueDataBase)
def import_glue_database(glueDataBase: schemas.SourceGlueDatabaseBase):
return service.import_glue_database(glueDataBase)

@router.post("/add-jdbc-conn", response_model=BaseResponse)
@inject_session
def add_jdbc_conn(jdbcConn: schemas.JDBCInstanceSource):
return service.add_jdbc_conn(jdbcConn)

@router.post("/update-jdbc-conn", response_model=BaseResponse)
@inject_session
def update_jdbc_conn(jdbcConn: schemas.JDBCInstanceSource):
return service.update_jdbc_conn(jdbcConn)

@router.post("/import-jdbc-conn", response_model=BaseResponse)
@inject_session
def import_jdbc_conn(jdbcConn: schemas.JDBCInstanceSourceBase):
return service.import_jdbc_conn(jdbcConn)

@router.post("/query-glue-connections", response_model=BaseResponse)
@inject_session
def query_glue_connections(account: schemas.AdminAccountInfo):
Expand All @@ -218,7 +228,7 @@ def query_glue_databases(account: schemas.AdminAccountInfo):

@router.post("/query-account-network", response_model=BaseResponse)
@inject_session
def query_account_network(account: schemas.AdminAccountInfo):
def query_account_network(account: schemas.AccountInfo):
return service.query_account_network(account)

@router.post("/test-glue-conn", response_model=BaseResponse)
Expand Down
9 changes: 5 additions & 4 deletions source/constructs/api/data_source/s3_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,19 @@ async def detect_s3_data_source(session: Session, aws_account_id: str):
s3_db_bucket_name_list.append(row.bucket_name)
s3_db_region_list.append(row.region)
s3_db_glue_state.append(row.glue_state) # None is unconnected

# Compare S3 bucket in data source table with S3 bucket in agent account
deleted_s3_bucket_source_list = list(set(s3_db_bucket_name_list) - set(s3_agent_bucket_name_list))

for deleted_s3_bucket_source in deleted_s3_bucket_source_list:
index = s3_db_bucket_name_list.index(deleted_s3_bucket_source)
s3_connection_state = s3_db_glue_state[index]
deleted_s3_region = s3_db_region_list[index]

if s3_connection_state is not None:
# The bucket is a connected data source
service.delete_s3_connection(aws_account_id, deleted_s3_region, deleted_s3_bucket_source)

# Delete data catalog in case the user first connect it and generate data catalog then disconnect it
try:
delete_catalog_by_database_region(deleted_s3_bucket_source, deleted_s3_region, DatabaseType.S3)
Expand All @@ -125,4 +125,5 @@ async def detect_multiple_account_in_async(accounts):


def detect(accounts):
logger.info(f"***************{accounts}")
asyncio.run(detect_multiple_account_in_async(accounts))
97 changes: 70 additions & 27 deletions source/constructs/api/data_source/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,21 @@ class DynamodbTableSource(BaseModel):
class Config:
orm_mode = True

class SourceGlueDatabase(BaseModel):
class SourceGlueDatabaseBase(BaseModel):
glue_database_name: Optional[str]
account_id: Optional[str]
region: Optional[str]

class SourceGlueDatabase(SourceGlueDatabaseBase):
glue_database_description: Optional[str]
glue_database_location_uri: Optional[str]
glue_database_create_time: Optional[str]
glue_database_catalog_id: Optional[str]
data_lake_principal_identifier: Optional[str]
permissions: Optional[str]
account_id: Optional[str]
region: Optional[str]

class Config:
orm_mode = True

class SourceGlueDatabaseFullInfo(SourceGlueDatabase):
glue_state: Optional[str]
Expand Down Expand Up @@ -136,8 +141,16 @@ class Config:
orm_mode = True


class JDBCInstanceSource(BaseModel):
class JDBCInstanceSourceBase(BaseModel):
instance_id: Optional[str]
account_provider_id: Optional[int]
account_id: Optional[str]
region: Optional[str]

class Config:
orm_mode = True

class JDBCInstanceSource(JDBCInstanceSourceBase):
description: Optional[str]
jdbc_connection_url: Optional[str]
jdbc_enforce_ssl: Optional[str]
Expand All @@ -155,14 +168,33 @@ class JDBCInstanceSource(BaseModel):
jdbc_driver_class_name: Optional[str]
jdbc_driver_jar_uri: Optional[str]
create_type: Optional[int]
account_provider_id: Optional[int]
account_id: Optional[str]
region: Optional[str]

class Config:
orm_mode = True

class JDBCInstanceSourceUpdate(JDBCInstanceSourceBase):
description: Optional[str]
jdbc_connection_url: Optional[str]
jdbc_enforce_ssl: Optional[str]
kafka_ssl_enabled: Optional[str]
master_username: Optional[str]
password: Optional[str]
skip_custom_jdbc_cert_validation: Optional[str]
custom_jdbc_cert: Optional[str]
custom_jdbc_cert_string: Optional[str]
network_availability_zone: Optional[str]
network_subnet_id: Optional[str]
network_sg_id: Optional[str]
jdbc_driver_class_name: Optional[str]
jdbc_driver_jar_uri: Optional[str]

class Config:
orm_mode = True

class JDBCInstanceSourceFullInfo(JDBCInstanceSource):
# def __init__(self):
# JDBCInstanceSource.__init__(self)

data_source_id: Optional[int]
detection_history_id: Optional[int]
glue_database: Optional[str]
Expand Down Expand Up @@ -212,23 +244,32 @@ class SourceRdsConnection(BaseModel):
rds_password: Optional[str]
rds_secret: Optional[str]

class SourceJDBCConnection(BaseModel):
account_provider: Optional[int]
account_id: Optional[str]
region: Optional[str]
instance: Optional[str]
secret: Optional[str]
network_availability_zone: Optional[str]
network_subnet_id: Optional[str]
network_sg_id: Optional[str]
jdbc_connection_url: Optional[str]
jdbc_enforce_ssl: Optional[str]
kafka_ssl_enabled: Optional[str]
master_username: Optional[str]
password: Optional[str]
skip_custom_jdbc_cert_validation: Optional[str]
custom_jdbc_cert: Optional[str]
custom_jdbc_cert_string: Optional[str]
# class SourceJDBCConnectionBase(BaseModel):
# account_provider: Optional[int]
# account_id: Optional[str]
# region: Optional[str]
# instance: Optional[str]

# class SourceJDBCConnection(BaseModel):
# account_provider: Optional[int]
# account_id: Optional[str]
# region: Optional[str]
# instance: Optional[str]
# secret: Optional[str]
# network_availability_zone: Optional[str]
# network_subnet_id: Optional[str]
# network_sg_id: Optional[str]
# jdbc_connection_url: Optional[str]
# jdbc_enforce_ssl: Optional[str]
# kafka_ssl_enabled: Optional[str]
# master_username: Optional[str]
# password: Optional[str]
# skip_custom_jdbc_cert_validation: Optional[str]
# custom_jdbc_cert: Optional[str]
# custom_jdbc_cert_string: Optional[str]

# class Config:
# orm_mode = True


class SourceDeteteGlueDatabase(BaseModel):
Expand Down Expand Up @@ -270,6 +311,11 @@ class AdminAccountInfo(BaseModel):
account_id: str
region: str

class AccountInfo(BaseModel):
account_provider_id: int
account_id: str
region: str

class SourceProvider(BaseModel):

id: int
Expand Down Expand Up @@ -327,6 +373,3 @@ class DataLocationInfo(BaseModel):
region: Optional[str]
account_count: Optional[int]
coordinate: Optional[str]
region_alias: Optional[str]
provider_id: Optional[str]
provider_name: Optional[str]
Loading