Skip to content

Commit

Permalink
change test when state argument is missing (demisto#30773)
Browse files Browse the repository at this point in the history
* change test when state argument is missing

* docker image

* Update Packs/ProofpointThreatResponse/ReleaseNotes/2_0_15.md

Co-authored-by: yuvalbenshalom <[email protected]>

---------

Co-authored-by: yuvalbenshalom <[email protected]>
  • Loading branch information
jbabazadeh and yuvalbenshalom authored Nov 9, 2023
1 parent 0b77b68 commit b6d5998
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@


def get_list(list_id):
fullurl = BASE_URL + 'api/lists/{}/members.json'.format(list_id)
fullurl = BASE_URL + f'api/lists/{list_id}/members.json'
res = requests.get(
fullurl,
headers={
Expand All @@ -36,7 +36,7 @@ def get_list(list_id):
)

if res.status_code < 200 or res.status_code >= 300:
return_error('Get list failed. URL: {}, StatusCode: {}'.format(fullurl, res.status_code))
return_error(f'Get list failed. URL: {fullurl}, StatusCode: {res.status_code}')

return res.json()

Expand All @@ -50,7 +50,7 @@ def get_list_command():


def add_to_list(list_id, indicator, comment, expiration):
fullurl = BASE_URL + 'api/lists/{}/members.json'.format(list_id)
fullurl = BASE_URL + f'api/lists/{list_id}/members.json'

indicator = {
'member': indicator
Expand All @@ -71,7 +71,7 @@ def add_to_list(list_id, indicator, comment, expiration):
)

if res.status_code < 200 or res.status_code >= 300:
return_error('Add to list failed. URL: {}, Request Body: {}'.format(fullurl, json.dumps(indicator)))
return_error(f'Add to list failed. URL: {fullurl}, Request Body: {json.dumps(indicator)}')

return res.json()

Expand All @@ -86,7 +86,7 @@ def add_to_list_command():
message = ''
for indicator in indicators:
add_to_list(list_id, indicator, comment, expiration)
message += '{} added successfully to {}\n'.format(indicator, list_id)
message += f'{indicator} added successfully to {list_id}\n'

demisto.results(message)

Expand All @@ -100,7 +100,7 @@ def block_ip_command():
message = ''
for ip in ips:
add_to_list(list_id, ip, None, expiration)
message += '{} added successfully to block_ip list\n'.format(ip)
message += f'{ip} added successfully to block_ip list\n'

demisto.results(message)

Expand All @@ -114,7 +114,7 @@ def block_domain_command():
message = ''
for domain in domains:
add_to_list(list_id, domain, None, expiration)
message += '{} added successfully to block_domain list\n'.format(domain)
message += f'{domain} added successfully to block_domain list\n'

demisto.results(message)

Expand All @@ -128,7 +128,7 @@ def block_url_command():
message = ''
for url in urls:
add_to_list(list_id, url, None, expiration)
message += '{} added successfully to block_url list\n'.format(url)
message += f'{url} added successfully to block_url list\n'

demisto.results(message)

Expand All @@ -142,7 +142,7 @@ def block_hash_command():
message = ''
for h in hashes:
add_to_list(list_id, h, None, expiration)
message += '{} added successfully to block_hash list\n'.format(h)
message += f'{h} added successfully to block_hash list\n'

demisto.results(message)

Expand Down Expand Up @@ -170,10 +170,10 @@ def search_indicator_command():
def delete_indicator(list_id, indicator_filter):
indicator = search_indicators(list_id, indicator_filter)
if len(indicator) == 0:
return_error('{} not exists in {}'.format(indicator_filter, list_id))
return_error(f'{indicator_filter} not exists in {list_id}')

indicator_id = indicator.get('id') # pylint: disable=E1101
fullurl = BASE_URL + 'api/lists/{}/members/{}.json'.format(list_id, indicator_id)
fullurl = BASE_URL + f'api/lists/{list_id}/members/{indicator_id}.json'
res = requests.delete(
fullurl,
headers={
Expand All @@ -182,7 +182,7 @@ def delete_indicator(list_id, indicator_filter):
verify=VERIFY_CERTIFICATE
)
if res.status_code < 200 or res.status_code >= 300:
return_error('Delete indicator failed. URL: {}, StatusCode: {}'.format(fullurl, res.status_code))
return_error(f'Delete indicator failed. URL: {fullurl}, StatusCode: {res.status_code}')


def delete_indicator_command():
Expand All @@ -191,7 +191,7 @@ def delete_indicator_command():
indicator = demisto.args().get('indicator')
delete_indicator(list_id, indicator)

demisto.results('{} deleted successfully from list {}'.format(list_id, indicator))
demisto.results(f'{list_id} deleted successfully from list {indicator}')


def test():
Expand All @@ -200,7 +200,9 @@ def test():
Returns:
'ok' if test passed, anything else will fail the test.
"""

integration_params = demisto.params()
if integration_params.get('isFetch') and not integration_params.get('states'):
raise DemistoException("Missing argument - You must provide at least one incident state.")
get_incidents_request(
{
'created_after': date.today(),
Expand All @@ -220,7 +222,7 @@ def create_incident_field_context(incident):
Returns:
list. The parsed incident fields list
"""
incident_field_values = dict()
incident_field_values = {}
for incident_field in incident.get('incident_field_values', []):
incident_field_values[incident_field['name'].replace(" ", "_")] = incident_field['value']

Expand Down Expand Up @@ -334,7 +336,7 @@ def get_incident_command():
args = demisto.args()
incident_id = args.pop('incident_id')
expand_events = args.get('expand_events')
fullurl = BASE_URL + 'api/incidents/{}.json'.format(incident_id)
fullurl = BASE_URL + f'api/incidents/{incident_id}.json'
incident_data = requests.get(
fullurl,
headers={
Expand All @@ -348,7 +350,7 @@ def get_incident_command():
)

if incident_data.status_code < 200 or incident_data.status_code >= 300:
return_error('Get incident failed. URL: {}, StatusCode: {}'.format(fullurl, incident_data.status_code))
return_error(f'Get incident failed. URL: {fullurl}, StatusCode: {incident_data.status_code}')

incident_data = incident_data.json()
human_readable = create_incidents_human_readable('Incident Results:', [incident_data])
Expand All @@ -370,11 +372,7 @@ def pass_sources_list_filter(incident, sources_list):
if len(sources_list) == 0:
return True

for source in sources_list:
if source in incident.get("event_sources"):
return True

return False
return any(source in incident.get('event_sources') for source in sources_list)


def pass_abuse_disposition_filter(incident, abuse_disposition_values):
Expand All @@ -391,9 +389,8 @@ def pass_abuse_disposition_filter(incident, abuse_disposition_values):
return True

for incident_field in incident.get('incident_field_values', []):
if incident_field['name'] == 'Abuse Disposition':
if incident_field['value'] in abuse_disposition_values:
return True
if incident_field['name'] == 'Abuse Disposition' and incident_field['value'] in abuse_disposition_values:
return True

return False

Expand Down Expand Up @@ -448,7 +445,7 @@ def get_incidents_request(params):
'You may consider adding a filter argument to the command.\n'
'URL: {}, StatusCode: {}'.format(fullurl, incidents_list.status_code))
else:
return_error('The operation failed. URL: {}, StatusCode: {}'.format(fullurl, incidents_list.status_code))
return_error(f'The operation failed. URL: {fullurl}, StatusCode: {incidents_list.status_code}')

return incidents_list.json()

Expand Down Expand Up @@ -546,7 +543,7 @@ def get_incidents_batch_by_time_request(params):
# updating params according to the new times
request_params['created_after'] = created_after.isoformat().split('.')[0] + 'Z'
request_params['created_before'] = created_before.isoformat().split('.')[0] + 'Z'
demisto.debug("End of the current batch loop with {} incidents".format(str(len(incidents_list))))
demisto.debug(f"End of the current batch loop with {str(len(incidents_list))} incidents")

# fetching the last batch when created_before is bigger then current time = fetching new incidents
if len(incidents_list) < fetch_limit:
Expand Down Expand Up @@ -597,7 +594,7 @@ def fetch_incidents_command():
for incident in incidents_list:
id = incident.get('id')
inc = {
'name': 'ProofPoint_TRAP - ID {}'.format(id),
'name': f'ProofPoint_TRAP - ID {id}',
'rawJSON': json.dumps(incident),
'occurred': incident['created_at']
}
Expand All @@ -615,7 +612,7 @@ def fetch_incidents_command():
demisto.setLastRun({'last_fetch': last_fetch})
demisto.setLastRun({'last_fetched_incident_id': last_fetched_id})

demisto.info('extracted {} incidents'.format(len(incidents)))
demisto.info(f'extracted {len(incidents)} incidents')

demisto.incidents(incidents)

Expand All @@ -640,7 +637,7 @@ def create_add_comment_human_readable(incident):
'Action ID': incident.get('id')
})

return tableToMarkdown('Comments added successfully to incident:{}'.format(incident_id), human_readable,
return tableToMarkdown(f'Comments added successfully to incident:{incident_id}', human_readable,
human_readable_headers, removeNull=True)


Expand All @@ -657,7 +654,7 @@ def add_comment_to_incident_command():
"detail": details
}

fullurl = BASE_URL + 'api/incidents/{}/comments.json'.format(incident_id)
fullurl = BASE_URL + f'api/incidents/{incident_id}/comments.json'
incident_data = requests.post(
fullurl,
headers={
Expand Down Expand Up @@ -708,25 +705,25 @@ def add_user_to_incident_command():
return_error('Add comment to incident command failed. URL: {}, '
'StatusCode: {}'.format(fullurl, incident_data.status_code))

return_outputs('The user was added successfully to incident {}'.format(incident_id), {}, {})
return_outputs(f'The user was added successfully to incident {incident_id}', {}, {})


def parse_json_argument(argument_string_value, argument_name):
parsed_arg = {}
try:
parsed_arg = json.loads(argument_string_value)
except ValueError as error:
return_error("The '{}' argument is not a valid json. Error: {}".format(argument_name, error))
return_error(f"The '{argument_name}' argument is not a valid json. Error: {error}")
if not parsed_arg.get(argument_name):
return_error("The '{}' json argument should start with a key named '{}'".format(argument_name, argument_name))
return_error(f"The '{argument_name}' json argument should start with a key named '{argument_name}'")

return parsed_arg


def prepare_ingest_alert_request_body(args):
json_arguments = ['attacker', 'cnc_host', 'detector', 'email', 'forensics_hosts', 'target', 'threat_info',
'custom_fields']
request_body = dict() # type: dict
request_body = {} # type: dict
for argument_name, argument_value in args.items():
if argument_name in json_arguments:
parsed_argument = parse_json_argument(argument_value, argument_name)
Expand All @@ -749,7 +746,7 @@ def ingest_alert_command():
"either as an argument or as an integration parameter.")

request_body = prepare_ingest_alert_request_body(assign_params(**args))
fullurl = BASE_URL + 'threat/json_event/events/{}'.format(json_source_id)
fullurl = BASE_URL + f'threat/json_event/events/{json_source_id}'
alert_data = requests.post(
fullurl,
headers={
Expand Down Expand Up @@ -791,7 +788,7 @@ def close_incident_command():
return_error('Incident closure failed. URL: {}, '
'StatusCode: {}'.format(fullurl, incident_data.status_code))

return_outputs('The incident {} was successfully closed'.format(incident_id), {}, {})
return_outputs(f'The incident {incident_id} was successfully closed', {}, {})


def search_quarantine():
Expand Down Expand Up @@ -892,7 +889,7 @@ def search_quarantine():
def main():
handle_proxy(demisto.params().get('proxy'))
command = demisto.command()
demisto.info('Command being called is {}'.format(command))
demisto.info(f'Command being called is {command}')

if command == 'test-module':
test()
Expand Down
Loading

0 comments on commit b6d5998

Please sign in to comment.