diff --git a/stix2/datastore/relational_db/input_creation.py b/stix2/datastore/relational_db/input_creation.py
index e6b6ee90..7913de3c 100644
--- a/stix2/datastore/relational_db/input_creation.py
+++ b/stix2/datastore/relational_db/input_creation.py
@@ -356,7 +356,7 @@ def generate_insert_for_sub_object(
                 name,
                 stix_object,
                 data_sink=data_sink,
-                table_name=table_name if isinstance(prop, ListProperty) else parent_table_name,
+                table_name=table_name if isinstance(prop, (DictionaryProperty, ListProperty)) else parent_table_name,
                 schema_name=None,
                 foreign_key_value=foreign_key_value,
                 is_embedded_object=is_embedded_object,
diff --git a/stix2/datastore/relational_db/table_creation.py b/stix2/datastore/relational_db/table_creation.py
index 0b8b7446..0b6e605d 100644
--- a/stix2/datastore/relational_db/table_creation.py
+++ b/stix2/datastore/relational_db/table_creation.py
@@ -69,29 +69,75 @@ def create_ref_table(metadata, specifics, table_name, foreign_key_name, schema_n
 
 
 def create_hashes_table(name, metadata, schema_name, table_name, key_type=Text, level=1):
+    columns = list()
+    # special case, perhaps because it's a single embedded object with hashes, and not a list of embedded objects;
+    # using the parent table's primary key as the foreign key does not seem to work
+    if table_name == "windows-pebinary-ext_WindowsPEOptionalHeaderType":
+        columns.append(
+            Column(
+                "id",
+                key_type,
+                # ForeignKey(
+                #     canonicalize_table_name(table_name, schema_name) + (".hash_ref_id" if table_name == "external_references" else ".id"),
+                #     ondelete="CASCADE",
+                # ),
+
+                nullable=False,
+            ),
+        )
+    else:
+        columns.append(
+            Column(
+                "id",
+                key_type,
+                ForeignKey(
+                    canonicalize_table_name(table_name, schema_name) + (".hash_ref_id" if table_name == "external_references" else ".id"),
+                    ondelete="CASCADE",
+                ),
+
+                nullable=False,
+            ),
+        )
+    columns.append(
+        Column(
+            "hash_name",
+            Text,
+            nullable=False,
+        ),
+    )
+    columns.append(
+        Column(
+            "hash_value",
+            Text,
+            nullable=False,
+        ),
+    )
+    return Table(canonicalize_table_name(table_name + "_" + name), metadata, *columns, schema=schema_name)
+
+
+def create_kill_chain_phases_table(name, metadata, schema_name, table_name):
     columns = list()
     columns.append(
         Column(
             "id",
-            key_type,
+            Text,
             ForeignKey(
-                canonicalize_table_name(table_name, schema_name) + (".hash_ref_id" if table_name == "external_references" else ".id"),
+                canonicalize_table_name(table_name, schema_name) + ".id",
                 ondelete="CASCADE",
             ),
-
             nullable=False,
         ),
     )
     columns.append(
         Column(
-            "hash_name",
+            "kill_chain_name",
             Text,
             nullable=False,
         ),
     )
     columns.append(
         Column(
-            "hash_value",
+            "phase_name",
             Text,
             nullable=False,
         ),
@@ -246,16 +292,34 @@ def determine_sql_type(self):  # noqa: F811
 
 
 # ----------------------------- generate_table_information methods ----------------------------
+@add_method(KillChainPhase)
+def generate_table_information(  # noqa: F811
+    self, name, metadata, schema_name, table_name, is_extension=False, is_list=False,
+    **kwargs,
+):
+    level = kwargs.get("level")
+    return generate_object_table(
+        self.type, metadata, schema_name, table_name, is_extension, True, is_list,
+        parent_table_name=table_name, level=level + 1 if is_list else level,
+    )
+
 
 @add_method(Property)
-def generate_table_information(self, name, **kwargs):
+def generate_table_information(self, name, **kwargs):  # noqa: F811
     pass
 
 
 @add_method(BinaryProperty)
 def generate_table_information(self, name, **kwargs):  # noqa: F811
-    print("BinaryProperty not handled, yet")
-    return None
+    return Column(
+        name,
+        Text,
+        CheckConstraint(
+            # this regular expression might accept some illegal base64 strings or reject some legal ones
+            f"{name} ~ " + "'^[-A-Za-z0-9+/]*={0,3}$'",
+        ),
+        nullable=not self.required,
+    )
 
 
 @add_method(BooleanProperty)
@@ -374,12 +438,9 @@ def generate_table_information(self, name, **kwargs):  # noqa: F811
 @add_method(HashesProperty)
 def generate_table_information(self, name, metadata, schema_name, table_name, is_extension=False, **kwargs):  # noqa: F811
     level = kwargs.get("level")
-    parent_table_name = kwargs.get("parent_table_name")
     if kwargs.get("is_embedded_object"):
         if not kwargs.get("is_list") or level == 0:
             key_type = Text
-            # querky case where a property of an object is a single embedded objects
-            table_name = parent_table_name
         else:
             key_type = Integer
     else:
@@ -524,6 +585,9 @@ def generate_table_information(self, name, metadata, schema_name, table_name, **
                 ),
             )
             return tables
+        elif self.contained == KillChainPhase:
+            tables.append(create_kill_chain_phases_table("kill_chain_phase", metadata, schema_name, table_name))
+            return tables
         else:
             if isinstance(self.contained, Property):
                 sql_type = self.contained.determine_sql_type()
@@ -617,7 +681,9 @@ def generate_object_table(
         table_name = table_name.replace("extension-definition-", "ext_def")
     if parent_table_name:
         table_name = parent_table_name + "_" + table_name
-    if schema_name in ["sdo", "sro"]:
+    if is_embedded_object:
+        core_properties = list()
+    elif schema_name in ["sdo", "sro"]:
         core_properties = SDO_COMMON_PROPERTIES
     elif schema_name == "sco":
         core_properties = SCO_COMMON_PROPERTIES
@@ -670,6 +736,8 @@ def generate_object_table(
                 canonicalize_table_name(foreign_key_name, schema_name) + (".ref_id" if is_list else ".id"),
                 ondelete="CASCADE",
             ),
+            # if it is not a list, then it is a single embedded object, and the primary key is unique
+            # primary_key=not is_list
         )
     elif level > 0 and is_embedded_object:
         column = Column(
@@ -693,9 +761,12 @@ def generate_object_table(
         )
         columns.append(column)
 
-    all_tables = [Table(canonicalize_table_name(table_name), metadata, *columns, schema=schema_name)]
-    all_tables.extend(tables)
-    return all_tables
+    # all_tables = [Table(canonicalize_table_name(table_name), metadata, *columns, schema=schema_name)]
+    # all_tables.extend(tables)
+    # return all_tables
+
+    tables.append(Table(canonicalize_table_name(table_name), metadata, *columns, schema=schema_name))
+    return tables
 
 
 def create_core_tables(metadata):
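Note on the new BinaryProperty CheckConstraint: as the inline comment warns, the regular expression is only an approximation of base64. The sketch below is not part of the patch; it probes the same pattern with Python's re module standing in for PostgreSQL's "~" operator, and the sample strings are made up purely for illustration.

# Sketch only: exercises the '^[-A-Za-z0-9+/]*={0,3}$' pattern from the patch,
# using Python's re as a stand-in for PostgreSQL's "~" regex operator.
import re

pattern = re.compile(r"^[-A-Za-z0-9+/]*={0,3}$")

samples = [
    ("aGVsbG8=", True),    # ordinary standard base64: accepted, as intended
    ("aGVsbG8===", True),  # three '=' of padding never occurs in real base64, yet it is accepted
    ("aGVs-bG8=", True),   # '-' belongs to base64url, not standard base64, yet it is accepted
    ("aGVs_bG8=", False),  # '_' (base64url) is rejected
]

for value, expected in samples:
    matched = bool(pattern.fullmatch(value))
    assert matched is expected, value
    print(f"{value!r}: {'accepted' if matched else 'rejected'}")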