Merge branch 'master' of https://code.google.com/p/asterixdb into icetindil/issue_703
diff --git a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/transaction/RecoveryIT.java b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/transaction/RecoveryIT.java
index 44ca366..11d4d8e 100644
--- a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/transaction/RecoveryIT.java
+++ b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/transaction/RecoveryIT.java
@@ -14,11 +14,8 @@
*/
package edu.uci.ics.asterix.installer.transaction;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.FileReader;
import java.io.FilenameFilter;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -26,7 +23,6 @@
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -44,8 +40,6 @@
private static final String PATH_BASE = "src/test/resources/transactionts/";
private TestCaseContext tcCtx;
private static File asterixInstallerPath;
- private static File asterixAppPath;
- private static File asterixDBPath;
private static File installerTargetPath;
private static String managixHomeDirName;
private static String managixHomePath;
@@ -59,8 +53,6 @@
outdir.mkdirs();
asterixInstallerPath = new File(System.getProperty("user.dir"));
- asterixDBPath = new File(asterixInstallerPath.getParent());
- asterixAppPath = new File(asterixDBPath.getAbsolutePath() + File.separator + "asterix-app");
installerTargetPath = new File(asterixInstallerPath, "target");
managixHomeDirName = installerTargetPath.list(new FilenameFilter() {
@Override
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
index 14d8a7e..f1eb9c6 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
@@ -139,6 +139,9 @@
sb.append('\n');
}
}
+ } else if (line.contains("@PRINT_RECORD@")) {
+ resource.appendRecordPrinter(sb, indent, 1);
+ sb.append('\n');
} else {
sb.append(line).append('\n');
}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
index a0f6c61..a3d065c 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
@@ -29,18 +29,19 @@
public class RecordType {
enum Type {
- BYTE (1, "byte", "get", "put", "(byte)0xde", "TypeUtil.Byte.append"),
- SHORT (2, "short", "getShort", "putShort", "(short)0xdead", "TypeUtil.Short.append"),
- INT (4, "int", "getInt", "putInt", "0xdeadbeef", "TypeUtil.Int.append"),
- GLOBAL(8, "long", "getLong", "putLong", "0xdeadbeefdeadbeefl", "TypeUtil.Global.append");
+ BYTE (1, "byte", "get", "put", "(byte)0xde", "TypeUtil.Byte.append", "TypeUtil.Byte.appendFixed"),
+ SHORT (2, "short", "getShort", "putShort", "(short)0xdead", "TypeUtil.Short.append", "TypeUtil.Short.appendFixed"),
+ INT (4, "int", "getInt", "putInt", "0xdeadbeef", "TypeUtil.Int.append", "TypeUtil.Int.appendFixed"),
+ GLOBAL(8, "long", "getLong", "putLong", "0xdeadbeefdeadbeefl", "TypeUtil.Global.append", "TypeUtil.Global.appendFixed");
- Type(int size, String javaType, String bbGetter, String bbSetter, String deadMemInitializer, String appender) {
+ Type(int size, String javaType, String bbGetter, String bbSetter, String deadMemInitializer, String appender, String tabAppender) {
this.size = size;
this.javaType = javaType;
this.bbGetter = bbGetter;
this.bbSetter = bbSetter;
this.deadMemInitializer = deadMemInitializer;
this.appender = appender;
+ this.tabAppender = tabAppender;
}
int size;
@@ -49,6 +50,7 @@
String bbSetter;
String deadMemInitializer;
String appender;
+ String tabAppender;
}
static class Field {
@@ -223,7 +225,7 @@
sb = indent(sb, indent, level + 1);
sb.append("String msg = \"invalid value in field ")
.append(offsetName())
- .append(" of slot \" + slotNum;\n");
+ .append(" of slot \" + TypeUtil.Global.toString(slotNum);\n");
sb = indent(sb, indent, level + 1);
sb.append("throw new IllegalStateException(msg);\n");
sb = indent(sb, indent, level);
@@ -374,7 +376,7 @@
.append(");\n");
sb = indent(sb, indent, level + 1);
sb.append("sb = ")
- .append(field.type.appender)
+ .append(field.type.tabAppender)
.append("(sb, value);\n");
sb = indent(sb, indent, level + 1);
sb.append("sb.append(\" | \");\n");
@@ -385,4 +387,37 @@
}
return sb;
}
+
+    /**
+     * Emits the source code of a generated
+     * {@code appendRecord(StringBuilder, long)} method. The generated method
+     * prints every accessible field of the record stored in a slot in the form
+     * <code>{ "name" : "value", "name" : "value" }</code>, using the
+     * unpadded appender of each field's type.
+     *
+     * @param sb
+     *            the buffer the generated source is appended to
+     * @param indent
+     *            the string used for one indentation step
+     * @param level
+     *            the indentation level of the generated method header
+     * @return the buffer holding the generated source
+     */
+    StringBuilder appendRecordPrinter(StringBuilder sb, String indent, int level) {
+        sb = indent(sb, indent, level);
+        sb.append("public StringBuilder appendRecord(StringBuilder sb, long slotNum) {\n");
+
+        sb = indent(sb, indent, level + 1);
+        sb.append("sb.append(\"{ \");\n\n");
+
+        // The separator must depend on whether a field has already been
+        // printed, not on the list index: otherwise an inaccessible first
+        // field makes the generated output start with "{ , ".
+        boolean first = true;
+        for (int i = 0; i < fields.size(); ++i) {
+            Field field = fields.get(i);
+            if (!field.accessible) {
+                continue;
+            }
+            if (first) {
+                first = false;
+            } else {
+                sb = indent(sb, indent, level + 1);
+                sb.append("sb.append(\", \");\n\n");
+            }
+            // "<name>" : "
+            sb = indent(sb, indent, level + 1);
+            sb.append("sb.append(\"\\\"").append(field.name).append("\\\" : \\\"\");\n");
+            // value, read through the generated getter of the field
+            sb = indent(sb, indent, level + 1);
+            sb.append("sb = ").append(field.type.appender).append("(sb, ");
+            sb.append(field.methodName("get")).append("(slotNum)");
+            sb.append(");\n");
+            // closing quote of the value
+            sb = indent(sb, indent, level + 1);
+            sb.append("sb.append(\"\\\"\");\n\n");
+        }
+        sb = indent(sb, indent, level + 1);
+        sb.append("return sb.append(\" }\");\n");
+
+        sb = indent(sb, indent, level);
+        sb.append("}");
+
+        return sb;
+    }
}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
index 6df032e..a950d30 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
@@ -85,7 +85,7 @@
final int refAllocId = TypeUtil.Global.allocId(slotNum);
final short curAllocId = getAllocId(slotNum);
if (refAllocId != curAllocId) {
- String msg = "reference to slot " + slotNum
+ String msg = "reference to slot " + TypeUtil.Global.toString(slotNum)
+ " of arena " + TypeUtil.Global.arenaId(slotNum)
+ " refers to version " + Integer.toHexString(refAllocId)
+ " current version is " + Integer.toHexString(curAllocId);
@@ -102,6 +102,8 @@
return get(arenaId).getAllocInfo(TypeUtil.Global.localId(slotNum));
}
+ @PRINT_RECORD@
+
public StringBuilder append(StringBuilder sb) {
for (int i = 0; i < noArenas; ++i) {
sb.append("++++ arena ").append(i).append(" ++++\n");
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/TypeUtil.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/TypeUtil.java
index 9571156..f5d9734 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/TypeUtil.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/TypeUtil.java
@@ -4,18 +4,30 @@
+ /** Hex formatting helpers for byte values. */
public static class Byte {
+ /** Appends b to sb as hexadecimal digits without any padding ("%1$x"). */
public static StringBuilder append(StringBuilder sb, byte b) {
+ return sb.append(String.format("%1$x", b));
+ }
+
+ /** Appends b to sb as hexadecimal digits in a field of width 18 ("%1$18x"). */
+ public static StringBuilder appendFixed(StringBuilder sb, byte b) {
return sb.append(String.format("%1$18x", b));
}
}
+ /** Hex formatting helpers for short values. */
public static class Short {
+ /** Appends s to sb as hexadecimal digits without any padding ("%1$x"). */
public static StringBuilder append(StringBuilder sb, short s) {
+ return sb.append(String.format("%1$x", s));
+ }
+
+ /** Appends s to sb as hexadecimal digits in a field of width 18 ("%1$18x"). */
+ public static StringBuilder appendFixed(StringBuilder sb, short s) {
return sb.append(String.format("%1$18x", s));
}
}
+ /** Hex formatting helpers for int values. */
public static class Int {
+ /** Appends i to sb as hexadecimal digits without any padding ("%1$x"). */
public static StringBuilder append(StringBuilder sb, int i) {
+ return sb.append(String.format("%1$x", i));
+ }
+
+ /** Appends i to sb as hexadecimal digits in a field of width 18 ("%1$18x"). */
+ public static StringBuilder appendFixed(StringBuilder sb, int i) {
return sb.append(String.format("%1$18x", i));
}
}
@@ -25,7 +37,7 @@
public static long build(int arenaId, int allocId, int localId) {
long result = arenaId;
result = result << 48;
- result |= (allocId << 32);
+ // Widen to long before shifting: on an int operand Java only uses the low
+ // 5 bits of the shift distance, so (allocId << 32) is just allocId.
+ result |= (((long)allocId) << 32);
result |= localId;
return result;
}
@@ -43,6 +55,15 @@
}
+ /**
+ * Appends l to sb as "arenaId:allocId:localId", each part printed as
+ * hexadecimal digits without padding.
+ */
public static StringBuilder append(StringBuilder sb, long l) {
+ sb.append(String.format("%1$x", TypeUtil.Global.arenaId(l)));
+ sb.append(':');
+ sb.append(String.format("%1$x", TypeUtil.Global.allocId(l)));
+ sb.append(':');
+ sb.append(String.format("%1$x", TypeUtil.Global.localId(l)));
+ return sb;
+ }
+
+ public static StringBuilder appendFixed(StringBuilder sb, long l) {
sb.append(String.format("%1$4x", TypeUtil.Global.arenaId(l)));
sb.append(':');
sb.append(String.format("%1$4x", TypeUtil.Global.allocId(l)));
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/DatasetHints.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/DatasetHints.java
index e27a4cb..dcd0a87 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/DatasetHints.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/DatasetHints.java
@@ -44,7 +44,7 @@
return h.validateValue(value);
}
}
- return new Pair<Boolean, String>(false, "Unknwon hint :" + hintName);
+ return new Pair<Boolean, String>(false, "Unknown hint :" + hintName);
}
private static Set<IHint> hints = initHints();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 3dc4fa6..3719726 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -17,8 +17,10 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -47,6 +49,7 @@
private static final Level LVL = Level.FINER;
public static final boolean DEBUG_MODE = false;//true
+ public static final boolean CHECK_CONSISTENCY = false;
private TransactionSubsystem txnSubsystem;
private ResourceGroupTable table;
@@ -158,15 +161,20 @@
} finally {
group.releaseLatch();
}
+
+ if (CHECK_CONSISTENCY) assertLocksCanBefoundInJobQueue();
}
private void enqueueWaiter(final ResourceGroup group, final long reqSlot, final long resSlot, final long jobSlot,
final LockAction act, ITransactionContext txnContext) throws ACIDException {
final Queue queue = act.modify ? upgrader : waiter;
- if (!introducesDeadlock(resSlot, jobSlot)) {
- queue.add(reqSlot, resSlot, jobSlot);
+ if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) {
+ DeadlockTracker tracker = new CollectingTracker();
+ tracker.pushJob(jobSlot);
+ introducesDeadlock(resSlot, jobSlot, tracker);
+ requestAbort(txnContext, tracker.toString());
} else {
- requestAbort(txnContext);
+ queue.add(reqSlot, resSlot, jobSlot);
}
try {
group.await(txnContext);
@@ -175,6 +183,64 @@
}
}
+ /**
+ * Callback that introducesDeadlock notifies about the slots it visits while
+ * it walks the wait-graph. Slots are reported in stack order: a resource is
+ * pushed first, then for every request of that resource the request and the
+ * job that issued it. Entries are popped again when the walk moves on;
+ * nothing is popped when a cycle is found.
+ */
+ interface DeadlockTracker {
+ /** Records that the walk entered the resource stored in resSlot. */
+ void pushResource(long resSlot);
+ /** Records that the walk is looking at the request stored in reqSlot. */
+ void pushRequest(long reqSlot);
+ /** Records the job stored in jobSlot. */
+ void pushJob(long jobSlot);
+ /** Removes the most recently pushed entry. */
+ void pop();
+ }
+
+    /**
+     * {@link DeadlockTracker} that ignores every notification. It is used for
+     * the first deadlock check, where only the boolean result is needed.
+     */
+    static class NOPTracker implements DeadlockTracker {
+        /** Shared instance; the tracker keeps no state. */
+        static final DeadlockTracker INSTANCE = new NOPTracker();
+
+        @Override
+        public void pushResource(long resSlot) {
+            // intentionally empty
+        }
+
+        @Override
+        public void pushRequest(long reqSlot) {
+            // intentionally empty
+        }
+
+        @Override
+        public void pushJob(long jobSlot) {
+            // intentionally empty
+        }
+
+        @Override
+        public void pop() {
+            // intentionally empty
+        }
+    }
+
+    /**
+     * {@link DeadlockTracker} that keeps the pushed slots on a stack so that
+     * the path that led to a deadlock can be reported. {@link #toString()}
+     * prints one line per entry that is still on the stack, oldest first.
+     */
+    static class CollectingTracker implements DeadlockTracker {
+        /** Set to true to trace every push and pop on stderr. */
+        static final boolean DEBUG = false;
+
+        ArrayList<Long> slots = new ArrayList<Long>();
+        ArrayList<String> types = new ArrayList<String>();
+
+        @Override
+        public void pushResource(long resSlot) {
+            push("Resource", resSlot);
+        }
+
+        @Override
+        public void pushRequest(long reqSlot) {
+            push("Request", reqSlot);
+        }
+
+        @Override
+        public void pushJob(long jobSlot) {
+            push("Job", jobSlot);
+        }
+
+        @Override
+        public void pop() {
+            final int last = types.size() - 1;
+            if (DEBUG) {
+                System.err.println("pop " + types.get(last) + " " + slots.get(last));
+            }
+            types.remove(last);
+            slots.remove(last);
+        }
+
+        /** Puts an entry of the given type on top of the stack. */
+        private void push(String type, long slot) {
+            types.add(type);
+            slots.add(slot);
+            // tracing is off by default: this runs on the abort path of the
+            // lock manager and must not write to stderr unconditionally
+            if (DEBUG) {
+                System.err.println("push " + type + " " + slot);
+            }
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < slots.size(); ++i) {
+                // print slots like the other messages of the lock manager do
+                sb.append(types.get(i)).append(' ')
+                        .append(TypeUtil.Global.toString(slots.get(i))).append('\n');
+            }
+            return sb.toString();
+        }
+    }
+
/**
* determine if adding a job to the waiters of a resource will introduce a
* cycle in the wait-graph where the job waits on itself
@@ -185,11 +251,15 @@
* the slot that contains the information about the job
* @return true if a cycle would be introduced, false otherwise
*/
- private boolean introducesDeadlock(final long resSlot, final long jobSlot) {
+ private boolean introducesDeadlock(final long resSlot, final long jobSlot,
+ final DeadlockTracker tracker) {
synchronized (jobArenaMgr) {
+ tracker.pushResource(resSlot);
long reqSlot = resArenaMgr.getLastHolder(resSlot);
while (reqSlot >= 0) {
- long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
+ tracker.pushRequest(reqSlot);
+ final long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
+ tracker.pushJob(holderJobSlot);
if (holderJobSlot == jobSlot) {
return true;
}
@@ -197,7 +267,7 @@
long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
while (waiter >= 0) {
long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
- if (introducesDeadlock(watingOnResSlot, jobSlot)) {
+ if (introducesDeadlock(watingOnResSlot, jobSlot, tracker)) {
return true;
}
waiter = reqArenaMgr.getNextJobRequest(waiter);
@@ -206,12 +276,15 @@
waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
}
}
+ tracker.pop(); // job
+ tracker.pop(); // request
reqSlot = reqArenaMgr.getNextRequest(reqSlot);
}
+ tracker.pop(); // resource
return false;
}
}
-
+
@Override
public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
@@ -409,7 +482,9 @@
if (resource < 0) {
throw new IllegalStateException("resource (" + dsId + ", " + entityHashValue + ") not found");
}
-
+
+ if (CHECK_CONSISTENCY) assertLocksCanBefoundInJobQueue();
+
long holder = removeLastHolder(resource, jobSlot, lockMode);
// deallocate request
@@ -479,7 +554,6 @@
jobArenaMgr.deallocate(jobSlot);
jobIdSlotMap.remove(jobId);
stats.logCounters(LOGGER, Level.INFO, true);
- //LOGGER.info(toString());
}
private long findOrAllocJobSlot(int jobId) {
@@ -607,7 +681,6 @@
if (holder < 0) {
throw new IllegalStateException("no holder for resource " + resource);
}
-
// remove from the list of holders for a resource
if (requestMatches(holder, jobSlot, lockMode)) {
// if the head of the queue matches, we need to update the resource
@@ -619,7 +692,7 @@
synchronized (jobArenaMgr) {
// remove from the list of requests for a job
- long newHead = removeRequestFromJob(jobSlot, holder);
+ long newHead = removeRequestFromJob(holder, jobArenaMgr.getLastHolder(jobSlot));
jobArenaMgr.setLastHolder(jobSlot, newHead);
}
return holder;
@@ -630,7 +703,7 @@
&& (lockMode == LockMode.ANY || lockMode == reqArenaMgr.getLockMode(holder));
}
- private long removeRequestFromJob(long jobSlot, long holder) {
+ private long removeRequestFromJob(long holder, long unmodified) {
long prevForJob = reqArenaMgr.getPrevJobRequest(holder);
long nextForJob = reqArenaMgr.getNextJobRequest(holder);
if (nextForJob != -1) {
@@ -640,7 +713,7 @@
return nextForJob;
} else {
reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
- return -1;
+ return unmodified;
}
}
@@ -676,7 +749,7 @@
}
synchronized (jobArenaMgr) {
// remove from the list of requests for a job
- long newHead = removeRequestFromJob(job, waiter);
+ long newHead = removeRequestFromJob(waiter, jobArenaMgr.getLastWaiter(job));
jobArenaMgr.setLastWaiter(job, newHead);
}
}
@@ -708,7 +781,7 @@
}
synchronized (jobArenaMgr) {
// remove from the list of requests for a job
- long newHead = removeRequestFromJob(job, upgrader);
+ long newHead = removeRequestFromJob(upgrader, jobArenaMgr.getLastUpgrader(job));
jobArenaMgr.setLastUpgrader(job, newHead);
}
}
@@ -751,7 +824,7 @@
/**
* remove the first request for a given job and lock mode from a request queue.
- * If the value of the parameter lockMode is LockMode.NL the first request
+ * If the value of the parameter lockMode is LockMode.ANY the first request
* for the job is removed - independent of the LockMode.
*
* @param head
@@ -796,6 +869,8 @@
case GET:
break;
case WAIT:
+ case CONV:
+ case ERR:
throw new IllegalStateException("incompatible locks in holder queue");
}
holder = reqArenaMgr.getNextRequest(holder);
@@ -808,6 +883,24 @@
&& resArenaMgr.getFirstWaiter(resource) == -1;
}
+    /**
+     * Rejects lock requests of transactions that cannot continue.
+     *
+     * @param txnContext
+     *            the context of the transaction issuing the request
+     * @throws ACIDException
+     *             if the transaction is in state ABORTED or has its timeout
+     *             flag set
+     */
+    private void validateJob(ITransactionContext txnContext) throws ACIDException {
+        final boolean alreadyAborted = txnContext.getTxnState() == ITransactionManager.ABORTED;
+        if (alreadyAborted) {
+            throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
+        }
+        if (txnContext.isTimeout()) {
+            requestAbort(txnContext, "timeout");
+        }
+    }
+
+    /**
+     * Marks the transaction as timed out and signals the abort to the caller.
+     *
+     * @param txnContext
+     *            the context of the transaction that has to abort
+     * @param msg
+     *            details that are appended to the exception message
+     * @throws ACIDException
+     *             always
+     */
+    private void requestAbort(ITransactionContext txnContext, String msg) throws ACIDException {
+        txnContext.setTimeout(true);
+        final String reason = "Transaction " + txnContext.getJobId()
+                + " should abort (requested by the Lock Manager)" + ":\n" + msg;
+        throw new ACIDException(reason);
+    }
+
+ /*
+ * Debugging support
+ */
+
private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
if (! LOGGER.isLoggable(LVL)) {
return;
@@ -826,24 +919,102 @@
if (txnContext != null) {
sb.append(" , jobId : ").append(txnContext.getJobId());
}
+ sb.append(" , thread : ").append(Thread.currentThread().getName());
sb.append(" }");
LOGGER.log(LVL, sb.toString());
}
- private void validateJob(ITransactionContext txnContext) throws ACIDException {
- if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
- throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
- } else if (txnContext.isTimeout()) {
- requestAbort(txnContext);
+    /**
+     * Consistency check: every request in the holder queue of every resource
+     * must also be reachable through the holder queue of its job. Groups whose
+     * latch cannot be acquired within 100 ms are skipped with a warning.
+     *
+     * @throws ACIDException
+     *             if acquiring a group latch fails
+     */
+    private void assertLocksCanBefoundInJobQueue() throws ACIDException {
+        for (int groupIdx = 0; groupIdx < ResourceGroupTable.TABLE_SIZE; ++groupIdx) {
+            final ResourceGroup group = table.get(groupIdx);
+            if (!group.tryLatch(100, TimeUnit.MILLISECONDS)) {
+                LOGGER.warning("Could not check locks for " + group);
+                continue;
+            }
+            try {
+                // all resources of the group
+                for (long res = group.firstResourceIndex.get(); res != -1; res = resArenaMgr.getNext(res)) {
+                    final int dsId = resArenaMgr.getDatasetId(res);
+                    final int entityHashValue = resArenaMgr.getPkHashVal(res);
+                    // all holders of the resource
+                    for (long req = resArenaMgr.getLastHolder(res); req != -1; req = reqArenaMgr
+                            .getNextRequest(req)) {
+                        final byte lockMode = (byte) reqArenaMgr.getLockMode(req);
+                        final long jobSlot = reqArenaMgr.getJobSlot(req);
+                        final int jobId = jobArenaMgr.getJobId(jobSlot);
+                        assertLockCanBeFoundInJobQueue(dsId, entityHashValue, lockMode, jobId);
+                    }
+                }
+            } finally {
+                group.releaseLatch();
+            }
+        }
+    }
+
+ /**
+ * Checks that the given lock shows up in the holder queue of the job. If
+ * findLockInJobQueue does not find it, the problem is logged at level
+ * SEVERE and an IllegalStateException with the same message is thrown.
+ */
+ private void assertLockCanBeFoundInJobQueue(int dsId, int entityHashValue, byte lockMode, int jobId) {
+ if (findLockInJobQueue(dsId, entityHashValue, jobId, lockMode) == -1) {
+ String msg = "request for " + LockMode.toString(lockMode) + " lock on dataset " + dsId + " entity "
+ + entityHashValue + " not found for job " + jobId + " in thread " + Thread.currentThread().getName();
+ LOGGER.severe(msg);
+ throw new IllegalStateException(msg);
}
}
- private void requestAbort(ITransactionContext txnContext) throws ACIDException {
- txnContext.setTimeout(true);
- throw new ACIDException("Transaction " + txnContext.getJobId()
- + " should abort (requested by the Lock Manager)");
+ /**
+ * tries to find a lock request by searching through the holder queue of a
+ * job; the queues of waiters and upgraders are not searched
+ * @param dsId dataset id
+ * @param entityHashValue primary key hash value
+ * @param jobId job id
+ * @param lockMode lock mode, LockMode.ANY matches every lock mode
+ * @return the slot of the request, if the lock request is found, -1 otherwise
+ * (also if no slot is registered for the job id)
+ */
+ private long findLockInJobQueue(final int dsId, final int entityHashValue, final int jobId, byte lockMode) {
+ Long jobSlot = jobIdSlotMap.get(jobId);
+ if (jobSlot == null) {
+ return -1;
+ }
+
+ long holder;
+ synchronized (jobArenaMgr) {
+ holder = jobArenaMgr.getLastHolder(jobSlot);
+ }
+ while (holder != -1) {
+ long resource = reqArenaMgr.getResourceId(holder);
+ if (dsId == resArenaMgr.getDatasetId(resource)
+ && entityHashValue == resArenaMgr.getPkHashVal(resource)
+ && jobSlot == reqArenaMgr.getJobSlot(holder)
+ && (lockMode == reqArenaMgr.getLockMode(holder)
+ || lockMode == LockMode.ANY)) {
+ return holder;
+ }
+ // follow the per-job chain of requests, not the per-resource chain
+ synchronized (jobArenaMgr) {
+ holder = reqArenaMgr.getNextJobRequest(holder);
+ }
+ }
+ return -1;
}
+    /**
+     * Renders a resource record and its queue of holders as a string.
+     *
+     * @param resSlot
+     *            the slot of the resource
+     * @return the text produced by {@code appendResQueue}
+     */
+    private String resQueueToString(long resSlot) {
+        final StringBuilder buffer = new StringBuilder();
+        return appendResQueue(buffer, resSlot).toString();
+    }
+
+    /**
+     * Appends the record of a resource followed by the records of all requests
+     * in its holder queue, one record per line.
+     *
+     * @param sb
+     *            the buffer to append to
+     * @param resSlot
+     *            the slot of the resource
+     * @return {@code sb}
+     */
+    private StringBuilder appendResQueue(StringBuilder sb, long resSlot) {
+        resArenaMgr.appendRecord(sb, resSlot);
+        sb.append("\n");
+        final long firstHolder = resArenaMgr.getLastHolder(resSlot);
+        appendReqQueue(sb, firstHolder);
+        return sb;
+    }
+
+    /**
+     * Appends the records of a chain of requests, one record per line. The
+     * chain is followed through {@code getNextRequest} until -1 is reached.
+     *
+     * @param sb
+     *            the buffer to append to
+     * @param head
+     *            the slot of the first request, -1 for an empty chain
+     * @return {@code sb}
+     */
+    private StringBuilder appendReqQueue(StringBuilder sb, long head) {
+        for (long req = head; req != -1; req = reqArenaMgr.getNextRequest(req)) {
+            reqArenaMgr.appendRecord(sb, req);
+            sb.append("\n");
+        }
+        return sb;
+    }
+
public StringBuilder append(StringBuilder sb) {
table.getAllLatches();
try {
@@ -954,12 +1125,17 @@
table[i] = new ResourceGroup();
}
}
+
ResourceGroup get(int dId, int entityHashValue) {
// TODO ensure good properties of hash function
int h = Math.abs(dId ^ entityHashValue);
if (h < 0) h = 0;
return table[h % TABLE_SIZE];
}
+
+ /** Returns the group stored at position i of the table (no hashing involved). */
+ ResourceGroup get(int i) {
+ return table[i];
+ }
public void getAllLatches() {
for (int i = 0; i < TABLE_SIZE; ++i) {
@@ -1006,6 +1182,16 @@
log("latch");
latch.writeLock().lock();
}
+
+        /**
+         * Tries to acquire the write lock of this group's latch, waiting at
+         * most the given time.
+         *
+         * @param timeout
+         *            the maximum time to wait
+         * @param unit
+         *            the unit of {@code timeout}
+         * @return true if the latch was acquired, false if the time elapsed
+         * @throws ACIDException
+         *             if the thread is interrupted while waiting; the interrupt
+         *             status of the thread is set again before throwing
+         */
+        boolean tryLatch(long timeout, TimeUnit unit) throws ACIDException {
+            log("tryLatch");
+            try {
+                return latch.writeLock().tryLock(timeout, unit);
+            } catch (InterruptedException e) {
+                // Catching InterruptedException clears the interrupt status;
+                // restore it so that code further up the stack can still see
+                // that the thread was asked to stop.
+                Thread.currentThread().interrupt();
+                LOGGER.finer("interrupted while wating on ResourceGroup");
+                throw new ACIDException("interrupted", e);
+            }
+        }
void releaseLatch() {
log("release");
@@ -1038,7 +1224,7 @@
}
public String toString() {
- return "{ id : " + hashCode() + ", first : " + firstResourceIndex.toString() + ", waiters : "
+ return "{ id : " + hashCode() + ", first : " + TypeUtil.Global.toString(firstResourceIndex.get()) + ", waiters : "
+ (hasWaiters() ? "true" : "false") + " }";
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index 8f3e6df..bd05983 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -24,7 +24,6 @@
import org.apache.commons.io.FileUtils;
-import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -70,7 +69,7 @@
long defaultWaitTime;
public LockRequestController(String requestFileName) throws ACIDException, AsterixException {
- this.txnProvider = new TransactionSubsystem("nc1", null, new AsterixTransactionProperties(
+ this.txnProvider = new TransactionSubsystem("nc1", new TestRuntimeContextProvider(), new AsterixTransactionProperties(
new AsterixPropertiesAccessor()));
this.workerReadyQueue = new WorkerReadyQueue();
this.requestList = new ArrayList<LockRequest>();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json
index 0c4fa71..d081965 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json
@@ -3,23 +3,28 @@
"fields" : [
{
"name" : "resource id",
- "type" : "GLOBAL"
+ "type" : "GLOBAL",
+ "initial" : "-1"
},
{
"name" : "job slot",
- "type" : "GLOBAL"
+ "type" : "GLOBAL",
+ "initial" : "-1"
},
{
"name" : "prev job request",
- "type" : "GLOBAL"
+ "type" : "GLOBAL",
+ "initial" : "-1"
},
{
"name" : "next job request",
- "type" : "GLOBAL"
+ "type" : "GLOBAL",
+ "initial" : "-1"
},
{
"name" : "next request",
- "type" : "GLOBAL"
+ "type" : "GLOBAL",
+ "initial" : "-1"
},
{
"name" : "lock mode",
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
new file mode 100644
index 0000000..cf519ea
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
+import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
+
+/**
+ * Minimal IAsterixAppRuntimeContextProvider for tests of the lock manager.
+ * Only getThreadExecutor() and getIndexLifecycleManager() return an object;
+ * every other method throws UnsupportedOperationException.
+ */
+class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
+
+ // executor backed by the default thread factory
+ AsterixThreadExecutor ate = new AsterixThreadExecutor(Executors.defaultThreadFactory());
+ // stub lifecycle manager, see IndexLifecycleManager below
+ IIndexLifecycleManager ilm = new IndexLifecycleManager();
+
+ @Override
+ public AsterixThreadExecutor getThreadExecutor() {
+ return ate;
+ }
+
+ @Override
+ public IBufferCache getBufferCache() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IFileMapProvider getFileMapManager() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ITransactionSubsystem getTransactionSubsystem() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IIndexLifecycleManager getIndexLifecycleManager() {
+ return ilm;
+ }
+
+ @Override
+ public double getBloomFilterFalsePositiveRate() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ILSMIOOperationScheduler getLSMIOScheduler() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ILocalResourceRepository getLocalResourceRepository() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResourceIdFactory getResourceIdFactory() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IIOManager getIOManager() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IAsterixAppRuntimeContext getAppContext() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * IIndexLifecycleManager whose methods all throw
+ * UnsupportedOperationException; it only exists so that
+ * getIndexLifecycleManager() can return a non-null object.
+ */
+ static class IndexLifecycleManager implements IIndexLifecycleManager {
+
+ @Override
+ public IIndex getIndex(long resourceID) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void register(long resourceID, IIndex index) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void unregister(long resourceID) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void open(long resourceID) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close(long resourceID) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<IIndex> getOpenIndexes() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 1e34df6..a7fb73b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -697,22 +697,12 @@
.getIndex(logRecord.getResourceId());
ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- if (logRecord.getResourceType() == ResourceType.LSM_BTREE) {
- if (logRecord.getOldOp() != IndexOperation.NOOP.ordinal()) {
- if (logRecord.getOldOp() == IndexOperation.DELETE.ordinal()) {
- indexAccessor.forceDelete(logRecord.getOldValue());
- } else {
- indexAccessor.forceInsert(logRecord.getOldValue());
- }
- } else {
- indexAccessor.forcePhysicalDelete(logRecord.getNewValue());
- }
+ if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
+ indexAccessor.forceDelete(logRecord.getNewValue());
+ } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
+ indexAccessor.forceInsert(logRecord.getNewValue());
} else {
- if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
- indexAccessor.forceInsert(logRecord.getNewValue());
- } else {
- indexAccessor.forceDelete(logRecord.getNewValue());
- }
+ throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
}
} catch (Exception e) {
throw new IllegalStateException("Failed to undo", e);