From a509e4190324582dc7fafe5b706338792e7e2854 Mon Sep 17 00:00:00 2001 From: Paige Gulley Date: Tue, 23 Jul 2024 16:14:29 -0400 Subject: [PATCH 1/6] Overview query breakout, untested --- api.py | 57 ++++++++++++++++++++++++++ client.py | 120 ++++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 147 insertions(+), 30 deletions(-) diff --git a/api.py b/api.py index ec92e87..d009af8 100755 --- a/api.py +++ b/api.py @@ -249,6 +249,63 @@ def search_overview_via_payload(collection: Collection, req: Request, payload: Q return ES.search_overview(collection.name, payload.q) +@v1.get("/{collection}/search/daily_counts", tags=["data"]) +@v1.head("/{collection}/search/daily_counts", include_in_schema=False) +def search_daily_counts(collection: Collection, q: str, req: Request): + """ + Report overview summary of the search result + """ + return ES.daily_counts(collection.name, q) + + +@v1.post("/{collection}/search/daily_counts", tags=["data"]) +def search_daily_counts_via_payload( + collection: Collection, req: Request, payload: Query +): + """ + Report summary of the search result + """ + return ES.daily_counts(collection.name, payload.q) + + +@v1.get("/{collection}/search/top_languages", tags=["data"]) +@v1.head("/{collection}/search/top_languages", include_in_schema=False) +def search_top_languages(collection: Collection, q: str, req: Request): + """ + Report overview summary of the search result + """ + return ES.top_languages(collection.name, q) + + +@v1.post("/{collection}/search/top_languages", tags=["data"]) +def search_top_languages_via_payload( + collection: Collection, req: Request, payload: Query +): + """ + Report summary of the search result + """ + return ES.top_languages(collection.name, payload.q) + + +@v1.get("/{collection}/search/top_domains", tags=["data"]) +@v1.head("/{collection}/search/top_domains", include_in_schema=False) +def search_top_domains(collection: Collection, q: str, req: Request): + """ + Report overview summary of the search result + """ + return ES.top_domains(collection.name, q) + + +@v1.post("/{collection}/search/top_domains", tags=["data"]) +def search_top_domains_via_payload( + collection: Collection, req: Request, payload: Query +): + """ + Report summary of the search result + """ + return ES.top_domains(collection.name, payload.q) + + @v1.get("/{collection}/search/result", tags=["data"]) @v1.head("/{collection}/search/result", include_in_schema=False) def search_result_via_query_params( diff --git a/client.py b/client.py index 50630dc..40a49f5 100644 --- a/client.py +++ b/client.py @@ -43,6 +43,21 @@ class QueryBuilder: VALID_SORT_ORDERS = ["asc", "desc"] VALID_SORT_FIELDS = ["publication_date", "indexed_date"] + class Aggregators(Enum): + DAILY_COUNTS = { + "dailycounts": { + "date_histogram": { + "field": "publication_date", + "calendar_interval": "day", + "min_doc_count": 1, + } + } + } + TOP_LANGS = {"toplangs": {"terms": {"field": "language.keyword", "size": 100}}} + TOP_DOMAINS = { + "topdomains": {"terms": {"field": "canonical_domain", "size": 100}} + } + def __init__(self, query_text): self.query_text = query_text self._source = [ @@ -95,28 +110,17 @@ def basic_query(self, expanded: bool = False) -> Dict: } return default - def overview_query(self): + def aggregator_query(self, *aggs: "QueryBuilder.Aggregators") -> Dict: query = self.basic_query() query.update( { - "aggregations": { - "daily": { - "date_histogram": { - "field": "publication_date", - "calendar_interval": "day", - "min_doc_count": 1, - } - }, - "lang": {"terms": {"field": "language.keyword", "size": 100}}, - "domain": {"terms": {"field": "canonical_domain", "size": 100}}, - "tld": {"terms": {"field": "tld", "size": 100}}, - }, + "aggregations": {k: v for agg in aggs for k, v in agg.value.items()}, "track_total_hits": True, } ) return query - def terms_query(self, field): + def terms_query(self, field) -> Dict: resct = 200 aggr_map = { "terms": { @@ -170,7 +174,7 @@ def paged_query( query["search_after"] = [decode_key(resume)] return query - def article_query(self): + def article_query(self) -> Dict: default: dict = { "_source": self._expanded_source, "query": {"match": {"_id": self.query_text}}, @@ -249,30 +253,86 @@ def format_day_counts(self, bucket: list): def format_counts(self, bucket: list): return {item["key"]: item["doc_count"] for item in bucket} - def search_overview(self, collection: str, q: str): + def aggregator_query( + self, collection: str, q: str, *aggs: QueryBuilder.Aggregators, **options + ): """ - Get overview statistics for a query + Abstraction to DRY out permutations of the 'overview' query getting broken out into their own calls """ - res = self.ES.search(index=collection, body=QueryBuilder(q).overview_query()) # type: ignore [call-arg] + query_body = QueryBuilder(q).aggregator_query(*aggs) + + res = self.ES.search(index=collection, body=query_body) # type: ignore [call-arg] if not res["hits"]["hits"]: raise HTTPException(status_code=404, detail="No results found!") total = res["hits"]["total"]["value"] - tldsum = sum( - item["doc_count"] for item in res["aggregations"]["tld"]["buckets"] - ) - return { + return_dict = { "query": q, - "total": max(total, tldsum), - "topdomains": self.format_counts(res["aggregations"]["domain"]["buckets"]), - "toptlds": self.format_counts(res["aggregations"]["tld"]["buckets"]), - "toplangs": self.format_counts(res["aggregations"]["lang"]["buckets"]), - "dailycounts": self.format_day_counts( - res["aggregations"]["daily"]["buckets"] - ), - "matches": [self.format_match(h, collection) for h in res["hits"]["hits"]], } + # Add the results of each aggregator to the return value + for agg in aggs: + agg_name = next(iter(agg.value.keys())) + return_dict.update( + {agg_name: self.format_counts(res["aggregations"][agg_name]["buckets"])} + ) + + # Only return the total and matches if explicitly requested + if "overview" in options: + return_dict.update( + { + "total": total, + "matches": [ # type: ignore [dict-item] + self.format_match(h, collection) for h in res["hits"]["hits"] + ], + } + ) + + return return_dict + + def search_overview(self, collection: str, q: str): + """ + Get overview statistics for a query + """ + return self.aggregator_query( + collection, + q, + QueryBuilder.Aggregators.DAILY_COUNTS, + QueryBuilder.Aggregators.TOP_LANGS, + QueryBuilder.Aggregators.TOP_DOMAINS, + overview=True, + ) + + def daily_counts(self, collection: str, q: str): + """ + Return just a daily count histogram for a query + """ + return self.aggregator_query( + collection, + q, + QueryBuilder.Aggregators.DAILY_COUNTS, + ) + + def top_languages(self, collection: str, q: str): + """ + Return top languagues for a query + """ + return self.aggregator_query( + collection, + q, + QueryBuilder.Aggregators.TOP_LANGS, + ) + + def top_domains(self, collection: str, q: str): + """ + Return top domains for a query + """ + return self.aggregator_query( + collection, + q, + QueryBuilder.Aggregators.TOP_LANGS, + ) + def search_result( self, collection: str, From 2eba97ed4dcd53591e3764e006fed30c42c1b210 Mon Sep 17 00:00:00 2001 From: Paige Gulley Date: Wed, 24 Jul 2024 14:45:43 -0400 Subject: [PATCH 2/6] Adding tests for new endpoints --- client.py | 2 +- test/api_test.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/client.py b/client.py index 40a49f5..c8f1073 100644 --- a/client.py +++ b/client.py @@ -330,7 +330,7 @@ def top_domains(self, collection: str, q: str): return self.aggregator_query( collection, q, - QueryBuilder.Aggregators.TOP_LANGS, + QueryBuilder.Aggregators.TOP_DOMAINS, ) def search_result( diff --git a/test/api_test.py b/test/api_test.py index 93357bd..0b829a7 100644 --- a/test/api_test.py +++ b/test/api_test.py @@ -364,3 +364,36 @@ def test_top_terms(self): results = response.json() assert response.status_code == 200 assert len(results) > 0 + + def test_daily_counts(self): + response = self._client.post( + f"/v1/{INDEX_NAME}/search/daily_counts", + json={"q": "mediacloud"}, + timeout=TIMEOUT, + ) + + results = response.json() + assert response.status_code == 200 + assert "dailycounts" in results + + def test_top_languages(self): + response = self._client.post( + f"/v1/{INDEX_NAME}/search/top_languages", + json={"q": "mediacloud"}, + timeout=TIMEOUT, + ) + + results = response.json() + assert response.status_code == 200 + assert "toplangs" in results + + def test_top_domains(self): + response = self._client.post( + f"/v1/{INDEX_NAME}/search/top_domains", + json={"q": "mediacloud"}, + timeout=TIMEOUT, + ) + + results = response.json() + assert response.status_code == 200 + assert "topdomains" in results From 1351ed01539f24ceeb9ad016e998805322306f5b Mon Sep 17 00:00:00 2001 From: Paige Gulley Date: Wed, 24 Jul 2024 19:48:07 -0400 Subject: [PATCH 3/6] Adding .keyword significantly improves performance --- client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.py b/client.py index c8f1073..41aae65 100644 --- a/client.py +++ b/client.py @@ -55,7 +55,7 @@ class Aggregators(Enum): } TOP_LANGS = {"toplangs": {"terms": {"field": "language.keyword", "size": 100}}} TOP_DOMAINS = { - "topdomains": {"terms": {"field": "canonical_domain", "size": 100}} + "topdomains": {"terms": {"field": "canonical_domain.keyword", "size": 100}} } def __init__(self, query_text): From cf465e51b74383ac9d5eb7d08faa73c4e305ecb7 Mon Sep 17 00:00:00 2001 From: Paige Gulley Date: Fri, 26 Jul 2024 14:07:34 -0400 Subject: [PATCH 4/6] more grokkable --- client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.py b/client.py index 41aae65..9fc82ca 100644 --- a/client.py +++ b/client.py @@ -272,7 +272,7 @@ def aggregator_query( # Add the results of each aggregator to the return value for agg in aggs: - agg_name = next(iter(agg.value.keys())) + agg_name = list(agg.value.keys())[0] return_dict.update( {agg_name: self.format_counts(res["aggregations"][agg_name]["buckets"])} ) From fafa04fe3499ee554a7ebbf342bd9bf6246a36d6 Mon Sep 17 00:00:00 2001 From: Paige Gulley Date: Fri, 26 Jul 2024 15:05:10 -0400 Subject: [PATCH 5/6] add back in aggregation total count backup using top_domains --- client.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/client.py b/client.py index 9fc82ca..5f1e6c8 100644 --- a/client.py +++ b/client.py @@ -266,6 +266,7 @@ def aggregator_query( raise HTTPException(status_code=404, detail="No results found!") total = res["hits"]["total"]["value"] + return_dict = { "query": q, } @@ -279,9 +280,20 @@ def aggregator_query( # Only return the total and matches if explicitly requested if "overview" in options: + if QueryBuilder.Aggregators.TOP_DOMAINS not in aggs: + raise HTTPException( + status_code=500, + detail="Can't run overview query without top_domains aggregator", + ) + + domain_sum = sum( + item["doc_count"] + for item in res["aggregations"]["topdomains"]["buckets"] + ) + return_dict.update( { - "total": total, + "total": max(total, domain_sum), "matches": [ # type: ignore [dict-item] self.format_match(h, collection) for h in res["hits"]["hits"] ], From 3dbaef05c3e9ec8395ff11edeb6b4ed04e719409 Mon Sep 17 00:00:00 2001 From: Paige Gulley Date: Fri, 26 Jul 2024 17:27:52 -0400 Subject: [PATCH 6/6] comment to explain the top_domains aggregation in overview --- client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client.py b/client.py index 5f1e6c8..7e64d7e 100644 --- a/client.py +++ b/client.py @@ -280,6 +280,9 @@ def aggregator_query( # Only return the total and matches if explicitly requested if "overview" in options: + # We use a sum of the top_domains to supplement the total, as elasticsearch has a hard limit + # of 10,000 results per page in a source query, but aggregators can go around this. + # if QueryBuilder.Aggregators.TOP_DOMAINS not in aggs: raise HTTPException( status_code=500,