initialize newly allocated job slots
fix infinite loop in determineNewMaxMode
clean up waiters/upgraders when waiting ends
better display of Record/AreaManagers for debugging
add toString for LockRequest and LockRequestWorker for debugging
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
index 5212885..1606afc 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
@@ -140,16 +140,18 @@
}
StringBuilder appendInitializers(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("bb.")
+ .append(bbSetter(type))
+ .append("(slotNum * ITEM_SIZE + ")
+ .append(offsetName())
+ .append(", ");
if (initial != null) {
- sb = indent(sb, indent, level);
- sb.append("bb.")
- .append(bbSetter(type))
- .append("(currentSlot * ITEM_SIZE + ")
- .append(offsetName())
- .append(", ")
- .append(initial)
- .append(");\n");
+ sb.append(initial);
+ } else {
+ sb.append(deadMemInitializer(type));
}
+ sb.append(");\n");
return sb;
}
@@ -231,6 +233,15 @@
}
}
+ static String deadMemInitializer(Type t) {
+ switch(t) {
+ case BYTE: return "0xde";
+ case SHORT: return "0xdead";
+ case INT: return "0xdeadbeef";
+ default: throw new IllegalArgumentException();
+ }
+ }
+
static String padRight(String s, int n) {
return String.format("%1$-" + n + "s", s);
}
@@ -270,7 +281,7 @@
.append(padRight(field.name, maxNameWidth))
.append(" | \");\n");
sb = indent(sb, indent, level);
- sb.append("for (int i = 0; i < occupiedSlots; ++i) {\n");
+ sb.append("for (int i = 0; i < NO_SLOTS; ++i) {\n");
sb = indent(sb, indent, level + 1);
sb.append(name(field.type))
.append(" value = bb.")
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
index c8c64ab..01df3f0 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -215,14 +215,15 @@
}
public int allocate() {
- int currentSlot = freeSlotNum;
- freeSlotNum = getNextFreeSlot(currentSlot);
+ int slotNum = freeSlotNum;
+ freeSlotNum = getNextFreeSlot(slotNum);
@INIT_SLOT@
occupiedSlots++;
- return currentSlot;
+ return slotNum;
}
public void deallocate(int slotNum) {
+ @INIT_SLOT@
setNextFreeSlot(slotNum, freeSlotNum);
freeSlotNum = slotNum;
occupiedSlots--;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index d34d301..b1507eb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -124,7 +124,8 @@
case WAIT:
// TODO can we have more than on upgrader? Or do we need to
// abort if we get a second upgrader?
- if (findLastHolderForJob(resSlot, jobSlot) != -1) {
+ boolean upgrade = findLastHolderForJob(resSlot, jobSlot) != -1;
+ if (upgrade) {
addUpgrader(reqSlot, resSlot, jobSlot);
} else {
if (! introducesDeadlock(resSlot, jobSlot)) {
@@ -135,6 +136,11 @@
}
try {
group.await();
+ if (upgrade) {
+ removeUpgrader(reqSlot, resSlot, jobSlot);
+ } else {
+ removeWaiter(reqSlot, resSlot, jobSlot);
+ }
} catch (InterruptedException e) {
throw new ACIDException(txnContext, "interrupted", e);
}
@@ -251,44 +257,9 @@
int jobId = txnContext.getJobId().getId();
int jobSlot = getJobSlot(jobId);
- // since locking is properly nested, finding the last holder is good enough
-
- int holder = resArenaMgr.getLastHolder(resource);
- if (holder < 0) {
- throw new IllegalStateException("no holder for resource " + resource);
- }
-
- // remove from the list of holders
- if (reqArenaMgr.getJobId(holder) == jobSlot) {
- int next = reqArenaMgr.getNextRequest(holder);
- resArenaMgr.setLastHolder(resource, next);
- } else {
- int prev = holder;
- while (prev != -1) {
- holder = reqArenaMgr.getNextRequest(prev);
- if (holder == -1) {
- throw new IllegalStateException("no holder for resource " + resource + " and job " + jobId);
- }
- if (jobSlot == reqArenaMgr.getJobId(holder)) {
- break;
- }
- prev = holder;
- }
- int next = reqArenaMgr.getNextRequest(holder);
- reqArenaMgr.setNextRequest(prev, next);
- }
-
- // remove from the list of requests for a job
- int prevForJob = reqArenaMgr.getPrevJobRequest(holder);
- int nextForJob = reqArenaMgr.getNextJobRequest(holder);
- if (prevForJob == -1) {
- jobArenaMgr.setLastHolder(jobSlot, nextForJob);
- } else {
- reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
- }
- if (nextForJob != -1) {
- reqArenaMgr.setPrevJobRequest(nextForJob, prevForJob);
- }
+
+ // since locking is properly nested, finding the last holder for a job is good enough
+ int holder = removeLastHolder(resource, jobSlot);
// deallocate request
reqArenaMgr.deallocate(holder);
@@ -342,6 +313,7 @@
Integer jobSlot = jobIdSlotMap.get(jobId);
if (jobSlot == null) {
jobSlot = new Integer(jobArenaMgr.allocate());
+ jobArenaMgr.setJobId(jobSlot, jobId);
Integer oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
if (oldSlot != null) {
// if another thread allocated a slot for this jobId between
@@ -412,7 +384,42 @@
lastHolder = jobArenaMgr.getLastHolder(job);
insertIntoJobQueue(request, lastHolder);
- jobArenaMgr.setLastHolder(resource, request);
+ jobArenaMgr.setLastHolder(job, request);
+ }
+
+ private int removeLastHolder(int resource, int jobSlot) {
+ int holder = resArenaMgr.getLastHolder(resource);
+ if (holder < 0) {
+ throw new IllegalStateException("no holder for resource " + resource);
+ }
+
+ // remove from the list of holders for a resource
+ if (reqArenaMgr.getJobId(holder) == jobSlot) {
+ // if the head of the queue matches, we need to update the resource
+ int next = reqArenaMgr.getNextRequest(holder);
+ resArenaMgr.setLastHolder(resource, next);
+ } else {
+ holder = removeRequestFromQueueForJob(holder, jobSlot);
+ }
+
+ // remove from the list of requests for a job
+ int newHead = removeRequestFromJob(jobSlot, holder);
+ jobArenaMgr.setLastHolder(jobSlot, newHead);
+ return holder;
+ }
+
+ private int removeRequestFromJob(int jobSlot, int holder) {
+ int prevForJob = reqArenaMgr.getPrevJobRequest(holder);
+ int nextForJob = reqArenaMgr.getNextJobRequest(holder);
+ if (nextForJob != -1) {
+ reqArenaMgr.setPrevJobRequest(nextForJob, prevForJob);
+ }
+ if (prevForJob == -1) {
+ return nextForJob;
+ } else {
+ reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
+ return -1;
+ }
}
private void addWaiter(int request, int resource, int job) {
@@ -429,6 +436,19 @@
jobArenaMgr.setLastWaiter(job, request);
}
+ private void removeWaiter(int request, int resource, int job) {
+ int waiter = resArenaMgr.getFirstWaiter(resource);
+ if (waiter == request) {
+ int next = reqArenaMgr.getNextRequest(waiter);
+ resArenaMgr.setFirstWaiter(resource, next);
+ } else {
+ waiter = removeRequestFromQueueForSlot(waiter, request);
+ }
+ // remove from the list of requests for a job
+ int newHead = removeRequestFromJob(job, waiter);
+ jobArenaMgr.setLastWaiter(job, newHead);
+ }
+
private void addUpgrader(int request, int resource, int job) {
int upgrader = resArenaMgr.getFirstUpgrader(resource);
reqArenaMgr.setNextRequest(request, -1);
@@ -443,6 +463,19 @@
jobArenaMgr.setLastUpgrader(job, request);
}
+ private void removeUpgrader(int request, int resource, int job) {
+ int upgrader = resArenaMgr.getFirstUpgrader(resource);
+ if (upgrader == request) {
+ int next = reqArenaMgr.getNextRequest(upgrader);
+ resArenaMgr.setFirstUpgrader(resource, next);
+ } else {
+ upgrader = removeRequestFromQueueForSlot(upgrader, request);
+ }
+ // remove from the list of requests for a job
+ int newHead = removeRequestFromJob(job, upgrader);
+ jobArenaMgr.setLastUpgrader(job, newHead);
+ }
+
private void insertIntoJobQueue(int newRequest, int oldRequest) {
reqArenaMgr.setNextJobRequest(newRequest, oldRequest);
reqArenaMgr.setPrevJobRequest(newRequest, -1);
@@ -459,6 +492,54 @@
}
reqArenaMgr.setNextRequest(head, appendee);
}
+
+ /**
+ *
+ * @param head
+ * @param reqSlot
+ * @return
+ */
+ private int removeRequestFromQueueForSlot(int head, int reqSlot) {
+ int cur = head;
+ int prev = cur;
+ while (prev != -1) {
+ cur = reqArenaMgr.getNextRequest(prev);
+ if (cur == -1) {
+ throw new IllegalStateException("request " + reqSlot+ " not in queue");
+ }
+ if (cur == reqSlot) {
+ break;
+ }
+ prev = cur;
+ }
+ int next = reqArenaMgr.getNextRequest(cur);
+ reqArenaMgr.setNextRequest(prev, next);
+ return cur;
+ }
+
+ /**
+ * remove the first request for a given job from a request queue
+ * @param head the head of the request queue
+ * @param jobSlot the job slot
+ * @return the slot of the first request that matched the given job
+ */
+ private int removeRequestFromQueueForJob(int head, int jobSlot) {
+ int holder = head;
+ int prev = holder;
+ while (prev != -1) {
+ holder = reqArenaMgr.getNextRequest(prev);
+ if (holder == -1) {
+ throw new IllegalStateException("no entry for job " + jobSlot + " in queue");
+ }
+ if (jobSlot == reqArenaMgr.getJobId(holder)) {
+ break;
+ }
+ prev = holder;
+ }
+ int next = reqArenaMgr.getNextRequest(holder);
+ reqArenaMgr.setNextRequest(prev, next);
+ return holder;
+ }
private int determineNewMaxMode(int resource, int oldMaxMode) {
int newMaxMode = LockMode.NL;
@@ -478,6 +559,7 @@
case WAIT:
throw new IllegalStateException("incompatible locks in holder queue");
}
+ holder = reqArenaMgr.getNextRequest(holder);
}
return newMaxMode;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index 2426927..f5858e2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -513,6 +513,18 @@
public void log(String s) {
System.out.println(s);
}
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ t : \"").append(threadName).append("\", r : ");
+ if (lockRequest == null) {
+ sb.append("null");
+ } else {
+ sb.append("\"").append(lockRequest.toString()).append("\"");
+ }
+ sb.append(" }");
+ return sb.toString();
+ }
}
class WorkerReadyQueue {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
index e6f2798..f06edc8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
@@ -555,6 +555,11 @@
this.entityHashValue = waitTime;
}
+ @Override
+ public String toString() {
+ return prettyPrint();
+ }
+
public String prettyPrint() {
StringBuilder s = new StringBuilder();
//s.append(threadName.charAt(7)).append("\t").append("\t");
@@ -595,23 +600,7 @@
}
s.append("\tJ").append(txnContext.getJobId().getId()).append("\tD").append(datasetIdObj.getId()).append("\tE")
.append(entityHashValue).append("\t");
- switch (lockMode) {
- case LockMode.S:
- s.append("S");
- break;
- case LockMode.X:
- s.append("X");
- break;
- case LockMode.IS:
- s.append("IS");
- break;
- case LockMode.IX:
- s.append("IX");
- break;
- default:
- throw new UnsupportedOperationException("Unsupported lock mode");
- }
- s.append("\n");
+ s.append(LockMode.toString(lockMode)).append("\n");
return s.toString();
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 28af321..1a73fe0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -36,6 +36,17 @@
public static final byte IX = 2;
public static final byte S = 3;
public static final byte X = 4;
+
+ public static String toString(byte mode) {
+ switch (mode) {
+ case NL: return "NL";
+ case IS: return "IS";
+ case IX: return "IX";
+ case S: return "S";
+ case X: return "X";
+ default: throw new IllegalArgumentException("no such lock mode");
+ }
+ }
}
}