Skip to content

Commit

Permalink
[checkpoint_harmony_endpoint] Auth and pagination fixes (#12158)
Browse files Browse the repository at this point in the history
* First round of fixes.

- Use optional access to cursor values for shorter conditions.
- Assume a 200 response from the auth endpoint, rather trying to set
  auth_token from the body of an error response.
- Always get an auth token from cursor data or by fetching a new one,
  and then use it for any following request, and...
- Check for the the absence of a task_id to decide when to submit a
  query, instead of always doing it after getting the auth token.
- Don't try to get body.message from an error response, because they
  actually look like this:
    {
      "success": false,
      "error": {
        "status": 400,
        "name": "Bad Request",
        "details": [
          "pageLimit must not be less than 10"
        ]
      }
    }
- Clarify the comments describing each section.

* Note the minimum page_limit value.

* Fix the rate limiting variables.

* Handle 401 responses for all requests that use the auth token.

* Save the auth token after any response. Don't clear the auth token at the end of a sequence.

A new token may be fetched in the middle of a sequence, so all results
need to save it.

* Remove unused last_page key from cursor data.

* Clear the task ID when done (query task returned no results).

* Improve clarity in comments and order of handling.

* Remove redundant task_ready key from cursor date and just use page_token instead. Don't override page_token to null when it's already null.

* Keep the token expiry time and get a new token 5 mins before expiry.

* Advance startTime and endTime parameters.

* Extend the default interval and improve variable descriptions.

* System test just one data stream, but make it a better test, that covers polling and pagination.

* Version bump, changelog entry.
  • Loading branch information
chrisberkhout authored Dec 24, 2024
1 parent 16bdfda commit d1a9faf
Show file tree
Hide file tree
Showing 24 changed files with 1,186 additions and 1,633 deletions.
586 changes: 33 additions & 553 deletions packages/checkpoint_harmony_endpoint/_dev/deploy/docker/files/config.yml

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions packages/checkpoint_harmony_endpoint/changelog.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# newer versions go on top
- version: "0.2.1"
changes:
- description: Auth and pagination fixes.
type: bugfix
link: https://github.com/elastic/integrations/pull/12158
- version: "0.2.0"
changes:
- description: Add "preserve_original_event" tag to documents with `event.kind` set to "pipeline_error".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ input: cel
service: harmony
vars:
base_url: http://{{Hostname}}:{{Port}}
client_id: xxxxantibot
access_key: xxxx
initial_interval: 10s
interval: 10s
client_id: testclientid
access_key: testaccesskey
initial_interval: 720h
interval: 5m
limit: 10000
page_limit: 100
page_limit: 10
assert:
hit_count: 3
hit_count: 14
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
config_version: 3
resource.rate_limit.limit: 0.2
resource.rate_limit.burst: 5
resource.rate_limit.limit: {{resource_rate_limit_limit}}
resource.rate_limit.burst: {{resource_rate_limit_burst}}
{{#if enable_request_tracer}}
resource.tracer.filename: "../../logs/cel/http-request-trace-*.ndjson"
resource.tracer.maxbackups: 5
Expand All @@ -15,9 +15,15 @@ state:
page_limit: {{page_limit}}
filter: {{filter}}
program: |-
(!has(state.cursor) || has(state.cursor) && has(state.cursor.auth_token) && state.cursor.auth_token == null) ?
(
// Authenticating using API to retrieve auth token

(
state.?cursor.auth_data.expires.optMap(t,
t.parse_time(time_layout.RFC1123) - now() > duration("5m")
).orValue(false) ?
// Current auth data exists - Use it.
state.cursor.auth_data
:
// No current auth data - Use credentials to fetch a new token.
request("POST", state.url.trim_right("/") + "/auth/external").with(
{
"Header": {
Expand All @@ -30,187 +36,197 @@ program: |-
}.encode_json(),
}
).do_request().as(resp,
(resp.StatusCode == 200) ?
bytes(resp.Body).decode_json().as(body,
body.data.token
)
:
bytes(resp.Body).decode_json().as(body,
body.message
)
).as(auth_token,
// submit logs query to search security event logs
bytes(resp.Body).decode_json().as(body,
{
"token": body.data.token,
"expires": body.data.expires,
}
)
)
).as(auth_data,
(state.?cursor.task_id.orValue(null) == null) ?
// No task ID - Submit a query and get its task ID.
{
"startTime": state.?cursor.next_startTime.orValue(
timestamp(now() - duration(state.initial_interval)).format(time_layout.RFC3339)
),
"endTime": timestamp(now() - duration("1m")).format(time_layout.RFC3339),
}.as(timeframe,
request("POST", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query").with(
{
"Header": {
"Accept": ["application/json"],
"Content-Type": ["application/json"],
"Authorization": ["Bearer " + auth_token],
"Authorization": ["Bearer " + auth_data.token],
},
"Body": {
"filter": state.filter,
"limit": state.limit,
"pageLimit": state.page_limit,
"cloudService": "Harmony Endpoint",
"timeframe": {
"startTime": (state.?cursor.next_startTime.orValue(null) == null) ?
timestamp(now() - duration(state.initial_interval)).format(time_layout.RFC3339)
:
timestamp(state.cursor.next_startTime).format(time_layout.RFC3339),
"endTime": timestamp(now().format(time_layout.RFC3339)),
"startTime": timeframe.startTime,
"endTime": timeframe.endTime,
},
}.encode_json(),
}
).do_request().as(resp,
(resp.StatusCode == 200) ?
bytes(resp.Body).decode_json().as(body,
state.with(
{
"events": [{ "message": { "event": { "reason": "polling" }}.encode_json() }],
"want_more": true,
"cursor": {
"auth_token": auth_token,
"task_id": body.data.taskId,
"task_ready": false,
"page_token": null,
"next_startTime": (has(state.cursor) && has(state.cursor.next_startTime)) ? state.cursor.next_startTime : null,
"last_page": false,
},
}
)
)
:
(resp.StatusCode != 200) ?
// Any error - We're at the start, so clear everything and retry after interval.
state.with(
{
"events": {
"error": {
"message": "Error " + bytes(resp.Body).decode_json().as(body, body.message),
},
},
"events": {"error": {"message": "Error response: " + string(resp.Body)}},
"want_more": false,
"cursor": state.cursor.with(
{
"auth_token": null,
"auth_data": null,
"task_id": null,
"task_ready": false,
"page_token": null,
"last_page": false,
}
),
}
)
:
// Query submitted - Save the task ID.
bytes(resp.Body).decode_json().as(body,
state.with(
{
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
"want_more": true,
"cursor": {
"auth_data": auth_data,
"task_id": body.data.taskId,
"page_token": null,
"next_startTime": timeframe.endTime,
},
}
)
)
)
)
)
: (has(state.cursor) && has(state.cursor.task_ready) && state.cursor.task_ready == false) ?
(
// submit task ID to Check the progress of specific search event logs task and get the pageTokens
: (state.?cursor.page_token.orValue(null) == null) ?
// Task exists with no page token - Check whether it's ready or done.
request("GET", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/" + state.cursor.task_id).with(
{
"Header": {
"Accept": ["application/json"],
"Content-Type": ["application/json"],
"Authorization": ["Bearer " + state.cursor.auth_token],
"Authorization": ["Bearer " + auth_data.token],
},
}
).do_request().as(resp,
bytes(resp.Body).decode_json().as(body,
// 'Ready' - Found results. Ready to retrieve event logs records on the 1st page
(body.data.state == "Ready") ?
state.with(
{
"events": [{ "message": { "event": { "reason": "polling" }}.encode_json() }],
"want_more": true,
"cursor": state.cursor.with(
{
"task_ready": true,
"page_token": body.data.pageTokens[0],
"last_page": null,
}
),
}
)
: (body.data.state == "Done") ?
// 'Done' - The entire specified time range of request has been covered / No results have been found for specified request
state.with(
{
"events": [],
"want_more": false,
"cursor": state.cursor.with(
{
"auth_token": null,
"task_ready": null,
"page_token": null,
"last_page": null,
}
),
}
)
:
state.with(
{
"events": [{ "message": { "event": { "reason": "polling" }}.encode_json() }],
"want_more": true,
"cursor": state.cursor.with(
{
"task_ready": false,
"page_token": null,
"last_page": null,
}
),
}
)
)
)
)
:
// use task ID and pageToken to retrieve event logs results on a specific page
request("POST", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/retrieve").with(
{
"Header": {
"Accept": ["application/json"],
"Content-Type": ["application/json"],
"Authorization": ["Bearer " + state.cursor.auth_token],
},
"Body": {
"taskId": state.cursor.task_id,
"pageToken": state.cursor.page_token,
}.encode_json(),
}
).do_request().as(resp,
bytes(resp.Body).decode_json().as(body,
(body.data.nextPageToken == "NULL") ?
(resp.StatusCode == 401) ?
// 401 Unauthorized - Clear the auth data and retry immediately.
state.with(
{
"events": body.data.records.map(e, {"message": e.encode_json()}),
"want_more": false,
"cursor": state.cursor.with(
{
"auth_token": null,
"task_id": null,
"task_ready": null,
"page_token": null,
"last_page": null,
}
),
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
"want_more": true,
"cursor": state.cursor.with({"auth_data": null}),
}
)
:
bytes(resp.Body).decode_json().as(body,
(body.data.state == "Ready") ?
// 'Ready' (Results found) - Save the first page token.
state.with(
{
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
"want_more": true,
"cursor": state.cursor.with(
{
"auth_data": auth_data,
"page_token": body.data.pageTokens[0],
}
),
}
)
: (body.data.state == "Done") ?
// 'Done' (Results empty) - Clear the task ID and end the sequence.
state.with(
{
"events": [],
"want_more": false,
"cursor": state.cursor.with(
{
"auth_data": auth_data,
"task_id": null,
}
),
}
)
:
// Not ready or done - Keep polling.
state.with(
{
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
"want_more": true,
"cursor": state.cursor.with(
{
"auth_data": auth_data,
}
),
}
)
)
)
:
// Task is ready - Use the task ID and page token to retrieve a page of results.
request("POST", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/retrieve").with(
{
"Header": {
"Accept": ["application/json"],
"Content-Type": ["application/json"],
"Authorization": ["Bearer " + auth_data.token],
},
"Body": {
"taskId": state.cursor.task_id,
"pageToken": state.cursor.page_token,
}.encode_json(),
}
).do_request().as(resp,
(resp.StatusCode == 401) ?
// 401 Unauthorized - Clear the auth data and retry immediately.
state.with(
{
"events": body.data.records.map(e, {"message": e.encode_json()}),
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
"want_more": true,
"cursor": state.cursor.with(
"cursor": state.cursor.with({"auth_data": null}),
}
)
:
bytes(resp.Body).decode_json().as(body,
(body.data.nextPageToken != "NULL") ?
// Not the last page - Save the next page token and continue.
state.with(
{
"task_ready": true,
"page_token": body.data.nextPageToken,
"last_page": false,
"events": body.data.records.map(e, {"message": e.encode_json()}),
"want_more": true,
"cursor": state.cursor.with(
{
"auth_data": auth_data,
"page_token": body.data.nextPageToken,
}
),
}
),
}
)
:
// Last page - Clear the task ID and page token, and end the sequence.
state.with(
{
"events": body.data.records.map(e, {"message": e.encode_json()}),
"want_more": false,
"cursor": state.cursor.with(
{
"auth_data": auth_data,
"task_id": null,
"page_token": null,
}
),
}
)
)
)
)
)
tags:
{{#if preserve_original_event}}
- preserve_original_event
Expand All @@ -224,4 +240,4 @@ publisher_pipeline.disable_host: true
{{#if processors}}
processors:
{{processors}}
{{/if}}
{{/if}}
Loading

0 comments on commit d1a9faf

Please sign in to comment.