Skip to content

Commit

Permalink
constraitns for reg-ex db dependent
Browse files Browse the repository at this point in the history
  • Loading branch information
rpiazza committed Dec 9, 2024
1 parent f11cce1 commit 9183447
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ def determine_stix_type(stix_object):
def array_allowed():
return False

@staticmethod
def create_regex_constraint_expression(column, pattern):
pass

def process_value_for_insert(self, stix_type, value):
sql_type = stix_type.determine_sql_type(self)
if sql_type == self.determine_sql_type_for_string_property():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ def determine_sql_type_for_timestamp_property(): # noqa: F811
@staticmethod
def array_allowed():
return True

@staticmethod
def create_regex_constraint_expression(column_name, pattern):
return f"{column_name} ~ {pattern}"
64 changes: 18 additions & 46 deletions stix2/datastore/relational_db/table_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def create_kill_chain_phases_table(name, metadata, db_backend, schema_name, tabl
def create_granular_markings_table(metadata, db_backend, sco_or_sdo):
schema_name = db_backend.schema_for_core()
tables = list()
reg_ex = f"'^marking-definition--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: E131
columns = [
Column(
"id",
Expand All @@ -173,10 +174,7 @@ def create_granular_markings_table(metadata, db_backend, sco_or_sdo):
Column(
"marking_ref",
db_backend.determine_sql_type_for_reference_property(),
CheckConstraint(
"marking_ref ~ '^marking-definition--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'",
# noqa: E131
),
CheckConstraint(db_backend.create_regex_constraint_expression("marking_ref", reg_ex)),
),
]
if db_backend.array_allowed():
Expand Down Expand Up @@ -230,15 +228,13 @@ def create_granular_markings_table(metadata, db_backend, sco_or_sdo):


def create_external_references_tables(metadata, db_backend):
reg_ex = "'^[a-z][a-z0-9-]+[a-z0-9]--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: E131
columns = [
Column(
"id",
db_backend.determine_sql_type_for_key_as_id(),
ForeignKey("common.core_sdo" + ".id", ondelete="CASCADE"),
CheckConstraint(
"id ~ '^[a-z][a-z0-9-]+[a-z0-9]--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'", # noqa: E131
),
),
CheckConstraint(db_backend.create_regex_constraint_expression("id", reg_ex))),
Column("source_name", db_backend.determine_sql_type_for_string_property()),
Column("description", db_backend.determine_sql_type_for_string_property()),
Column("url", db_backend.determine_sql_type_for_string_property()),
Expand All @@ -255,26 +251,23 @@ def create_external_references_tables(metadata, db_backend):
def create_core_table(metadata, db_backend, stix_type_name):
tables = list()
table_name = "core_" + stix_type_name
reg_ex = "'^[a-z][a-z0-9-]+[a-z0-9]--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: E131
columns = [
Column(
"id",
db_backend.determine_sql_type_for_key_as_id(),
CheckConstraint(
"id ~ '^[a-z][a-z0-9-]+[a-z0-9]--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'", # noqa: E131
),
CheckConstraint(db_backend.create_regex_constraint_expression("id", reg_ex)),
primary_key=True,
),
Column("spec_version", db_backend.determine_sql_type_for_string_property(), default="2.1"),
]
if stix_type_name == "sdo":
reg_ex = "'^identity--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: E131
sdo_columns = [
Column(
"created_by_ref",
db_backend.determine_sql_type_for_reference_property(),
CheckConstraint(
"created_by_ref ~ '^identity--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'", # noqa: E131
),
),
CheckConstraint(db_backend.create_regex_constraint_expression("created_by_ref", reg_ex))),
Column("created", db_backend.determine_sql_type_for_timestamp_property()),
Column("modified", db_backend.determine_sql_type_for_timestamp_property()),
Column("revoked", db_backend.determine_sql_type_for_boolean_property()),
Expand Down Expand Up @@ -405,9 +398,8 @@ def generate_table_information(self, name, db_backend, **kwargs): # noqa: F811
return Column(
name,
self.determine_sql_type(db_backend),
CheckConstraint(
# this regular expression might accept or reject some legal base64 strings
f"{name} ~ " + "'^[-A-Za-z0-9+/]*={0,3}$'",
# this regular expression might accept or reject some legal base64 strings
CheckConstraint(db_backend.create_regex_constraint_expression(name, "'^[-A-Za-z0-9+/]*={0,3}$'")
),
nullable=not self.required,
)
Expand Down Expand Up @@ -534,9 +526,7 @@ def generate_table_information(self, name, db_backend, **kwargs): # noqa: F811
return Column(
name,
self.determine_sql_type(db_backend),
CheckConstraint(
f"{name} ~ '^{enum_re}$'",
),
CheckConstraint(db_backend.create_regex_constraint_expression(name, f"'^{enum_re}$'")),
nullable=not self.required,
)

Expand Down Expand Up @@ -609,18 +599,7 @@ def generate_table_information(self, name, db_backend, **kwargs): # noqa: F811
schema_name = kwargs.get('schema_name')
table_name = kwargs.get("table_name")
core_table = kwargs.get("core_table")
# if schema_name == "common":
# return Column(
# name,
# Text,
# CheckConstraint(
# f"{name} ~ '^{table_name}" + "--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'",
# # noqa: E131
# ),
# primary_key=True,
# nullable=not (self.required),
# )
# else:
id_req_exp = f"'^{table_name}" + "--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: E131
if schema_name:
foreign_key_column = f"common.core_{core_table}.id"
else:
Expand All @@ -630,8 +609,7 @@ def generate_table_information(self, name, db_backend, **kwargs): # noqa: F811
db_backend.determine_sql_type_for_key_as_id(),
ForeignKey(foreign_key_column, ondelete="CASCADE"),
CheckConstraint(
f"{name} ~ '^{table_name}" + "--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'",
# noqa: E131
db_backend.create_regex_constraint_expression(name, id_req_exp)
),
primary_key=True,
nullable=not (self.required),
Expand Down Expand Up @@ -743,17 +721,14 @@ def ref_column(name, specifics, db_backend, auth_type=0):
if specifics:
types = "|".join(specifics)
if auth_type == 0:
reg_ex = f"'^({types})" + "--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: F811
constraint = \
CheckConstraint(
f"{name} ~ '^({types})" +
"--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'",
)
CheckConstraint(db_backend.create_regex_constraint_expression(name, reg_ex))
else:
reg_ex = "'--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$')"
constraint = \
CheckConstraint(
f"(NOT({name} ~ '^({types})')) AND ({name} ~ " +
"'--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$')",
)
CheckConstraint(db_backend.create_regex_constraint_expression(f"NOT({name}", f"'^({types})'") + " AND " +
db_backend.create_regex_constraint_expression(name, reg_ex))
return Column(name, db_backend.determine_sql_type_for_reference_property(), constraint)
else:
return Column(
Expand Down Expand Up @@ -789,9 +764,6 @@ def generate_table_information(self, name, db_backend, **kwargs): # noqa: F811
return Column(
name,
self.determine_sql_type(db_backend),
# CheckConstraint(
# f"{name} ~ '^{enum_re}$'"
# ),
nullable=not (self.required),
)

Expand Down

0 comments on commit 9183447

Please sign in to comment.