import os
import sys
import re
import json
import csv
import logging
import itertools
from collections import UserString
import sqlite3
from deriva.core import DerivaServer, get_credential, urlquote, topo_sorted, tag, DEFAULT_SESSION_CONFIG
from deriva.core.ermrest_model import Model, Table, Column, Key, ForeignKey, builtin_types
import requests
from . import tableschema
from .tableschema import PackageDataName, submission_schema_json, constituent_schema_json, portal_prep_schema_json, portal_schema_json, registry_schema_json
from .exception import IncompatibleDatapackageModel, InvalidDatapackage
"""
Basic C2M2 catalog sketch
Demonstrates use of deriva-py APIs:
- server authentication (assumes active deriva-auth agent)
- catalog creation
- model provisioning
- basic configuration of catalog ACLs
- small Chaise presentation tweaks via model annotations
- simple insertion of tabular content
"""
logger = logging.getLogger(__name__)

if 'history_capture' not in tag:
    tag['history_capture'] = 'tag:isrd.isi.edu,2020:history-capture'

if 'cfde_resource_src_rmt' not in tag:
    tag['cfde_resource_src_rmt'] = 'tag:nih-cfde.org,2022:resource-src-rmt'

def sql_identifier(s):
    return '"%s"' % (s.replace('"', '""'),)


def sql_literal(s):
    if isinstance(s, str):
        return "'%s'" % (s.replace("'", "''"),)
    elif isinstance(s, (int, float)):
        return s
    else:
        raise TypeError('Unexpected type %s in sql_literal(%r)' % (type(s), s))
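
# For example: sql_identifier('a"b') returns '"a""b"' and sql_literal("it's")
# returns "'it''s'", while numeric values pass through sql_literal() unquoted.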

def make_session_config():
    """Return custom requests session_config for our data submission scenarios
    """
    session_config = DEFAULT_SESSION_CONFIG.copy()
    session_config.update({
        # our PUT/POST to ermrest is idempotent
        "allow_retry_on_all_methods": True,
        # do more retries before aborting
        "retry_read": 8,
        "retry_connect": 5,
        # increase delay factor * 2**(n-1) for Nth retry
        "retry_backoff_factor": 5,
    })
    return session_config
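
# With retry_backoff_factor=5, the delay before the Nth retry grows as
# 5 * 2**(N-1) seconds (5s, 10s, 20s, ...), per the comment above.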

def tables_topo_sorted(tables):
    """Return tables topologically sorted to put dependent tables after the tables they reference.

    :param tables: iterable of table instances
    """
    def tname(table):
        return (
            table.schema.name,
            table.name
        )

    def target_tname(fkey):
        return (
            fkey.referenced_columns[0].table.schema.name,
            fkey.referenced_columns[0].table.name
        )

    name_map = { tname(table): table for table in tables }
    return [
        name_map[tname_pair]
        for tname_pair in topo_sorted({
            tname(table): [
                target_tname(fkey)
                for fkey in table.foreign_keys
                if target_tname(fkey) != tname(table) and target_tname(fkey) in name_map
            ]
            for table in tables
        })
    ]
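
# For example, with C2M2-style tables where "biosample" carries a foreign key
# referencing "project", tables_topo_sorted([...]) yields "project" before
# "biosample", so dependent rows can be loaded after the rows they reference.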

class CfdeDataPackage (object):
    # the translation stores frictionless table resource metadata under this annotation
    resource_tag = 'tag:isrd.isi.edu,2019:table-resource'
    # the translation leaves extraneous table-schema stuff under this annotation
    # (i.e. stuff that perhaps wasn't translated to deriva equivalents)
    schema_tag = 'tag:isrd.isi.edu,2019:table-schema-leftovers'

    batch_size = 2000  # how many rows we'll send to ermrest per request
    batch_bytes_limit = 256*1024  # 0.25MB

    def __init__(self, package_filename, configurator=None):
        """Construct CfdeDataPackage from given package definition filename.

        Special singletons in this module select built-in data:
        - portal_schema_json
        - registry_schema_json
        """
        if not isinstance(package_filename, (str, PackageDataName)):
            raise TypeError('package_filename must be a str filepath or built-in package data name')
        if not isinstance(configurator, (tableschema.CatalogConfigurator, type(None))):
            raise TypeError('configurator must be an instance of tableschema.CatalogConfigurator or None')
        if configurator is None:
            self.configurator = tableschema.ReviewConfigurator()
        else:
            self.configurator = configurator
        self.package_filename = package_filename
        self.catalog = None
        self.cat_model_root = None
        self.cat_cfde_schema = None
        self.cat_has_history_control = None

        # load 2 copies... first is mutated during translation
        if isinstance(package_filename, PackageDataName):
            package_def = json.loads(package_filename.get_data_str())
            self.package_def = json.loads(package_filename.get_data_str())
        else:
            with open(self.package_filename, 'r') as f:
                package_def = json.load(f)
            with open(self.package_filename, 'r') as f:
                self.package_def = json.load(f)

        self.model_doc = tableschema.make_model(package_def, configurator=self.configurator, trusted=isinstance(self.package_filename, PackageDataName))
        self.doc_model_root = Model(None, self.model_doc)
        self.doc_cfde_schema = self.doc_model_root.schemas.get('CFDE')
        if not set(self.model_doc['schemas']).issubset({'CFDE', 'public', 'c2m2'}):
            raise ValueError('Unexpected schema set in data package: %s' % (set(self.model_doc['schemas']),))
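
    # Construction sketch: pass a filesystem path for submitted datapackages, or
    # one of the built-in singletons for internal models, e.g. (illustrative)
    # CfdeDataPackage('submission/datapackage.json') or
    # CfdeDataPackage(registry_schema_json, configurator=some_configurator)
    # where some_configurator is a tableschema.CatalogConfigurator instance.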

    def set_catalog(self, catalog, registry=None):
        self.catalog = catalog
        self.configurator.set_catalog(catalog, registry)
        self.get_model()
        self.cat_has_history_control = catalog.get('/').json().get("features", {}).get("history_control", False)

    def get_model(self):
        self.cat_model_root = self.catalog.getCatalogModel()
        self.cat_cfde_schema = self.cat_model_root.schemas.get('CFDE')

    def _compare_model_docs(self, candidate, absent_table_ok=True, absent_column_ok=True, absent_nonnull_ok=True, extra_table_ok=False, extra_column_ok=False, extra_fkey_ok=False, extra_nonnull_ok=True):
        """General-purpose model comparison to serve validation functions.

        :param candidate: A CfdeDataPackage instance being evaluated with self as baseline.
        :param absent_table_ok: Whether candidate is allowed to omit tables.
        :param absent_column_ok: Whether candidate is allowed to omit non-critical columns.
        :param extra_table_ok: Whether candidate is allowed to include tables.
        :param extra_column_ok: Whether candidate is allowed to include non-critical columns.
        :param extra_fkey_ok: Whether candidate is allowed to include foreign keys on extra, non-critical columns.
        :param absent_nonnull_ok: Whether candidate is allowed to omit a non-null constraint.
        :param extra_nonnull_ok: Whether candidate is allowed to include extra non-null constraints.

        For model comparisons, a non-critical column is one which is
        allowed to contain NULL values.

        Raises IncompatibleDatapackageModel if candidate fails validation tests.
        """
        baseline_tnames = set(self.doc_cfde_schema.tables.keys())
        if self.package_filename is portal_schema_json:
            # we have extra vocab tables not in the official C2M2 schema
            # where it uses enumeration!
            baseline_tnames.difference_update({
                'subject_granularity',
                'subject_role',
                'sex',
                'race',
                'ethnicity',
                'disease_association_type',
                'phenotype_association_type',
            })
        candidate_tnames = set(candidate.doc_cfde_schema.tables.keys())
        missing_tnames = baseline_tnames.difference(candidate_tnames)
        extra_tnames = candidate_tnames.difference(baseline_tnames)
        if missing_tnames and not absent_table_ok:
            raise IncompatibleDatapackageModel(
                'Missing resources: %s' % (','.join(missing_tnames),)
            )
        if extra_tnames and not extra_table_ok:
            raise IncompatibleDatapackageModel(
                'Extra resources: %s' % (','.join(extra_tnames),)
            )
        for tname in baseline_tnames.intersection(candidate_tnames):
            baseline_table = self.doc_cfde_schema.tables[tname]
            candidate_table = candidate.doc_cfde_schema.tables[tname]
            baseline_cnames = set(baseline_table.columns.elements.keys())
            candidate_cnames = set(candidate_table.columns.elements.keys())
            missing_cnames = baseline_cnames.difference(candidate_cnames)
            extra_cnames = candidate_cnames.difference(baseline_cnames)
            missing_nonnull_cnames = [
                cname for cname in missing_cnames
                if (not baseline_table.columns[cname].nullok) and (baseline_table.columns[cname].default is None)
            ]
            extra_nonnull_cnames = [ cname for cname in extra_cnames if not candidate_table.columns[cname].nullok ]
            if missing_cnames and not absent_column_ok:
                raise IncompatibleDatapackageModel(
                    'Missing columns in resource %s: %s' % (tname, ','.join(missing_cnames),)
                )
            if missing_nonnull_cnames and not absent_nonnull_ok:
                raise IncompatibleDatapackageModel(
                    'Missing non-nullable columns in resource %s: %s' % (tname, ','.join(missing_nonnull_cnames),)
                )
            if extra_cnames and not extra_column_ok:
                raise IncompatibleDatapackageModel(
                    'Extra columns in resource %s: %s' % (tname, ','.join(extra_cnames),)
                )
            if extra_nonnull_cnames and not extra_nonnull_ok:
                raise IncompatibleDatapackageModel(
                    'Extra non-nullable columns in resource %s: %s' % (tname, ','.join(extra_nonnull_cnames),)
                )

            # TBD: should this be a method in deriva-py ermrest_model.Type class?
            def type_equal(t1, t2):
                # be tolerant where we abuse "format" to specify subtypes for postgres
                if t1.typename in {'int4', 'int8'}:
                    if t2.typename in {'int4', 'int8'}:
                        return True
                if t1.typename in {'serial4', 'serial8'}:
                    if t2.typename in {'serial4', 'serial8'}:
                        return True
                if t1.typename != t2.typename:
                    return False
                if t1.is_domain != t2.is_domain or t1.is_array != t2.is_array:
                    return False
                if t1.is_domain or t1.is_array:
                    return type_equal(t1.base_type, t2.base_type)
                return True

            for cname in baseline_cnames.intersection(candidate_cnames):
                baseline_col = baseline_table.columns[cname]
                candidate_col = candidate_table.columns[cname]
                if not type_equal(baseline_col.type, candidate_col.type):
                    raise IncompatibleDatapackageModel(
                        'Type mismatch for resource %s column %s' % (tname, cname)
                    )
                if not baseline_col.nullok and candidate_col.nullok and not extra_nonnull_ok:
                    # candidate can be more strict but not more relaxed?
                    raise IncompatibleDatapackageModel(
                        'Inconsistent nullability for resource %s column %s' % (tname, cname)
                    )

        # TBD: do any constraint comparisons here?
        #
        # for now, defer such constraint checks to DB load time...

    def validate_model_subset(self, subset):
        """Check that subset's model is a compatible subset of self.

        :param subset: A CfdeDataPackage instance which should be a subset of self.

        Raises IncompatibleDatapackageModel if supplied datapackage is not a compliant subset.
        """
        self._compare_model_docs(subset)

    def provision(self, alter=False):
        """Provision model idempotently in self.catalog"""
        need_parts = []

        # create empty schemas if missing
        for nschema in self.doc_model_root.schemas.values():
            if nschema.name not in self.cat_model_root.schemas:
                sdoc = nschema.prejson()
                del sdoc['tables']
                sdoc.update({"schema_name": nschema.name})
                need_parts.append(sdoc)
        if need_parts:
            self.catalog.post('/schema', json=need_parts).raise_for_status()
            logger.info("Added empty schemas %r" % ([ sdoc["schema_name"] for sdoc in need_parts ]))
            need_parts.clear()
            self.get_model()

        # create tables if missing, but stripped of fkeys and acl-bindings which might be incoherent
        for nschema in self.doc_model_root.schemas.values():
            schema = self.cat_model_root.schemas[nschema.name]
            for ntable in nschema.tables.values():
                if ntable.name not in schema.tables:
                    tdoc = ntable.prejson()
                    tdoc.pop("foreign_keys")
                    tdoc.pop("acl_bindings")
                    for cdoc in tdoc["column_definitions"]:
                        cdoc.pop("acl_bindings")
                    tdoc.update({"schema_name": nschema.name, "table_name": ntable.name})
                    need_parts.append(tdoc)
        if need_parts:
            self.catalog.post('/schema', json=need_parts).raise_for_status()
            logger.info("Added base tables %r" % ([ (tdoc["schema_name"], tdoc["table_name"]) for tdoc in need_parts ]))
            need_parts.clear()
            self.get_model()

        # create and/or upgrade columns, but stripped of acl-bindings which might be incoherent
        for nschema in self.doc_model_root.schemas.values():
            schema = self.cat_model_root.schemas[nschema.name]
            for ntable in nschema.tables.values():
                table = schema.tables[ntable.name]
                for ncolumn in ntable.columns:
                    if ncolumn.name in {'nid', 'RID', 'RCT', 'RMT', 'RCB', 'RMB'}:
                        # don't consider patching system nor special CFDE columns...
                        pass
                    elif ncolumn.name not in table.columns.elements:
                        cdoc = ncolumn.prejson()
                        cdoc.pop('acl_bindings')
                        # HACK: prepare to allow registry upgrades w/ new non-null columns
                        cdoc_orig = dict(cdoc)
                        upgrade_data_path = cdoc.get('annotations', {}).get(self.schema_tag, {}).get('schema_upgrade_data_path')
                        if upgrade_data_path and isinstance(self.package_filename, PackageDataName):
                            cdoc['nullok'] = True
                        self.catalog.post(
                            '/schema/%s/table/%s/column' % (urlquote(nschema.name), urlquote(ntable.name)),
                            json=cdoc
                        ).raise_for_status()
                        logger.info("Added column %s.%s.%s" % (nschema.name, ntable.name, ncolumn.name))
                        # apply built-in upgrade data to new column
                        if upgrade_data_path and isinstance(self.package_filename, PackageDataName):
                            with self.package_filename.get_data_stringio(upgrade_data_path) as upgrade_tsv:
                                reader = csv.reader(upgrade_tsv, delimiter='\t', skipinitialspace=True)
                                header = next(reader)
                                # we expect TSV to have key column(s) and then this new target column
                                if header[-1] != ncolumn.name:
                                    raise ValueError('New column %s.%s.%s upgrade data final column %r should be %r' % (
                                        nschema.name, ntable.name, ncolumn.name, header[-1], ncolumn.name,
                                    ))
                                for cname in header[0:-1]:
                                    if cname not in table.column_definitions.elements:
                                        raise ValueError('Unexpected column %s in new column %s.%s.%s upgrade data' % (
                                            cname, nschema.name, ntable.name, ncolumn.name,
                                        ))

                                # allow upgrade data to include extra rows not present in target catalog
                                # e.g. for single source to work on dev/staging/prod VPC w/ data variations
                                def key_exists(row):
                                    r = self.catalog.get(
                                        '/entity/%s:%s/%s' % (
                                            urlquote(nschema.name),
                                            urlquote(ntable.name),
                                            '/'.join([ '%s=%s' % (urlquote(cname), urlquote(value)) for cname, value in zip(header[0:-1], row[0:-1]) ]),
                                        )
                                    )
                                    return len(r.json()) > 0

                                upgrade_data = [
                                    dict(zip(header, row))
                                    for row in reader
                                    if key_exists(row)
                                ]
                                self.catalog.put(
                                    '/attributegroup/%s:%s/%s;%s' % (
                                        urlquote(nschema.name),
                                        urlquote(ntable.name),
                                        ','.join([ urlquote(cname) for cname in header[0:-1] ]),
                                        urlquote(header[-1]),
                                    ),
                                    json=upgrade_data,
                                )
                                logger.info("Applied new column %s.%s.%s upgrade data" % (nschema.name, ntable.name, ncolumn.name))
                            if cdoc_orig.get('nullok', True) is False:
                                self.catalog.put(
                                    '/schema/%s/table/%s/column/%s' % (
                                        urlquote(nschema.name),
                                        urlquote(ntable.name),
                                        urlquote(ncolumn.name),
                                    ),
                                    json={'nullok': False},
                                )
                                logger.info("Altered new column %s.%s.%s to nullok=false" % (nschema.name, ntable.name, ncolumn.name))
                    else:
                        # consider column upgrade
                        column = table.columns[ncolumn.name]
                        change = {}
                        if ncolumn.nullok != column.nullok:
                            if alter:
                                change['nullok'] = ncolumn.nullok
                            elif column.nullok:
                                # existing column can accept this data
                                pass
                            else:
                                raise ValueError('Incompatible nullok settings for %s.%s' % (table.name, column.name))

                        def defeq(d1, d2):
                            if d1 is None:
                                if d2 is not None:
                                    return False
                            return d1 == d2

                        if not defeq(ncolumn.default, column.default):
                            if alter:
                                change['default'] = ncolumn.default
                            else:
                                # no compatibility model for defaults?
                                pass

                        def typeeq(t1, t2):
                            if t1.typename != t2.typename:
                                return False
                            if t1.is_domain != t2.is_domain:
                                return False
                            if t1.is_array != t2.is_array:
                                return False
                            if t1.is_domain or t1.is_array:
                                return typeeq(t1.base_type, t2.base_type)
                            return True

                        if not typeeq(ncolumn.type, column.type):
                            if alter:
                                change['type'] = ncolumn.type
                            else:
                                raise ValueError('Mismatched type settings for %s.%s' % (table.name, column.name))
                        if change:
                            column.alter(**change)
                            logger.info("Altered column %s.%s.%s with changes %r" % (
                                nschema.name, ntable.name, ncolumn.name, change,
                            ))
        self.get_model()

        # create and/or upgrade keys
        for nschema in self.doc_model_root.schemas.values():
            schema = self.cat_model_root.schemas[nschema.name]
            for ntable in nschema.tables.values():
                table = schema.tables[ntable.name]
                for nkey in ntable.keys:
                    cnames = { c.name for c in nkey.unique_columns }
                    key = table.key_by_columns(cnames, raise_nomatch=False)
                    if key is None:
                        key = table.create_key(nkey.prejson())
                        logger.info("Created key %s" % (key.constraint_name,))
        self.get_model()

        # purge keys that no longer exist
        for nschema in self.doc_model_root.schemas.values():
            schema = self.cat_model_root.schemas[nschema.name]
            for ntable in nschema.tables.values():
                table = schema.tables[ntable.name]
                for key in table.keys:
                    cnames = { c.name for c in key.unique_columns }
                    if cnames == {'RID'}:
                        continue
                    nkey = ntable.key_by_columns(cnames, raise_nomatch=False)
                    if nkey is None:
                        key.drop()
                        logger.info("Deleted key %s" % (key.constraint_name,))
        self.get_model()

        # create and/or upgrade fkeys, stripping acl-bindings which may be incoherent
        for nschema in self.doc_model_root.schemas.values():
            schema = self.cat_model_root.schemas[nschema.name]
            for ntable in nschema.tables.values():
                table = schema.tables[ntable.name]
                for nfkey in ntable.foreign_keys:
                    if { c.name for c in nfkey.foreign_key_columns }.issubset({'RCB', 'RMB'}) and not nfkey.annotations.get(tag.noprune, False):
                        # skip built-in RCB/RMB fkeys we don't want
                        continue
                    pktable = schema.model.table(nfkey.pk_table.schema.name, nfkey.pk_table.name)
                    cmap = {
                        table.columns[nfkc.name]: pktable.columns[npkc.name]
                        for nfkc, npkc in nfkey.column_map.items()
                    }
                    fkey = table.fkey_by_column_map(cmap, raise_nomatch=False)
                    if fkey is None:
                        fkdoc = nfkey.prejson()
                        fkdoc["foreign_key_columns"][0].update({"schema_name": nschema.name, "table_name": ntable.name})
                        fkdoc.pop("acl_bindings")
                        need_parts.append(fkdoc)
                    else:
                        # consider fkey upgrade
                        change = {}
                        if nfkey.constraint_name != fkey.constraint_name:
                            change['constraint_name'] = nfkey.constraint_name
                        if nfkey.on_delete != fkey.on_delete:
                            change['on_delete'] = nfkey.on_delete
                        if nfkey.on_update != fkey.on_update:
                            change['on_update'] = nfkey.on_update
                        if change:
                            fkey.alter(**change)
                            logger.info("Altered foreign key %s.%s with changes %r" % (
                                nschema.name, fkey.constraint_name, change
                            ))
        if need_parts:
            self.catalog.post('/schema', json=need_parts).raise_for_status()
            logger.info("Added foreign-keys %r" % ([ tuple(fkdoc["names"][0]) for fkdoc in need_parts ]))
            need_parts.clear()
            self.get_model()

        # restore acl-bindings we stripped earlier
        self.apply_custom_config()
        logger.info('Provisioned model in catalog %s' % self.catalog.get_server_uri())

    def apply_custom_config(self):
        self.get_model()

        # get appropriate policies for this catalog scenario
        self.configurator.apply_to_model(self.cat_model_root)
        self.configurator.apply_to_model(self.doc_model_root)

        self.cat_model_root.annotations.update(self.doc_model_root.annotations)
        for schema in self.cat_model_root.schemas.values():
            doc_schema = self.doc_model_root.schemas.get(schema.name)
            if doc_schema is None:
                continue
            schema.acls.clear()
            schema.acls.update(doc_schema.acls)
            for table in schema.tables.values():
                doc_table = doc_schema.tables.get(table.name)
                if doc_table is None:
                    continue
                table.annotations.clear()
                table.annotations.update(doc_table.annotations)
                table.acls.clear()
                table.acl_bindings.clear()
                table.acls.update(doc_table.acls)
                table.acl_bindings.update(doc_table.acl_bindings)
                for column in table.columns:
                    doc_column = doc_table.columns.elements.get(column.name)
                    if doc_column is None:
                        continue
                    column.annotations.clear()
                    column.annotations.update(doc_column.annotations)
                    column.acls.clear()
                    column.acl_bindings.clear()
                    column.acls.update(doc_column.acls)
                    column.acl_bindings.update(doc_column.acl_bindings)
                if True or table.is_association():
                    for cname in {'RCB', 'RMB'}:
                        if cname not in table.columns.elements:
                            continue
                        for fkey in table.fkeys_by_columns([cname], raise_nomatch=False):
                            if fkey.annotations.get(tag.noprune, False):
                                continue
                            logger.info('Dropping %s' % fkey.uri_path)
                            fkey.drop()
                for fkey in table.foreign_keys:
                    doc_fkey = doc_table.foreign_keys.elements.get( (doc_schema, fkey.name[1]) )
                    if doc_fkey is None:
                        continue
                    fkey.annotations.clear()
                    fkey.annotations.update(doc_fkey.annotations)
                    fkey.acls.clear()
                    fkey.acl_bindings.clear()
                    fkey.acls.update(doc_fkey.acls)
                    fkey.acl_bindings.update(doc_fkey.acl_bindings)

        def compact_visible_columns(table):
            """Emulate Chaise heuristics while hiding system metadata"""
            # hacks for CFDE:
            # - assume we have an app-level primary key (besides RID)
            # - ignore possibility of compound or overlapping fkeys
            fkeys_by_col = {
                fkey.foreign_key_columns[0].name: fkey.names[0]
                for fkey in table.foreign_keys
            }
            return [
                fkeys_by_col.get(col.name, col.name)
                for col in table.column_definitions
                if col.name not in {"nid", "RID", "RCT", "RMT", "RCB", "RMB"}
            ]

        def visible_foreign_keys(table):
            """Emulate Chaise heuristics while hiding denorm tables"""
            # hack: we use a fixed prefix for these tables
            return [
                fkey.names[0]
                for fkey in table.referenced_by
                #if not fkey.table.name.startswith("dataset_denorm")
            ]

        for table in self.cat_cfde_schema.tables.values():
            ntable = self.doc_cfde_schema.tables.get(table.name)
            if ntable is None:
                continue
            table.comment = ntable.comment
            table.display.update(ntable.display)
            for column in table.column_definitions:
                if column.name in {'id', 'url', 'md5', 'sha256'}:
                    # set these acronyms to all-caps
                    column.display["name"] = column.name.upper()
                ncolumn = ntable.column_definitions.elements.get(column.name)
                if ncolumn is None:
                    continue
                column.comment = ncolumn.comment
                column.display.update(ncolumn.display)
            for fkey in table.foreign_keys:
                try:
                    npktable = self.doc_model_root.table(fkey.pk_table.schema.name, fkey.pk_table.name)
                    nfkey = ntable.fkey_by_column_map({
                        ntable.column_definitions[fk_col.name]: npktable.column_definitions[pk_col.name]
                        for fk_col, pk_col in fkey.column_map.items()
                    })
                    fkey.foreign_key.update(nfkey.foreign_key)
                except KeyError:
                    continue
            #table.visible_columns = {'compact': compact_visible_columns(table)}
            #table.visible_foreign_keys = {'*': visible_foreign_keys(table)}

        ## apply the above ACL and annotation changes to server
        self.cat_model_root.apply()
        logger.info('Applied custom config to catalog %s' % self.catalog.get_server_uri())
        self.get_model()

    @classmethod
    def make_row2dict(cls, table, header):
        """Build a row2dict(row) function for use with a csv reader"""
        numcols = len(header)
        missingValues = set(table.annotations.get(cls.schema_tag, {}).get("missingValues", []))

        for cname in header:
            if cname not in table.column_definitions.elements:
                raise ValueError("header column %s not found in table %s" % (cname, table.name))

        def row2dict(row):
            """Convert row tuple to dictionary of {col: val} mappings."""
            return dict(zip(
                header,
                [ None if x in missingValues else x for x in row ]
            ))

        return row2dict
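
    # Illustrative use with a csv reader over a TSV resource file (names assumed):
    #   reader = csv.reader(f, delimiter='\t')
    #   header = next(reader)
    #   row2dict = CfdeDataPackage.make_row2dict(table, header)
    #   rows = [ row2dict(row) for row in reader ]
    # Values listed in the table-schema "missingValues" annotation map to None.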

    def dump_data_files(self, resources=None, dump_dir=None):
        """Dump resources to TSV files (inverse of normal load process)

        :param resources: List of resources from datapackage or None (default) to get automatic list.
        :param dump_dir: Path to a directory to write files or None (default) to use current working directory.

        The automatic list of dump resources is derived from the
        datapackage, including those tables that specified an input
        file via the "path" attribute. Other resources are skipped.
        """
        if resources is None:
            resources = [
                resource
                for resource in self.package_def['resources']
                if 'path' in resource
            ]
        for resource in resources:
            if 'path' in resource:
                fname = os.path.basename(resource['path'])
            else:
                # allow dumping of unusual resources?
                fname = '%s.tsv' % (resource['name'],)
            if dump_dir is not None:
                fname = '%s/%s' % (dump_dir.rstrip('/'), fname)

            # dump the same TSV columns included in package def (not extra DERIVA fields)
            cnames = [ field['name'] for field in resource['schema']['fields'] ]
            table = self.cat_cfde_schema.tables[resource['name']]
            if 'nid' in table.columns.elements:
                kcol = 'nid'
            elif 'RID' in table.columns.elements:
                kcol = 'RID'
            else:
                raise ValueError('Cannot dump data for table %s with neither "nid" nor "RID" key columns!' % (table.name,))

            def get_data():
                r = self.catalog.get(
                    '/entity/CFDE:%s@sort(%s)?limit=%d' % (
                        urlquote(resource['name']),
                        kcol,
                        self.batch_size,
                    ))
                rows = r.json()
                yield rows
                while rows:
                    last = rows[-1][kcol]
                    r = self.catalog.get(
                        '/entity/CFDE:%s@sort(%s)@after(%s)?limit=%d' % (
                            urlquote(resource['name']),
                            kcol,
                            urlquote(last),
                            self.batch_size,
                        ))
                    rows = r.json()
                    yield rows

            with open(fname, 'w') as f:
                writer = csv.writer(f, delimiter='\t', lineterminator='\n')
                writer.writerow(tuple(cnames))
                for rows in get_data():
                    for row in rows:
                        writer.writerow(tuple([
                            row[cname] if row[cname] is not None else ''
                            for cname in cnames
                        ]))
                del writer
            logger.info('Dumped resource "%s" as "%s"' % (resource['name'], fname))

    def load_data_files(self, onconflict='abort'):
        """Load tabular data from files into catalog tables.

        :param onconflict: ERMrest onconflict query parameter to emulate (default abort)
        """
        tables_doc = self.model_doc['schemas']['CFDE']['tables']
        for table in tables_topo_sorted(self.doc_cfde_schema.tables.values()):
            # we are doing a clean load of data in fkey dependency order
            resource = tables_doc[table.name]["annotations"].get(self.resource_tag, {})
            logger.debug('Loading table "%s"...' % table.name)
            if "path" not in resource:
                continue

            def open_package():
                if isinstance(self.package_filename, PackageDataName):
                    path = resource["path"]
                    if self.package_filename is registry_schema_json:
                        # allow absolute path to reference packages
                        if path.startswith("/submission/"):
                            path = path[len("/submission/"):]
                            return submission_schema_json.get_data_stringio(path)
                        elif path.startswith("/portal_prep/"):
                            path = path[len("/portal_prep/"):]
                            return portal_prep_schema_json.get_data_stringio(path)
                    # fall through common else:
                    return self.package_filename.get_data_stringio(path)
                else:
                    fname = "%s/%s" % (os.path.dirname(self.package_filename), resource["path"])
                    return open(fname, 'r')

            try:
                with open_package() as f:
                    # translate TSV to python dicts
                    reader = csv.reader(f, delimiter="\t", skipinitialspace=True)
                    header = next(reader)
                    missing = set(table.annotations.get(self.schema_tag, {}).get("missingValues", []))
                    for cname in header:
                        if cname not in table.column_definitions.elements:
                            raise ValueError("header column %s not found in table %s" % (cname, table.name))

                    if onconflict == 'update':
                        def has_key(cols):
                            if set(cols).issubset(set(header)):
                                return table.key_by_columns(cols, raise_nomatch=False) is not None
                            return False

                        keycols = None
                        if has_key(('id',)):
                            keycols = ('id',)
                        elif has_key(('id_namespace', 'local_id')):
                            keycols = ('id_namespace', 'local_id')
                        else:
                            for key in table.keys:
                                if has_key([ c.name for c in key.unique_columns ]):
                                    keycols = [ c.name for c in key.unique_columns ]
                                    break
                        if keycols is None:
                            raise NotImplementedError('Table %s TSV columns %r do not cover a key' % (table.name, header))
                        updcols = [ cname for cname in header if cname not in keycols ]
                        update_sig = ','.join([ urlquote(cname) for cname in keycols ])
                        if updcols:
                            update_sig = ';'.join([ update_sig, ','.join([ urlquote(cname) for cname in updcols ])])
                    else:
                        update_sig = False

                    # Largest known CFDE ingest has file with >5m rows
                    batch = []

                    def store_batch():
                        def row_to_json(row):
                            row = [ None if v in missing else v for v in row ]
                            res = dict(zip(header, row))
                            for cname in header:
                                if table.columns[cname].type.typename in ('text[]', 'json', 'jsonb', 'int4[]', 'int8[]'):
                                    res[cname] = json.loads(res[cname]) if res[cname] is not None else None
                            return res

                        payload = [ row_to_json(row) for row in batch ]
                        if onconflict == 'update':
                            # emulate as two passes
                            rj = self.catalog.post(
                                "/entity/CFDE:%s?onconflict=skip" % (urlquote(table.name),),
                                json=payload
                            ).json()
                            if update_sig:
                                self.catalog.put(
                                    "/attributegroup/CFDE:%s/%s" % (urlquote(table.name), update_sig),
                                    json=payload
                                ).json()  # drain response body...
                        else:
                            entity_url = "/entity/CFDE:%s?onconflict=%s" % (urlquote(table.name), urlquote(onconflict))
                            rj = self.catalog.post(
                                entity_url,
                                json=payload
                            ).json()
                        logger.info("Batch of rows for %s loaded" % table.name)
                        skipped = len(batch) - len(rj)
                        if skipped:
                            logger.debug("Batch contained %d rows with existing keys" % skipped)

                    for raw_row in reader:
                        # Collect full batch, then insert at once
                        batch.append(raw_row)
                        if len(batch) >= self.batch_size:
                            try:
                                store_batch()
                            except Exception as e:
                                logger.error("Table %s data load FAILED from %s: %s" % (table.name, self.package_filename, e))
                                raise
                            else:
                                batch.clear()

                    # After reader exhausted, ingest final batch
                    if len(batch) > 0:
                        try:
                            store_batch()
                        except Exception as e:
                            logger.error("Table %s data load FAILED from %s: %s" % (table.name, self.package_filename, e))
                            raise
                    logger.info("All data for table %s loaded from %s." % (table.name, self.package_filename))
            except UnicodeDecodeError as e:
                raise InvalidDatapackage('Resource file "%s" is not valid UTF-8 data: %s' % (resource["path"], e))

    def sqlite_import_data_files(self, conn, onconflict='abort', table_error_callback=None, progress=None):
        """Load tabular data from files into sqlite tables.

        :param conn: Existing sqlite3 connection to use as data destination.
        :param onconflict: ERMrest onconflict query parameter to emulate (default abort)
        :param table_error_callback: Optional callback to signal table loading errors, lambda rname, rpath, msg: ...
        :param progress: Optional, mutable progress/restart-marker dictionary
        """
        if progress is None:
            progress = dict()
        tables_doc = self.model_doc['schemas']['CFDE']['tables']
        for table in tables_topo_sorted(self.doc_cfde_schema.tables.values()):
            # we are doing a clean load of data in fkey dependency order
            resource = tables_doc[table.name]["annotations"].get(self.resource_tag, {})
            if "path" not in resource:
                continue
            if progress.get(table.name):
                logger.info("Skipping sqlite import for %s due to existing progress marker" % table.name)
                continue
            logger.debug('Importing table "%s" into sqlite...' % table.name)

            def open_package():
                if isinstance(self.package_filename, PackageDataName):
                    return self.package_filename.get_data_stringio(resource["path"])
                else:
                    fname = "%s/%s" % (os.path.dirname(self.package_filename), resource["path"])
                    return open(fname, 'r')

            try:
                with open_package() as f:
                    # translate TSV to python dicts
                    reader = csv.reader(f, delimiter="\t", skipinitialspace=True)
                    header = next(reader)
                    num_cols = len(header)
                    missing = set(table.annotations.get(self.schema_tag, {}).get("missingValues", []))
                    if not header:
                        raise InvalidDatapackage("blank/missing header for %r" % (resource["path"],))
                    for cname in header:
                        if cname not in table.column_definitions.elements:
                            raise InvalidDatapackage("header column %s not found in table %s" % (cname, table.name))

                    # Largest known CFDE ingest has file with >5m rows
                    batch = []

                    def insert_batch():
                        for row in batch:
                            if len(row) != num_cols:
                                msg = 'Expecting %d columns %r, found row with %d values %r.' % (
                                    num_cols,
                                    header,
                                    len(row),
                                    row,
                                )
                                raise InvalidDatapackage('Resource file "%s" inconsistent field counts: %s' % (
                                    resource["path"],
                                    msg,
                                ))
                        sql = "INSERT INTO %(table)s (%(cols)s) VALUES %(values)s %(upsert)s" % {
                            'table': sql_identifier(table.name),
                            'cols': ', '.join([ sql_identifier(c) for c in header ]),
                            'values': ', '.join([
                                '(%s)' % (', '.join([ 'NULL' if x in missing else sql_literal(x) for x in row ]))
                                for row in batch
                            ]),
                            'upsert': 'ON CONFLICT DO NOTHING' if onconflict == 'skip' else '',
                        }
                        try:
                            conn.execute(sql)
                        except sqlite3.OperationalError as e:
                            logger.info('got error during batch insertion: %s' % e)
                            logger.info('>>>>>>>>> BEGIN batch SQL')
                            logger.info(sql)
                            logger.info('<<<<<<<<< END batch SQL')
                            raise
                        except sqlite3.IntegrityError as e:
                            msg = str(e)
                            if msg.find('NOT NULL constraint failed') >= 0:
                                # find offending row to give better error details
                                error_cname = msg.split('.')[-1]  # HACK works because of simple C2M2 column naming
                                try:
                                    pos = header.index(error_cname)
                                    for row in batch:
                                        if row[pos] is None or row[pos] in missing:
                                            raise InvalidDatapackage('Resource file "%s" missing required value for column %r, row %r' % (
                                                resource["path"],
                                                error_cname,
                                                dict(zip(header, row)),
                                            ))
                                except ValueError:
                                    raise InvalidDatapackage('Resource file "%s" does not supply required column %r' % (
                                        resource["path"],
                                        error_cname,
                                    ))
                            elif msg.find('UNIQUE constraint failed') >= 0:
                                # find offending row to give better error details
                                error_cnames = msg.split(':')[-1]  # "... failed: tname.col, ... "
                                error_cnames = [
                                    fragment.split('.')[-1]  # only need cname from "tname.cname"
                                    for fragment in error_cnames.split(',')
                                ]
                                error_positions = [ header.index(cname) for cname in error_cnames ]
                                try:
                                    cur = conn.cursor()
                                    keys = set()
                                    # check batch against itself and against prior batches in table
                                    for row in batch:
                                        key = tuple([ row[pos] for pos in error_positions ])
                                        if key in keys:
                                            raise InvalidDatapackage('Resource file "%s" violates uniqueness constraint for key %r' % (
                                                resource["path"],
                                                dict(zip(error_cnames, key)),
                                            ))
                                        keys.add(key)
                                        cur.execute("SELECT %(cols)s FROM %(table)s WHERE %(where)s" % {
                                            'table': sql_identifier(table.name),
                                            'cols': ','.join([ sql_identifier(cname) for cname in error_cnames ]),
                                            'where': ' AND '.join([
                                                '(%s = %s)' % (sql_identifier(k), sql_literal(v))
                                                for k, v in dict(zip(error_cnames, key)).items()
                                            ]),
                                        })
                                        for key in cur:
                                            # zero or one existing keys returned here
                                            raise InvalidDatapackage('Resource file "%s" violates uniqueness constraint for key %r' % (
                                                resource["path"],
                                                dict(zip(error_cnames, key)),
                                            ))
                                except ValueError:
                                    raise InvalidDatapackage('Resource file "%s" violates uniqueness constraint for key %r' % (
                                        resource["path"],
                                        error_cnames,
                                    ))
                            # re-raise if we don't have a better idea
                            raise
                        logger.debug("Batch of rows for %s loaded" % table.name)

                    for raw_row in reader:
                        # Collect full batch, then insert at once
                        batch.append(raw_row)
                        if len(batch) >= self.batch_size:
                            try:
                                insert_batch()
                            except Exception as e:
                                logger.error("Table %s data load FAILED from %s: %s" % (table.name, self.package_filename, e))
                                raise
                            else:
                                batch.clear()

                    # After reader exhausted, ingest final batch
                    if len(batch) > 0:
                        try:
                            insert_batch()
                        except Exception as e:
                            logger.error("Table %s data load FAILED from %s: %s" % (table.name, self.package_filename, e))
                            raise
                    progress[table.name] = True
                    logger.info("All data for table %s loaded from %s." % (table.name, self.package_filename))
            except UnicodeDecodeError as e:
                if table_error_callback:
                    table_error_callback(resource["name"], resource["path"], str(e))
                raise InvalidDatapackage('Resource file "%s" is not valid UTF-8 data: %s' % (resource["path"], e))

    def check_sqlite_tables(self, conn, source=None, table_error_callback=None, tablenames=None, progress=None):
        """Validate tabular data from sqlite tables according to model.

        :param conn: Existing sqlite3 connection to use as data source.
        :param source: Another CfdeDataPackage representing source data, otherwise use self.
        :param table_error_callback: Optional callback to signal error for one table, lambda tname, tpath, msg: ...
        :param tablenames: Optional set of tablenames to check (default None means check all tables)
        :param progress: Optional, mutable progress/restart-marker dictionary
        """
        if progress is None:
            progress = dict()
        if self.package_filename not in (submission_schema_json, constituent_schema_json, portal_schema_json):
            raise ValueError('check_sqlite_tables() is only valid for built-in datapackages')
        if source is None:
            source = self
        tables_doc = source.model_doc['schemas']['CFDE']['tables']