#!/usr/bin/env python
import argparse
import json
import logging
import os
import os.path as op
import sys
from shutil import rmtree
from tempfile import mkdtemp

from boto3utils import s3
from cirruslib import Catalog, get_task_logger
from cirruslib.transfer import download_item_assets, upload_item_assets

from version import __version__

TASK_NAME = 'task'

def handler(payload, context={}, local=None):
    """ Handle Cirrus payload (STAC Process Catalog) """
    # get catalog
    catalog = Catalog.from_payload(payload)

    # configure logger
    logger = get_task_logger(f"{__name__}.{TASK_NAME}", catalog=catalog)

    # these are any optional parameters provided for this task
    config = catalog['process']['tasks'].get(TASK_NAME, {})

    # these are general options used when uploading output data to s3
    outopts = payload['process'].get('output_options', {})

    # validation - add specific checks on input
    # e.g., if task operates on one and only one Item use this:
    assert len(catalog['features']) == 1
    item = catalog['features'][0]

    # create temporary work directory if not running locally
    tmpdir = mkdtemp() if local is None else local
    outpath = op.join(tmpdir, 'output')
    os.makedirs(outpath, exist_ok=True)

    try:
        # main logic - replace with own

        # download asset, e.g. a thumbnail
        item = download_item_assets(item, path=outpath, assets=['thumbnail'])

        # do something, e.g. modify asset, create new asset
        # (see the illustrative create_new_asset sketch after this function)
        # item['assets']['asset2'] = create_new_asset(item)

        # upload new assets to s3 (skipped when running locally)
        if local is None:
            item = upload_item_assets(item, assets=['asset2'], **outopts)

        # recommended to add derived_from link
        links = [l['href'] for l in item['links'] if l['rel'] == 'self']
        if len(links) == 1:
            # add derived_from link pointing back to the source Item
            item['links'].append({
                'title': 'Source STAC Item',
                'rel': 'derived_from',
                'href': links[0],
                'type': 'application/json'
            })

        catalog['features'][0] = item

    except Exception as err:
        msg = f"**task** failed: {err}"
        logger.error(msg, exc_info=True)
        raise Exception(msg) from err
    finally:
        # remove work directory if not running locally
        if local is None:
            logger.debug('Removing work directory %s', tmpdir)
            rmtree(tmpdir)

    return catalog

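# --- illustrative sketch only (not part of the original template) ---
# A minimal example of what the `create_new_asset` placeholder referenced in
# the handler above might look like. The asset key, media type, filename, and
# the idea of deriving a small text file from the downloaded thumbnail are all
# assumptions made purely for illustration; replace with real task logic.
def create_new_asset(item):
    """Hypothetical helper: write a small derived file next to the downloaded
    thumbnail and return a STAC asset dict describing it."""
    src = item['assets']['thumbnail']['href']
    dst = op.splitext(src)[0] + '_asset2.txt'
    with open(dst, 'w') as f:
        f.write(f"derived from {src}\n")
    return {
        'href': dst,
        'type': 'text/plain',
        'title': 'Example derived asset'
    }
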
def parse_args(args):
    """ Parse CLI arguments """
    desc = 'cirrus task'
    dhf = argparse.ArgumentDefaultsHelpFormatter
    parser0 = argparse.ArgumentParser(description=desc)
    pparser = argparse.ArgumentParser(add_help=False)
    pparser.add_argument('--version', help='Print version and exit', action='version', version=__version__)
    pparser.add_argument('--log', default=2, type=int,
                         help='0:all, 1:debug, 2:info, 3:warning, 4:error, 5:critical')
    subparsers = parser0.add_subparsers(dest='command')

    # process subcommand
    h = 'Locally process (development)'
    parser = subparsers.add_parser('local', parents=[pparser], help=h, formatter_class=dhf)
    parser.add_argument('filename', help='Full path of payload to process')
    parser.add_argument('--workdir', help='Use this as work directory', default='')
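    # example local invocation (hypothetical payload path and workdir):
    #   python task.py local payload.json --workdir ./work
    # where payload.json is a Cirrus process payload of roughly the shape the
    # handler above expects (trimmed for illustration):
    #   {"features": [<one STAC Item>],
    #    "process": {"tasks": {"task": {}}, "output_options": {}}}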

    # Cirrus process subcommand
    h = 'Process Cirrus STAC Process Catalog'
    parser = subparsers.add_parser('cirrus', parents=[pparser], help=h, formatter_class=dhf)
    parser.add_argument('url', help='s3 url to STAC Process Catalog')
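    # example cirrus invocation (hypothetical bucket and key):
    #   python task.py cirrus s3://my-bucket/path/catalog.json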

    # turn Namespace into dictionary
    pargs = vars(parser0.parse_args(args))
    # only keep keys that are not None
    pargs = {k: v for k, v in pargs.items() if v is not None}

    if pargs.get('command', None) is None:
        # print top-level help, not the last subparser's
        parser0.print_help()
        sys.exit(0)

    return pargs

def cli():
    args = parse_args(sys.argv[1:])
    cmd = args.pop('command')

    # logging
    logging.basicConfig(stream=sys.stdout,
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        level=args.pop('log') * 10)
    # quiet these loud loggers
    logging.getLogger("botocore").propagate = False
    logging.getLogger("s3transfer").propagate = False
    logging.getLogger("urllib3").propagate = False

    if cmd == 'local':
        with open(args['filename']) as f:
            payload = json.load(f)
        handler(payload, local=args['workdir'])
    if cmd == 'cirrus':
        # fetch input catalog
        catalog = s3().read_json(args['url'])
        catalog = handler(catalog)
        # upload return payload
        s3().upload_json(catalog, args['url'].replace('.json', '_out.json'))


if __name__ == "__main__":
    cli()