-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
9685913
commit 8d65d72
Showing
11 changed files
with
2,574 additions
and
7,744 deletions.
There are no files selected for viewing
File renamed without changes.
52 changes: 52 additions & 0 deletions
52
buildingmotif/semantic_graph_synthesizer/bindings_utils.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
from typing import List, Union | ||
|
||
from rdflib import Namespace | ||
|
||
from buildingmotif.dataclasses import Model, Template | ||
from buildingmotif.semantic_graph_synthesizer.classes import Bindings, UnifiedBindings | ||
|
||
BLDG = Namespace("urn:building/") | ||
|
||
|
||
def unify_bindings(bindings_list: List[Bindings]) -> List[UnifiedBindings]: | ||
"""Combine all the bindings for the same template with the same name""" | ||
unified_bindings_list: List[UnifiedBindings] = [] | ||
for bindings in bindings_list: | ||
if bindings.template is None: | ||
continue | ||
|
||
unified_bindings = next( | ||
( | ||
unified_bindings | ||
for unified_bindings in unified_bindings_list | ||
if unified_bindings.template.name == bindings.template.name | ||
and unified_bindings.bindings["name"] == bindings.bindings["name"] | ||
), | ||
None, | ||
) | ||
|
||
if unified_bindings is None: | ||
unified_bindings = UnifiedBindings( | ||
labels=[], | ||
template=bindings.template, | ||
bindings={}, | ||
cost=bindings.cost, | ||
) | ||
unified_bindings_list.append(unified_bindings) | ||
|
||
unified_bindings.labels.append(bindings.label) | ||
unified_bindings.bindings.update(bindings.bindings) | ||
|
||
return unified_bindings_list | ||
|
||
|
||
def evaluate_bindings( | ||
bindings: Union[Bindings, UnifiedBindings] | ||
) -> Union[Template, Model]: | ||
"""evaluate bindings""" | ||
if bindings.template is None: | ||
raise ValueError("bindings have no template.") | ||
|
||
return bindings.template.evaluate( | ||
{p: BLDG[t.identifier] for p, t in bindings.bindings.items()} | ||
) |
155 changes: 155 additions & 0 deletions
155
buildingmotif/semantic_graph_synthesizer/bipartite_token_mapper.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
import logging | ||
from pathlib import Path | ||
from typing import Dict, List, Optional, Tuple | ||
|
||
import numpy as np | ||
import pandas as pd | ||
from rdflib import Graph | ||
from scipy.optimize import linear_sum_assignment | ||
|
||
from buildingmotif.dataclasses import Template | ||
from buildingmotif.namespaces import PARAM | ||
from buildingmotif.semantic_graph_synthesizer.classes import Cost, Param, Token, URIRef | ||
|
||
brick = Graph() | ||
PROJECT_DIR = Path(__file__).resolve().parents[1] | ||
brick.parse(PROJECT_DIR / "libraries/brick/Brick.ttl") | ||
|
||
|
||
logger = logging.getLogger() | ||
|
||
|
||
def get_typed_params(template) -> List[Param]: | ||
query = """ | ||
SELECT ?s ?o | ||
WHERE { | ||
?s a ?o | ||
FILTER (strstarts(str(?s), 'urn:___param___')) | ||
} | ||
""" | ||
params = [] | ||
for s, c in template.body.query(query): | ||
params.append(Param(name=s[len(PARAM) :], classname=c)) | ||
return params | ||
|
||
|
||
class BipartiteTokenMapper: | ||
@staticmethod | ||
def _get_parent_class(brick_class: URIRef) -> Optional[URIRef]: | ||
"""Get the immediate parent of brick class""" | ||
query = """ | ||
SELECT ?parent | ||
WHERE {{ | ||
?child rdfs:subClassOf ?parent | ||
}} | ||
""" | ||
result = brick.query(query, initBindings={"child": brick_class}) | ||
parents = (parent for (parent,) in result) | ||
|
||
return next(parents, None) | ||
|
||
@staticmethod | ||
def _get_edge_cost( | ||
token_class: URIRef, param_class: URIRef, cost_power: int = 0 | ||
) -> float: | ||
""" | ||
Return the cost between brick classes token_class and param_class where cost is: | ||
- inf if token_class is not covariant of param_class. | ||
- 2 to the power of the number of hops between the classes. | ||
""" | ||
if str(token_class) == str(param_class): | ||
return 2**cost_power - 1 | ||
|
||
parent_class = BipartiteTokenMapper._get_parent_class(token_class) | ||
if parent_class is None: | ||
return np.inf | ||
|
||
return BipartiteTokenMapper._get_edge_cost( | ||
parent_class, param_class, cost_power + 1 | ||
) | ||
|
||
@staticmethod | ||
def _create_cost_matrix(tokens: List[Token], params: List[Param]) -> pd.DataFrame: | ||
"""Create cost matrix of the above classes.""" | ||
cost_matrix = pd.DataFrame( | ||
index=params, | ||
columns=tokens, | ||
) | ||
|
||
for i, token in enumerate(cost_matrix.columns): | ||
for j, param in enumerate(cost_matrix.index): | ||
cost_matrix.iloc[j, i] = BipartiteTokenMapper._get_edge_cost( | ||
token.classname, param.classname | ||
) | ||
|
||
logger.debug("cost matrix:") | ||
logger.debug( | ||
cost_matrix.rename( | ||
columns=lambda x: x.class_, | ||
index=lambda x: x.class_, | ||
) | ||
) | ||
return cost_matrix | ||
|
||
@staticmethod | ||
def find_bindings_for_tokens_and_params( | ||
tokens: List[Token], | ||
params: List[Param], | ||
) -> Tuple[Dict[URIRef, Token], Cost]: | ||
"""Get the cost of mapping token_classes to param_classes.""" | ||
cost_matrix = BipartiteTokenMapper._create_cost_matrix(tokens, params) | ||
# # uncomment / comment for v different results. | ||
cost_matrix = ( | ||
cost_matrix.replace(np.inf, np.nan) | ||
.dropna(axis=0, how="all") | ||
.replace(np.nan, np.inf) | ||
) | ||
row_ind, col_ind = linear_sum_assignment(cost_matrix) | ||
kept_costs = list(zip(row_ind, col_ind)) | ||
bindings = { | ||
cost_matrix.index[x].name: cost_matrix.columns[y] for x, y in kept_costs | ||
} | ||
|
||
logger.debug("\nkept edges:") | ||
for token_idx, param_idx in kept_costs: | ||
token = cost_matrix.index[token_idx] | ||
param = cost_matrix.columns[param_idx] | ||
logger.debug( | ||
f"{token.class_} <- {cost_matrix.iloc[token_idx, param_idx]} -> {param.class_}" | ||
) | ||
|
||
# if no edges | ||
if len(kept_costs) <= 0: | ||
edge_cost = np.Inf | ||
else: | ||
edge_cost = cost_matrix.to_numpy()[row_ind, col_ind].sum() | ||
|
||
return ( | ||
bindings, | ||
Cost( | ||
edge_cost=edge_cost, | ||
params_dropped=len(params) - len(kept_costs), | ||
tokens_dropped=len(tokens) - len(kept_costs), | ||
), | ||
) | ||
|
||
@staticmethod | ||
def find_bindings_for_tokens_and_template( | ||
tokens: List[Token], | ||
template: Template, | ||
) -> Tuple[Dict[Param, Token], Cost]: | ||
"""Finds the bindings for tokens and template""" | ||
params = get_typed_params(template) | ||
try: | ||
mapping, cost = BipartiteTokenMapper.find_bindings_for_tokens_and_params( | ||
tokens, params | ||
) | ||
except ValueError: | ||
mapping, cost = {}, Cost( | ||
edge_cost=np.inf, | ||
params_dropped=len(params), | ||
tokens_dropped=len(tokens), | ||
) | ||
|
||
return mapping, cost |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
from dataclasses import dataclass | ||
from typing import Dict, List, Optional | ||
|
||
import numpy as np | ||
from rdflib import URIRef | ||
|
||
from buildingmotif.dataclasses import Template | ||
|
||
|
||
@dataclass | ||
class Cost: | ||
edge_cost: float | ||
params_dropped: int | ||
tokens_dropped: int | ||
|
||
@property | ||
def scalar(self): | ||
return ( | ||
self.edge_cost + self.tokens_dropped | ||
) # self.params_dropped + self.tokens_dropped | ||
|
||
@classmethod | ||
def inf(self): | ||
return Cost(edge_cost=np.inf, params_dropped=np.inf, tokens_dropped=np.inf) | ||
|
||
|
||
@dataclass | ||
class Param: | ||
name: URIRef | ||
classname: URIRef | ||
|
||
@property | ||
def class_(self): | ||
return self.classname[self.classname.find("#") + 1 :] | ||
|
||
def __repr__(self): | ||
return f"{self.name} a {self.class_}" | ||
|
||
|
||
@dataclass | ||
class Token: | ||
identifier: str | ||
classname: URIRef | ||
|
||
@property | ||
def class_(self): | ||
return self.classname[self.classname.find("#") + 1 :] | ||
|
||
def __repr__(self): | ||
return f"{self.identifier} (type {self.classname})" | ||
|
||
|
||
@dataclass | ||
class TokenizedLabel: | ||
label: str | ||
tokens: list[Token] | ||
|
||
@staticmethod | ||
def from_dict(d): | ||
return TokenizedLabel( | ||
label=d["label"], | ||
tokens=[ | ||
Token(identifier=t["identifier"], classname=URIRef(t["type"])) | ||
for t in d["tokens"] | ||
], | ||
) | ||
|
||
def __repr__(self): | ||
r = "" | ||
for token in self.tokens: | ||
r += f" - {token}\n" | ||
|
||
return f"{self.label}:\n{r}" | ||
|
||
|
||
@dataclass | ||
class LabelSet: | ||
token_classes: List[URIRef] | ||
labels: List[TokenizedLabel] | ||
|
||
|
||
@dataclass | ||
class Bindings: | ||
label: TokenizedLabel | ||
template: Optional[Template] | ||
bindings: Dict[URIRef, Token] | ||
cost: Cost | ||
|
||
|
||
@dataclass | ||
class UnifiedBindings: | ||
labels: List[TokenizedLabel] | ||
template: Template | ||
bindings: Dict[str, Token] | ||
cost: Cost |
Oops, something went wrong.