Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jason work to fix worker priority scheduling #227

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 142 additions & 132 deletions cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ public class SchedulerDbService extends DbService implements InitializingBean {
// NOTE(review): presumably the default cap on concurrent instances of a process
// definition per worker -- usage is not visible here, confirm against callers.
public static final int DEFAULT_WORKER_PROC_DEF_MAX_INSTANCES = 1;
// NOTE(review): presumably the page size used when listing processes -- confirm against callers.
public static final int PROCESSES_PAGE_SIZE = 50;

/**
 * Selects the uuid and priority of rows in cws_sched_worker_proc_inst whose
 * status equals the PENDING constant, for a single process definition key.
 *
 * Bind parameters, in order:
 *   1. proc_def_key to match
 *   2. maximum number of rows to return (LIMIT)
 *
 * Rows come back ordered by priority ascending, then created_time ascending.
 * The priority column is selected so that callers can merge the result sets of
 * several process definition keys into one priority-ordered list.
 */
public static final String FIND_CLAIMABLE_ROWS_SQL =
	"SELECT uuid, priority FROM cws_sched_worker_proc_inst " +
	"WHERE " +
	"  status='" + PENDING + "' AND " +
	"  proc_def_key=? " +
	"ORDER BY " +
	"  priority ASC, " +     // lower priorities favored
	"  created_time ASC " +  // older dates (FIFO) favored
	"LIMIT ?";

public static final String UPDATE_CLAIMABLE_ROW_SQL =
"UPDATE cws_sched_worker_proc_inst " +
Expand Down Expand Up @@ -248,130 +248,140 @@ public int updateProcInstIdAndStartedByWorker(
* @return mappings of claimUuids and claimedRowUuids
*/

public Map<String, List<String>> claimHighestPriorityStartReq(String workerId, Map<String, Integer> workerProcsList, Map<String, Integer> limitsPerProcs, int limit) {
List<String> claimUuids = new ArrayList<String>();
List<String> rowUuids = new ArrayList<String>();
List<String> rowUuidsPerProcDefKey = new ArrayList<String>();
LinkedHashMap<String, String> uuidAndProcDefKeyPair = new LinkedHashMap<String, String>();
List<String> clearOutUnclaimedInst = new ArrayList<String>();
List<String> unfilteredRowUuids = new ArrayList<String>();
List<String> claimedRowUuids = new ArrayList<String>();
long t0 = System.currentTimeMillis();
int numClaimed = 0;
String claimUuid = null;
int attempts = 0;

// Try, until succeeding in claiming at least one row
//
while (attempts++ < 10) {
try {
// Find claimable rows
//
for (Map.Entry<String, Integer> procs : limitsPerProcs.entrySet()) {
rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, String.class,
new Object[]{procs.getKey(), procs.getValue() * 2});
// get list of uuids using array of procdefkeys IN (keys)
unfilteredRowUuids.addAll(rowUuidsPerProcDefKey);
}

Collections.sort(unfilteredRowUuids);
for (String id : unfilteredRowUuids) {
String procDefKeyString = getProcDefKeyFromUuid(id);
uuidAndProcDefKeyPair.put(id, procDefKeyString);
}

for (Map.Entry<String, Integer> procLimit : limitsPerProcs.entrySet()) {
Set<String> keys = uuidAndProcDefKeyPair.keySet();
int applyPerProcsCap = 0;
for (String key : keys) {

if (uuidAndProcDefKeyPair.get(key).equals(procLimit.getKey())) {
applyPerProcsCap = applyPerProcsCap + 1;
if (applyPerProcsCap > procLimit.getValue()) {
clearOutUnclaimedInst.add(key);
}
}
}
}

for (String removeUuidFromList : clearOutUnclaimedInst) {
uuidAndProcDefKeyPair.remove(removeUuidFromList);
}

Set<String> uuidKeys = uuidAndProcDefKeyPair.keySet();
// after its filtered add the uuids to rowUuids arraylist
for (String key : uuidKeys) {
rowUuids.add(key);
}

// make query that uses multi limit per ProcDefkey (JOIN)
// iterate to grab 30
if (!rowUuids.isEmpty()) {
// Found some claimable rows, so now try to claim them..
//
for (String uuid : rowUuids) {
claimUuid = UUID.randomUUID().toString();
int updateCount = jdbcTemplate.update(UPDATE_CLAIMABLE_ROW_SQL,
new Object[]{workerId, claimUuid, uuid, workerId});

if (updateCount == 1) {
numClaimed++;
claimUuids.add(claimUuid);
claimedRowUuids.add(uuid);
//log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKeyList + "'");
}

if (numClaimed == limit) {
break; // we have claimed up to the limit, so stop claiming
}
public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, Map<String,Integer> workerProcsList, Map<String,Integer> limitsPerProcs, int limit) {
List<String> claimUuids = new ArrayList<String>();
List<String> rowUuids = new ArrayList<String>();
List<Map<String, Object>> rowUuidsPerProcDefKey = new ArrayList<Map<String, Object>>();
LinkedHashMap<String, String> uuidAndProcDefKeyPair = new LinkedHashMap<String, String>();
List<String> clearOutUnclaimedInst = new ArrayList<String>();
List<Map<String, Object>> unfilteredProcesses = new ArrayList<Map<String, Object>>();
List<String> claimedRowUuids = new ArrayList<String>();
long t0 = System.currentTimeMillis();
int numClaimed = 0;
String claimUuid = null;
int attempts = 0;

// Try, until succeeding in claiming at least one row
//
while (attempts++ < 10) {
try {
// Find claimable rows
//
for (Map.Entry<String, Integer> procs : limitsPerProcs.entrySet()) {
rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, new Object[] {procs.getKey(), procs.getValue()*2});
// get list of uuids using array of procdefkeys IN (keys)
unfilteredProcesses.addAll(rowUuidsPerProcDefKey);
}

unfilteredProcesses.sort(new Comparator<Map<String, Object>>() {
public int compare(Map<String, Object> one, Map<String, Object> two) {
return ((Integer) one.get("priority")).compareTo((Integer) two.get("priority"));
}

if (numClaimed == 0) {
// other workers beat us to claiming the rows
log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKeys '" + workerProcsList.keySet() + "', but claimed none! " +
(attempts < 10 ? "Retrying..." : "GIVING UP!"));
continue; // retry finding claimable rows
} else {
log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKeys '" + workerProcsList.keySet() + "'");
}
} else if (log.isTraceEnabled()) {
log.trace("NO CLAIMABLE CANDIDATES AT THIS TIME");
}

break; // no retry needed
} catch (DeadlockLoserDataAccessException e) {
if (attempts == 10) {
log.error("Caught a DeadlockLoserDataAccessException. NOT Retrying as 10 attempts have been tried already!..");
break; // give up
}
log.warn("Caught a DeadlockLoserDataAccessException. Retrying..");
continue; // retry
} catch (Throwable t) {
log.error("Unexpected exception. Not retrying..", t);
break; // abort
}
} // end while (attempts)

long timeTaken = System.currentTimeMillis() - t0;
if (timeTaken > SLOW_WARN_THRESHOLD) {
log.warn("CLAIM cws_sched_worker_proc_inst took " + timeTaken + " ms!");
}
if (numClaimed >= 1) {
log.info("worker " + workerId + " claimed " + numClaimed + " row(s).");
} else {
log.trace("no rows claimed by worker: " + workerId);
}

if (numClaimed != claimUuids.size()) {
log.error("numUpdated != claimUuids.size()");
}

Map<String, List<String>> ret = new HashMap<String, List<String>>();
ret.put("claimUuids", claimUuids);
ret.put("claimedRowUuids", claimedRowUuids);

return ret;
}
});

for (Map<String, Object> proc : unfilteredProcesses) {
String uuid = (String) proc.get("uuid");
String procDefKeyString = getProcDefKeyFromUuid(uuid);
uuidAndProcDefKeyPair.put(uuid, procDefKeyString);
}

for (Map.Entry<String,Integer> procLimit : limitsPerProcs.entrySet()) {
Set<String> keys = uuidAndProcDefKeyPair.keySet();
int applyPerProcsCap = 0;
for (String key : keys) {

if (uuidAndProcDefKeyPair.get(key).equals(procLimit.getKey())) {
applyPerProcsCap = applyPerProcsCap + 1;
if (applyPerProcsCap > procLimit.getValue()) {
clearOutUnclaimedInst.add(key);
}
}
}
}

for (String removeUuidFromList : clearOutUnclaimedInst) {
uuidAndProcDefKeyPair.remove(removeUuidFromList);
}

Set<String> uuidKeys = uuidAndProcDefKeyPair.keySet();
// after its filtered add the uuids to rowUuids arraylist
for (String key : uuidKeys) {
rowUuids.add(key);
}

// make query that uses multi limit per ProcDefkey (JOIN)
// iterate to grab 30
if (!rowUuids.isEmpty()) {
// Found some claimable rows, so now try to claim them..
//
for (String uuid : rowUuids) {
claimUuid = UUID.randomUUID().toString();
int updateCount = jdbcTemplate.update(UPDATE_CLAIMABLE_ROW_SQL,
new Object[] {workerId, claimUuid, uuid, workerId});

if (updateCount == 1) {
numClaimed++;
claimUuids.add(claimUuid);
claimedRowUuids.add(uuid);
//log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKeyList + "'");
}

if (numClaimed == limit) {
break; // we have claimed up to the limit, so stop claiming
}
}

if (numClaimed == 0) {
// other workers beat us to claiming the rows
log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKeys '" + workerProcsList.keySet() + "', but claimed none! " +
(attempts < 10 ? "Retrying..." : "GIVING UP!"));
continue; // retry finding claimable rows
}
else {
log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKeys '" + workerProcsList.keySet() + "'");
}
}
else if (log.isTraceEnabled()) {
log.trace("NO CLAIMABLE CANDIDATES AT THIS TIME");
}

break; // no retry needed
}
catch (DeadlockLoserDataAccessException e) {
if (attempts == 10) {
log.error("Caught a DeadlockLoserDataAccessException. NOT Retrying as 10 attempts have been tried already!..");
break; // give up
}
log.warn("Caught a DeadlockLoserDataAccessException. Retrying..");
continue; // retry
}
catch (Throwable t) {
log.error("Unexpected exception. Not retrying..", t);
break; // abort
}
} // end while (attempts)

long timeTaken = System.currentTimeMillis() - t0;
if (timeTaken > SLOW_WARN_THRESHOLD) {
log.warn("CLAIM cws_sched_worker_proc_inst took " + timeTaken + " ms!");
}
if (numClaimed >= 1) {
log.info("worker " + workerId + " claimed " + numClaimed + " row(s).");
}
else {
log.trace("no rows claimed by worker: " + workerId);
}

if (numClaimed != claimUuids.size()) {
log.error("numUpdated != claimUuids.size()" );
}

Map<String,List<String>> ret = new HashMap<String,List<String>>();
ret.put("claimUuids", claimUuids);
ret.put("claimedRowUuids", claimedRowUuids);

return ret;
}


public String getProcInstRowStatus(String uuid) {
Expand Down
Loading