Merge pull request #185 from pdowler/main
icewind: validate mode
pdowler authored Dec 16, 2024
2 parents 228157e + 0304964 commit b873a2b
Showing 14 changed files with 792 additions and 221 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/gradle.yml
@@ -95,14 +95,14 @@ jobs:

       ## applications
       - name: build and test icewind
-        run: cd icewind && ../gradlew --info clean build javadoc checkstyleMain
+        run: cd icewind && ../gradlew --info clean build checkstyleMain
 
       - name: build and test torkeep
-        run: cd torkeep && ../gradlew --info clean build javadoc checkstyleMain
+        run: cd torkeep && ../gradlew --info clean build checkstyleMain
 
       - name: build and test argus
-        run: cd argus && ../gradlew --info clean build javadoc checkstyleMain
+        run: cd argus && ../gradlew --info clean build checkstyleMain
 
       - name: build and test bifrost
-        run: cd bifrost && ../gradlew --info clean build javadoc checkstyleMain
+        run: cd bifrost && ../gradlew --info clean build checkstyleMain

2 changes: 1 addition & 1 deletion caom2persistence/build.gradle
@@ -17,7 +17,7 @@ sourceCompatibility = 1.8

 group = 'org.opencadc'
 
-version = '2.4.18'
+version = '2.4.19'
 
 description = 'OpenCADC CAOM database library'
 def git_url = 'https://github.com/opencadc/caom2db'
@@ -120,7 +120,10 @@ public HarvestStateDAO(DataSource dataSource, String database, String schema) {

     protected void init() {
         StringBuilder sb = new StringBuilder();
-        sb.append(database).append(".").append(schema).append(".");
+        if (database != null) {
+            sb.append(database).append(".");
+        }
+        sb.append(schema).append(".");
         if (fakeSchemaTablePrefix != null) {
             sb.append(fakeSchemaTablePrefix);
         }
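With database now allowed to be null, the generated names are schema-qualified only instead of picking up a literal `null.` prefix. A minimal standalone sketch of the new logic; the schema and table name below are illustrative, not taken from this diff:

```java
// Standalone sketch of the init() prefix logic above (illustrative values).
public class PrefixSketch {
    public static void main(String[] args) {
        String database = null;     // e.g. a deployment with no separate database prefix
        String schema = "caom2";    // example schema
        StringBuilder sb = new StringBuilder();
        if (database != null) {
            sb.append(database).append(".");
        }
        sb.append(schema).append(".");
        sb.append("HarvestState");  // hypothetical table name
        // the old one-liner would have produced "null.caom2.HarvestState"
        System.out.println(sb);     // prints: caom2.HarvestState
    }
}
```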
14 changes: 12 additions & 2 deletions icewind/README.md
@@ -48,12 +48,17 @@ org.opencadc.icewind.caom.url=jdbc:postgresql://{server}/{database}
 # Base for generating Plane publisherID values
 org.opencadc.icewind.basePublisherID={uri}
-# (optional) exit after processing collections once
+# (optional) exit after processing each collection once
 #org.opencadc.icewind.exitWhenComplete=true
-# (optional) retry previously failed (skipped) observations
+# (optional mode) retry previously failed (skipped) observations
+# this mode always assumes exitWhenComplete=true
 org.opencadc.icewind.retrySkipped = true
+# (optional mode) validate remote and local observation sets for consistency
+# this mode always assumes exitWhenComplete=true
+# validate mode always assumes retrySkipped and performs retries after validation
+org.opencadc.icewind.validate = true
```

The _caom_ database account owns and manages (create, alter, drop) CAOM database objects
@@ -89,6 +94,11 @@ retry previously failed (skipped) observations listed in the `caom2.HarvestSkipURI`
 table. This mode always assumes _exitWhenComplete_ so it terminates after one pass
 through the list.
+
+The `icewind` _validate_ mode queries the _repoService_ and the local database, compares the
+two sets of observations, identifies discrepancies (missed delete, missed observation, or
+Observation.accMetaChecksum discrepancy), and schedules a retry by creating a new record
+in the `caom2.HarvestSkipURI` table.
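For illustration, a simplified sketch of those three discrepancy cases, using plain maps keyed by observation URI with accMetaChecksum values; the types and method names here are hypothetical, not the real caom2 or icewind API:

```java
import java.net.URI;
import java.util.Map;

// Hypothetical sketch of validate-mode discrepancy classification.
class ValidateSketch {

    static void validate(Map<URI, URI> remote, Map<URI, URI> local) {
        for (Map.Entry<URI, URI> r : remote.entrySet()) {
            URI localChecksum = local.get(r.getKey());
            if (localChecksum == null) {
                schedule(r.getKey(), "missed observation");          // remote only
            } else if (!r.getValue().equals(localChecksum)) {
                schedule(r.getKey(), "accMetaChecksum discrepancy"); // metadata differs
            }
        }
        for (URI u : local.keySet()) {
            if (!remote.containsKey(u)) {
                schedule(u, "missed delete");                        // local only
            }
        }
    }

    static void schedule(URI uri, String reason) {
        // the real validate mode creates a record in caom2.HarvestSkipURI for retry
        System.out.println("schedule retry: " + uri + " (" + reason + ")");
    }
}
```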

### cadcproxy.pem (optional)
This client certificate can be provided in /config directory. If present, it is used to
authenticate to the _repoService_ if the service requests a client certificate. If
2 changes: 1 addition & 1 deletion icewind/VERSION
@@ -1,6 +1,6 @@
 ## deployable containers have a semantic and build tag
 # semantic version tag: major.minor[.patch]
 # build version tag: timestamp
-VER=0.9.13
+VER=0.10.0
 TAGS="${VER} ${VER}-$(date --utc +"%Y%m%dT%H%M%S")"
 unset VER
117 changes: 59 additions & 58 deletions icewind/src/main/java/org/opencadc/icewind/CaomHarvester.java
@@ -92,55 +92,41 @@ public class CaomHarvester implements Runnable {
     private static final Long DEFAULT_IDLE_TIME = 30000L;
 
     private final InitDatabase initdb;
-    private final HarvesterResource src;
-    private final HarvesterResource dest;
+    private final HarvestSource src;
+    private final HarvestDestination dest;
     private final List<String> collections;
     private final URI basePublisherID;
-    private final int batchSize;
-    private final int nthreads;
-    private final boolean full;
-    private final boolean skip;
-    private final boolean nochecksum;
-    private final boolean exitWhenComplete;
-    private final long maxIdle;
-
-    // hack option
+    // optional
+    int batchSize;
+    int numThreads;
+    boolean exitWhenComplete = false;
+    long maxSleep;
+    boolean validateMode = false;
+    boolean skipMode = false;
     String retryErrorMessagePattern;
+
+    // not used by main
+    private boolean nochecksum;
 
     /**
      * Harvest everything.
      *
      * @param src source resource
      * @param dest destination resource (must be a server/database/schema)
      * @param collections list of collections to process
      * @param basePublisherID base to use in generating Plane publisherID values in destination database
-     * @param batchSize number of observations per batch (~memory consumption)
-     * @param nthreads max threads when harvesting from a service
-     * @param full full harvest of all source entities
-     * @param skip attempt retry of all skipped observations
-     * @param nochecksum disable metadata checksum comparison
-     * @param exitWhenComplete exit after processing each collection if true, else continuously loop
-     * @param maxIdle max sleep time in seconds between runs when running continuously
      */
-    public CaomHarvester(HarvesterResource src, HarvesterResource dest, List<String> collections,
-            URI basePublisherID, int batchSize, int nthreads, boolean full, boolean skip,
-            boolean nochecksum, boolean exitWhenComplete, long maxIdle) {
+    public CaomHarvester(HarvestSource src, List<String> collections, HarvestDestination dest, URI basePublisherID) {
         this.src = src;
-        this.dest = dest;
         this.collections = collections;
+        this.dest = dest;
         this.basePublisherID = basePublisherID;
-        this.batchSize = batchSize;
-        this.nthreads = nthreads;
-        this.full = full;
-        this.skip = skip;
-        this.nochecksum = nochecksum;
-        this.exitWhenComplete = exitWhenComplete;
-        this.maxIdle = maxIdle;
-
 
         ConnectionConfig cc = new ConnectionConfig(null, null, dest.getUsername(), dest.getPassword(),
-                HarvesterResource.POSTGRESQL_DRIVER, dest.getJdbcUrl());
+                HarvestDestination.POSTGRESQL_DRIVER, dest.getJdbcUrl());
         DataSource ds = DBUtil.getDataSource(cc);
-        this.initdb = new InitDatabase(ds, dest.getDatabase(), dest.getSchema());
+        this.initdb = new InitDatabase(ds, null, dest.getSchema());
     }
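The options that used to be constructor arguments are now package-visible fields set after construction. A hypothetical wiring sketch under that assumption; the HarvestSource/HarvestDestination instances are taken as given (their constructors are not shown in this diff) and all values are examples:

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

// Hypothetical caller in the same package as CaomHarvester; field names match the diff above.
class WiringSketch {
    static CaomHarvester create(HarvestSource src, HarvestDestination dest) {
        List<String> collections = Arrays.asList("TEST");        // example collection
        URI basePublisherID = URI.create("ivo://example.org/");  // example value
        CaomHarvester ch = new CaomHarvester(src, collections, dest, basePublisherID);
        ch.batchSize = 100;          // observations per batch (example)
        ch.numThreads = 4;           // harvester threads (example)
        ch.exitWhenComplete = false; // loop continuously
        ch.maxSleep = 600L;          // max idle seconds between passes (example)
        ch.validateMode = false;     // true enables the new validate mode
        ch.skipMode = false;         // true retries skipped observations
        return ch;
    }
}
```

Note the constructor still builds the destination DataSource and InitDatabase, so even a sketch like this needs a reachable database.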

@Override
@@ -176,31 +162,46 @@ public void run() {
         while (!done) {
             int ingested = 0;
             for (String collection : collections) {
-                log.info(src.getIdentifier(collection) + " -> " + dest.getIdentifier(collection));
-
-                ObservationHarvester obsHarvester = new ObservationHarvester(src, dest, collection, basePublisherID, batchSize,
-                        nthreads, full, nochecksum);
-                obsHarvester.setSkipped(skip, retryErrorMessagePattern);
-
-                DeletionHarvester obsDeleter = new DeletionHarvester(DeletedObservation.class, src, dest,
-                        collection, batchSize * 100);
-                boolean initDel = init;
-                if (!init) {
-                    // check if we have ever harvested before
-                    HarvestState hs = obsHarvester.harvestStateDAO.get(obsHarvester.source, obsHarvester.cname);
-                    initDel = (hs.curID == null && hs.curLastModified == null); // never harvested
-                }
-
-                try {
-                    // delete observations before harvest to avoid observationURI conflicts from delete+create
-                    obsDeleter.setInitHarvestState(initDel);
-                    obsDeleter.run();
-
-                    // harvest observations
-                    obsHarvester.run();
-                    ingested += obsHarvester.getIngested();
-                } catch (TransientException e) {
-                    ingested = 0;
+                log.info(src.getIdentifier(collection) + " -> " + dest);
+
+                if (validateMode) {
+                    ObservationValidator validator = new ObservationValidator(src, collection, dest, batchSize, numThreads, false);
+                    ObservationHarvester obsHarvester = new ObservationHarvester(src, collection, dest, basePublisherID,
+                            batchSize, numThreads, nochecksum);
+                    obsHarvester.setSkipped(skipMode, null);
+                    try {
+                        validator.run();
+                        if (validator.getNumMismatches() > 0) {
+                            obsHarvester.run(); // retry skipped
+                        }
+                    } catch (TransientException ex) {
+                        log.warn("validate " + src.getIdentifier(collection) + " FAIL", ex);
+                    }
+                } else {
+                    ObservationHarvester obsHarvester = new ObservationHarvester(src, collection, dest, basePublisherID,
+                            batchSize, numThreads, nochecksum);
+                    obsHarvester.setSkipped(skipMode, retryErrorMessagePattern);
+
+                    DeletionHarvester obsDeleter = new DeletionHarvester(DeletedObservation.class, src, collection, dest);
+                    boolean initDel = init;
+                    if (!init) {
+                        // check if we have ever harvested before
+                        HarvestState hs = obsHarvester.harvestStateDAO.get(obsHarvester.source, obsHarvester.cname);
+                        initDel = (hs.curID == null && hs.curLastModified == null); // never harvested
+                    }
+
+                    try {
+                        // delete observations before harvest to avoid observationURI conflicts from delete+create
+                        obsDeleter.setInitHarvestState(initDel);
+                        obsDeleter.run();
+
+                        // harvest observations
+                        obsHarvester.run();
+                        ingested += obsHarvester.getIngested();
+                    } catch (TransientException ex) {
+                        log.warn("harvest " + src.getIdentifier(collection) + " FAIL", ex);
+                        ingested = 0;
+                    }
+                }
             }

@@ -211,7 +212,7 @@ public void run() {
                 if (ingested > 0 || sleep == 0) {
                     sleep = DEFAULT_IDLE_TIME;
                 } else {
-                    sleep = Math.min(sleep * 2, maxIdle * 1000L);
+                    sleep = Math.min(sleep * 2, maxSleep * 1000L);
                 }
try {
log.info("idle sleep: " + (sleep / 1000L) + " sec");
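The renamed maxSleep field caps the idle backoff here: the sleep interval resets to DEFAULT_IDLE_TIME after a productive pass and doubles on each idle pass up to maxSleep seconds. A self-contained demo of just that arithmetic, assuming a maxSleep of 600 seconds:

```java
// Standalone demo of the idle backoff arithmetic above (values are illustrative).
public class BackoffSketch {
    public static void main(String[] args) {
        final long defaultIdleTime = 30000L; // ms, DEFAULT_IDLE_TIME in CaomHarvester
        final long maxSleep = 600L;          // seconds, assumed configured value
        long sleep = 0L;
        int ingested = 0;                    // simulate passes where nothing is harvested
        for (int pass = 1; pass <= 6; pass++) {
            if (ingested > 0 || sleep == 0) {
                sleep = defaultIdleTime;
            } else {
                sleep = Math.min(sleep * 2, maxSleep * 1000L);
            }
            System.out.println("pass " + pass + ": idle sleep " + (sleep / 1000L) + " sec");
        }
        // prints 30, 60, 120, 240, 480, 600 -- capped at maxSleep
    }
}
```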
24 changes: 10 additions & 14 deletions icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java
@@ -96,7 +96,8 @@
 public class DeletionHarvester extends Harvester implements Runnable {
 
     private static final Logger log = Logger.getLogger(DeletionHarvester.class);
+    private static int DEFAULT_BATCH_SIZE = 100000;
 
     private DeletedEntityDAO deletedDAO;
     private RepoClient repoClient;
     private ObservationDAO obsDAO;
@@ -114,9 +115,10 @@
      * @param collection the collection to process
      * @param batchSize ignored, always full list
      */
-    public DeletionHarvester(Class<?> entityClass, HarvesterResource src, HarvesterResource dest,
-            String collection, int batchSize) {
-        super(entityClass, src, dest, collection, batchSize, false);
+    public DeletionHarvester(Class<?> entityClass, HarvestSource src, String collection,
+            HarvestDestination dest) {
+        super(entityClass, src, collection, dest);
+        setBatchSize(DEFAULT_BATCH_SIZE);
         init();
     }

@@ -143,22 +145,21 @@ private void init() {
         repoClient.setReadTimeout(120000); // 2 min
 
         // destination
-        final String destDS = "jdbc/DeletionHarvester";
-
+        final String destDS = DEST_DATASOURCE_NAME;
         Map<String, Object> destConfig = getConfigDAO(dest);
         try {
             DataSource cur = null;
             try {
                 cur = DBUtil.findJNDIDataSource(destDS);
             } catch (NamingException notInitialized) {
-                log.debug("JNDI not initialized yet... OK");
+                log.info("JNDI DataSource not initialized yet... OK");
             }
             if (cur == null) {
                 ConnectionConfig destConnectionConfig = new ConnectionConfig(null, null,
-                        dest.getUsername(), dest.getPassword(), HarvesterResource.POSTGRESQL_DRIVER, dest.getJdbcUrl());
+                        dest.getUsername(), dest.getPassword(), HarvestDestination.POSTGRESQL_DRIVER, dest.getJdbcUrl());
                 DBUtil.createJNDIDataSource(destDS, destConnectionConfig);
             } else {
-                log.debug("found DataSource: " + destDS + " -- re-using");
+                log.info("found DataSource: " + destDS + " -- re-using");
             }
         } catch (NamingException e) {
             throw new IllegalStateException(String.format("Error creating destination JNDI datasource for %s reason: %s",
@@ -209,7 +210,6 @@ public void run() {
                     log.error("batched aborted");
                 }
                 go = (!num.abort && !num.done);
-                full = false; // do not start at min(lastModified) again
             }
             try {
                 close();
@@ -263,10 +263,6 @@ private Progress doit() {

             startDate = state.curLastModified;
             if (firstIteration) {
-                if (super.minDate != null) { // override state
-                    startDate = super.minDate;
-                }
-                endDate = super.maxDate;
                 // harvest up to a little in the past because the head of the
                 // sequence may be volatile
                 long fiveMinAgo = System.currentTimeMillis() - 5 * 60000L;
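With the minDate/maxDate overrides removed, the only first-iteration adjustment left is the volatile-head clamp kept above. A small sketch of that end-date computation:

```java
import java.util.Date;

// Sketch of the "harvest up to a little in the past" clamp: the window end is
// pulled back 5 minutes so rows still at the volatile head of the lastModified
// sequence are not harvested yet.
public class ClampSketch {
    public static void main(String[] args) {
        long fiveMinAgo = System.currentTimeMillis() - 5 * 60000L;
        Date endDate = new Date(fiveMinAgo);
        System.out.println("harvest window ends at: " + endDate);
    }
}
```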