- introduce (logical) type for local and global pointers (slots)
- global pointers are 64-bit now
- synchronize allocation and deallocation in RecordManagers
- (over) synchronized modification of job queues
- check that references are still valid when accessed (using an
allocation counter)
- some more debugging support
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
index bd8dee4..1973584 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
@@ -59,20 +59,29 @@
System.out.println(sb.toString());
}
- public static void generateSource(Manager mgr, RecordType rec, InputStream is, StringBuilder sb) {
+ public static void generateSource(
+ Manager mgr,
+ RecordType rec,
+ InputStream is,
+ StringBuilder sb,
+ boolean debug) {
switch (mgr) {
case RECORD:
- generateMemoryManagerSource(rec, is, sb);
+ generateMemoryManagerSource(rec, is, sb, debug);
break;
case ARENA:
- generateArenaManagerSource(rec, is, sb);
+ generateArenaManagerSource(rec, is, sb, debug);
break;
default:
throw new IllegalArgumentException();
}
}
- private static void generateMemoryManagerSource(RecordType resource, InputStream is, StringBuilder sb) {
+ private static void generateMemoryManagerSource(
+ RecordType resource,
+ InputStream is,
+ StringBuilder sb,
+ boolean debug) {
BufferedReader in = new BufferedReader(new InputStreamReader(is));
String line = null;
@@ -118,7 +127,11 @@
}
}
- private static void generateArenaManagerSource(RecordType resource, InputStream is, StringBuilder sb) {
+ private static void generateArenaManagerSource(
+ RecordType resource,
+ InputStream is,
+ StringBuilder sb,
+ boolean debug) {
BufferedReader in = new BufferedReader(new InputStreamReader(is));
String line = null;
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
index 2768916..8a7965a 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
@@ -31,6 +31,8 @@
* @requiresDependencyResolution compile
*/
public class RecordManagerGeneratorMojo extends AbstractMojo {
+
+ private static final boolean DEBUG = true;
/**
* parameter injected from pom.xml
*
@@ -64,31 +66,34 @@
typeMap = new HashMap<String, RecordType>();
RecordType resource = new RecordType("Resource");
- resource.addField("last holder", RecordType.Type.INT, "-1");
- resource.addField("first waiter", RecordType.Type.INT, "-1");
- resource.addField("first upgrader", RecordType.Type.INT, "-1");
+ resource.addField("last holder", RecordType.Type.GLOBAL, "-1");
+ resource.addField("first waiter", RecordType.Type.GLOBAL, "-1");
+ resource.addField("first upgrader", RecordType.Type.GLOBAL, "-1");
+ resource.addField("next", RecordType.Type.GLOBAL, null);
resource.addField("max mode", RecordType.Type.INT, "LockMode.NL");
resource.addField("dataset id", RecordType.Type.INT, null);
resource.addField("pk hash val", RecordType.Type.INT, null);
- resource.addField("next", RecordType.Type.INT, null);
+ if (DEBUG) resource.addField("alloc id", RecordType.Type.SHORT, null, true);
typeMap.put(resource.name, resource);
RecordType request = new RecordType("Request");
- request.addField("resource id", RecordType.Type.INT, null);
+ request.addField("resource id", RecordType.Type.GLOBAL, null);
+ request.addField("job slot", RecordType.Type.GLOBAL, null);
+ request.addField("prev job request", RecordType.Type.GLOBAL, null);
+ request.addField("next job request", RecordType.Type.GLOBAL, null);
+ request.addField("next request", RecordType.Type.GLOBAL, null);
request.addField("lock mode", RecordType.Type.INT, null);
- request.addField("job id", RecordType.Type.INT, null);
- request.addField("prev job request", RecordType.Type.INT, null);
- request.addField("next job request", RecordType.Type.INT, null);
- request.addField("next request", RecordType.Type.INT, null);
+ if (DEBUG) request.addField("alloc id", RecordType.Type.SHORT, null, true);
typeMap.put(request.name, request);
RecordType job = new RecordType("Job");
+ job.addField("last holder", RecordType.Type.GLOBAL, "-1");
+ job.addField("last waiter", RecordType.Type.GLOBAL, "-1");
+ job.addField("last upgrader", RecordType.Type.GLOBAL, "-1");
job.addField("job id", RecordType.Type.INT, null);
- job.addField("last holder", RecordType.Type.INT, "-1");
- job.addField("last waiter", RecordType.Type.INT, "-1");
- job.addField("last upgrader", RecordType.Type.INT, "-1");
+ if (DEBUG) job.addField("alloc id", RecordType.Type.SHORT, null, true);
typeMap.put(job.name, job);
}
@@ -119,7 +124,7 @@
}
StringBuilder sb = new StringBuilder();
- Generator.generateSource(mgrType, typeMap.get(recordType), is, sb);
+ Generator.generateSource(mgrType, typeMap.get(recordType), is, sb, DEBUG);
is.close();
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
index 26ca4c4..97e10d4 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
@@ -22,7 +22,9 @@
enum Type {
BYTE,
SHORT,
- INT
+ INT,
+ LOCAL,
+ GLOBAL
}
static class Field {
@@ -31,12 +33,14 @@
Type type;
String initial;
int offset;
+ boolean debugField;
- Field(String name, Type type, String initial, int offset) {
+ Field(String name, Type type, String initial, int offset, boolean debugField) {
this.name = name;
this.type = type;
this.initial = initial;
this.offset = offset;
+ this.debugField = debugField;
}
String methodName(String prefix) {
@@ -54,7 +58,7 @@
StringBuilder appendMemoryManagerGetMethod(StringBuilder sb, String indent, int level) {
sb = indent(sb, indent, level);
sb.append("public ")
- .append(name(type))
+ .append(javaType(type))
.append(' ')
.append(methodName("get"))
.append("(int slotNum) {\n");
@@ -80,7 +84,7 @@
sb.append("public void ")
.append(methodName("set"))
.append("(int slotNum, ")
- .append(name(type))
+ .append(javaType(type))
.append(" value) {\n");
sb = indent(sb, indent, level + 1);
sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
@@ -95,30 +99,21 @@
return sb;
}
- StringBuilder appendArenaManagerSetThreadLocal(StringBuilder sb, String indent, int level) {
- sb = indent(sb, indent, level);
- sb.append("final int arenaId = arenaId(slotNum);\n");
- sb = indent(sb, indent, level);
- sb.append("if (arenaId != local.get().arenaId) {\n");
- sb = indent(sb, indent, level + 1);
- sb.append("local.get().arenaId = arenaId;\n");
- sb = indent(sb, indent, level + 1);
- sb.append("local.get().mgr = get(arenaId);\n");
- sb = indent(sb, indent, level);
- sb.append("}\n");
- return sb;
- }
-
StringBuilder appendArenaManagerGetMethod(StringBuilder sb, String indent, int level) {
sb = indent(sb, indent, level);
sb.append("public ")
- .append(name(type))
+ .append(javaType(type))
.append(' ')
.append(methodName("get"))
- .append("(int slotNum) {\n");
- sb = appendArenaManagerSetThreadLocal(sb, indent, level + 1);
+ .append("(long slotNum) {\n");
+ if (! debugField) {
+ sb = indent(sb, indent, level + 1);
+ sb.append("if (TRACK_ALLOC) checkSlot(slotNum);\n");
+ }
sb = indent(sb, indent, level + 1);
- sb.append("return local.get().mgr.")
+ sb.append("final int arenaId = arenaId(slotNum);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("return get(arenaId).")
.append(methodName("get"))
.append("(localId(slotNum));\n");
sb = indent(sb, indent, level);
@@ -130,12 +125,17 @@
sb = indent(sb, indent, level);
sb.append("public void ")
.append(methodName("set"))
- .append("(int slotNum, ")
- .append(name(type))
+ .append("(long slotNum, ")
+ .append(javaType(type))
.append(" value) {\n");
- sb = appendArenaManagerSetThreadLocal(sb, indent, level + 1);
+ if (! debugField) {
+ sb = indent(sb, indent, level + 1);
+ sb.append("if (TRACK_ALLOC) checkSlot(slotNum);\n");
+ }
sb = indent(sb, indent, level + 1);
- sb.append("local.get().mgr.")
+ sb.append("final int arenaId = arenaId(slotNum);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("get(arenaId).")
.append(methodName("set"))
.append("(localId(slotNum), value);\n");
sb = indent(sb, indent, level);
@@ -209,7 +209,11 @@
}
void addField(String name, Type type, String initial) {
- fields.add(new Field(name, type, initial, totalSize));
+ addField(name, type, initial, false);
+ }
+
+ void addField(String name, Type type, String initial, boolean debugField) {
+ fields.add(new Field(name, type, initial, totalSize, debugField));
totalSize += size(type);
}
@@ -219,46 +223,56 @@
static int size(Type t) {
switch(t) {
- case BYTE: return 1;
- case SHORT: return 2;
- case INT: return 4;
- default: throw new IllegalArgumentException();
+ case BYTE: return 1;
+ case SHORT: return 2;
+ case INT: return 4;
+ case LOCAL: return 4;
+ case GLOBAL: return 8;
+ default: throw new IllegalArgumentException();
}
}
- static String name(Type t) {
+ static String javaType(Type t) {
switch(t) {
- case BYTE: return "byte";
- case SHORT: return "short";
- case INT: return "int";
- default: throw new IllegalArgumentException();
+ case BYTE: return "byte";
+ case SHORT: return "short";
+ case INT: return "int";
+ case LOCAL: return "int";
+ case GLOBAL: return "long";
+ default: throw new IllegalArgumentException();
}
}
static String bbGetter(Type t) {
switch(t) {
- case BYTE: return "get";
- case SHORT: return "getShort";
- case INT: return "getInt";
- default: throw new IllegalArgumentException();
+ case BYTE: return "get";
+ case SHORT: return "getShort";
+ case INT: return "getInt";
+ case LOCAL: return "getInt";
+ case GLOBAL: return "getLong";
+ default: throw new IllegalArgumentException();
}
}
static String bbSetter(Type t) {
switch(t) {
- case BYTE: return "put";
- case SHORT: return "putShort";
- case INT: return "putInt";
- default: throw new IllegalArgumentException();
+ case BYTE: return "put";
+ case SHORT: return "putShort";
+ case INT: return "putInt";
+ case LOCAL: return "putInt";
+ case GLOBAL: return "putLong";
+ default: throw new IllegalArgumentException();
}
}
static String deadMemInitializer(Type t) {
switch(t) {
- case BYTE: return "0xde";
- case SHORT: return "0xdead";
- case INT: return "0xdeadbeef";
- default: throw new IllegalArgumentException();
+ case BYTE: return "(byte)0xde";
+ case SHORT: return "(short)0xdead";
+ case INT: return "0xdeadbeef";
+ case LOCAL: return "0xdeadbeef";
+ case GLOBAL: return "0xdeadbeefdeadbeefl";
+ default: throw new IllegalArgumentException();
}
}
@@ -303,7 +317,7 @@
sb = indent(sb, indent, level);
sb.append("for (int i = 0; i < NO_SLOTS; ++i) {\n");
sb = indent(sb, indent, level + 1);
- sb.append(name(field.type))
+ sb.append(javaType(field.type))
.append(" value = bb.")
.append(bbGetter(field.type))
.append("(i * ITEM_SIZE + ")
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
index 744b0f9..9a6492c 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
@@ -17,8 +17,12 @@
import java.util.ArrayList;
+import edu.uci.ics.asterix.transaction.management.service.locking.@E@RecordManager.Buffer.Alloc;
+
public class @E@ArenaManager {
+ public static final boolean TRACK_ALLOC = true;
+
private final int noArenas;
private ArrayList<@E@RecordManager> arenas;
private volatile int nextArena;
@@ -36,22 +40,39 @@
};
}
- public static int arenaId(int i) {
- return (i >> 24) & 0xff;
+ public static int arenaId(long l) {
+ return (int)((l >>> 48) & 0xffff);
}
- public static int localId(int i) {
- return i & 0xffffff;
+ public static int allocId(long l) {
+ return (int)((l >>> 32) & 0xffff);
}
- public int allocate() {
+ public static int localId(long l) {
+ return (int) (l & 0xffffffffL);
+ }
+
+ public long allocate() {
final LocalManager localManager = local.get();
- int result = localManager.arenaId << 24;
- result |= localManager.mgr.allocate();
+ long result = localManager.arenaId;
+ result = result << 48;
+ final int localId = localManager.mgr.allocate();
+ result |= localId;
+ if (TRACK_ALLOC) {
+ final long allocId = (++localManager.mgr.allocCounter % 0x7fff);
+ result |= (allocId << 32);
+ setAllocId(result, (short) allocId);
+ assert allocId(result) == allocId;
+ }
+ assert arenaId(result) == localManager.arenaId;
+ assert localId(result) == localId;
return result;
}
- public void deallocate(int slotNum) {
+ public void deallocate(long slotNum) {
+ if (TRACK_ALLOC) {
+ checkSlot(slotNum);
+ }
final int arenaId = arenaId(slotNum);
get(arenaId).deallocate(localId(slotNum));
}
@@ -78,6 +99,28 @@
@METHODS@
+ private void checkSlot(long slotNum) {
+ final int refAllocId = allocId(slotNum);
+ final short curAllocId = getAllocId(slotNum);
+ if (refAllocId != curAllocId) {
+ System.err.println("checkSlot(" + slotNum + "): " + refAllocId);
+ String msg = "reference to slot " + slotNum
+ + " of arena " + arenaId(slotNum) + " refers to version "
+ + Integer.toHexString(refAllocId) + " current version is "
+ + Integer.toHexString(curAllocId);
+ Alloc a = getAlloc(slotNum);
+ if (a != null) {
+ msg += "\n" + a.toString();
+ }
+ throw new IllegalStateException(msg);
+ }
+ }
+
+ public Alloc getAlloc(long slotNum) {
+ final int arenaId = arenaId(slotNum);
+ return get(arenaId).getAlloc(localId(slotNum));
+ }
+
static class LocalManager {
int arenaId;
@E@RecordManager mgr;
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
index 801d556..7fa3ed7 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -20,12 +20,14 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import edu.uci.ics.asterix.transaction.management.service.locking.@E@RecordManager.Buffer.Alloc;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
public class @E@RecordManager {
public static final int SHRINK_TIMER_THRESHOLD = 120000; //2min
-
+ public static final boolean TRACK_ALLOC = false;
+
static final int NO_SLOTS = 10;
static final int NEXT_FREE_SLOT_OFFSET = 0;
@@ -36,14 +38,18 @@
private int occupiedSlots;
private long shrinkTimer;
private boolean isShrinkTimerOn;
+
+ int allocCounter;
public @E@RecordManager() {
buffers = new ArrayList<Buffer>();
buffers.add(new Buffer());
current = 0;
+
+ allocCounter = 0;
}
- public int allocate() {
+ synchronized int allocate() {
if (buffers.get(current).isFull()) {
int size = buffers.size();
boolean needNewBuffer = true;
@@ -66,7 +72,7 @@
return buffers.get(current).allocate() + current * NO_SLOTS;
}
- void deallocate(int slotNum) {
+ synchronized void deallocate(int slotNum) {
buffers.get(slotNum / NO_SLOTS).deallocate(slotNum % NO_SLOTS);
--occupiedSlots;
@@ -160,6 +166,15 @@
@METHODS@
+ public Alloc getAlloc(int slotNum) {
+ final Buffer buf = buffers.get(slotNum / NO_SLOTS);
+ if (buf.allocList == null) {
+ return null;
+ } else {
+ return buf.allocList.get(slotNum % NO_SLOTS);
+ }
+ }
+
StringBuffer append(StringBuffer sb) {
sb.append("+++ current: ")
.append(current)
@@ -180,13 +195,11 @@
}
static class Buffer {
- public static final boolean TRACK_ALLOC = true;
-
private ByteBuffer bb;
private int freeSlotNum;
private int occupiedSlots = -1; //-1 represents 'deinitialized' state.
- private ArrayList<Alloc> allocList;
+ ArrayList<Alloc> allocList;
Buffer() {
initialize();
@@ -269,6 +282,9 @@
}
private void checkSlot(int slotNum) {
+ if (true || ! TRACK_ALLOC) {
+ return;
+ }
final int itemOffset = (slotNum % NO_SLOTS) * ITEM_SIZE;
// @CHECK_SLOT@
}
@@ -290,7 +306,13 @@
PrintWriter pw = new PrintWriter(sw);
new Exception().printStackTrace(pw);
pw.close();
- return sw.toString();
+ String res = sw.toString();
+ // remove first 3 lines
+ int nlPos = 0;
+ for (int i = 0; i < 3; ++i) {
+ nlPos = res.indexOf('\n', nlPos) + 1;
+ }
+ return res.substring(nlPos);
}
public String toString() {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 9679f86..359b58e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -18,7 +18,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -51,7 +51,7 @@
private ResourceArenaManager resArenaMgr;
private RequestArenaManager reqArenaMgr;
private JobArenaManager jobArenaMgr;
- private ConcurrentHashMap<Integer, Integer> jobIdSlotMap;
+ private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
enum LockAction {
GET,
@@ -101,16 +101,16 @@
int dsId = datasetId.getId();
- int jobSlot = getJobSlot(txnContext.getJobId().getId());
+ long jobSlot = getJobSlot(txnContext.getJobId().getId());
ResourceGroup group = table.get(datasetId, entityHashValue);
group.getLatch();
try {
// 1) Find the resource in the hash table
- int resSlot = getResourceSlot(group, dsId, entityHashValue);
+ long resSlot = getResourceSlot(group, dsId, entityHashValue);
// 2) create a request entry
- int reqSlot = getRequestSlot(resSlot, jobSlot, lockMode);
+ long reqSlot = getRequestSlot(resSlot, jobSlot, lockMode);
// 3) check lock compatibility
boolean locked = false;
@@ -160,16 +160,16 @@
* @param jobSlot the slot that contains the information about the job
* @return true if a cycle would be introduced, false otherwise
*/
- private boolean introducesDeadlock(int resSlot, int jobSlot) {
- int reqSlot = resArenaMgr.getLastHolder(resSlot);
+ private boolean introducesDeadlock(long resSlot, long jobSlot) {
+ long reqSlot = resArenaMgr.getLastHolder(resSlot);
while (reqSlot >= 0) {
- int holderJobSlot = reqArenaMgr.getJobId(reqSlot);
+ long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
if (holderJobSlot == jobSlot) {
return true;
}
- int waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
+ long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
while (waiter >= 0) {
- int watingOnResSlot = reqArenaMgr.getResourceId(waiter);
+ long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
if (introducesDeadlock(watingOnResSlot, jobSlot)) {
return true;
}
@@ -203,7 +203,7 @@
}
int dsId = datasetId.getId();
- int jobSlot = getJobSlot(txnContext.getJobId().getId());
+ long jobSlot = getJobSlot(txnContext.getJobId().getId());
boolean locked = false;
@@ -212,9 +212,9 @@
try {
// 1) Find the resource in the hash table
- int resSlot = getResourceSlot(group, dsId, entityHashValue);
+ long resSlot = getResourceSlot(group, dsId, entityHashValue);
// 2) create a request entry
- int reqSlot = getRequestSlot(resSlot, jobSlot, lockMode);
+ long reqSlot = getRequestSlot(resSlot, jobSlot, lockMode);
// 3) check lock compatibility
int curLockMode = resArenaMgr.getMaxMode(resSlot);
@@ -272,23 +272,23 @@
try {
int dsId = datasetId.getId();
- int resource = findResourceInGroup(group, dsId, entityHashValue);
+ long resource = findResourceInGroup(group, dsId, entityHashValue);
if (resource < 0) {
throw new IllegalStateException("resource (" + dsId + ", " + entityHashValue + ") not found");
}
int jobId = txnContext.getJobId().getId();
- int jobSlot = getJobSlot(jobId);
+ long jobSlot = getJobSlot(jobId);
// since locking is properly nested, finding the last holder for a job is good enough
- int holder = removeLastHolder(resource, jobSlot);
+ long holder = removeLastHolder(resource, jobSlot);
// deallocate request
reqArenaMgr.deallocate(holder);
// deallocate resource or fix max lock mode
if (resourceNotUsed(resource)) {
- int prev = group.firstResourceIndex.get();
+ long prev = group.firstResourceIndex.get();
if (prev == resource) {
group.firstResourceIndex.set(resArenaMgr.getNext(resource));
} else {
@@ -324,14 +324,14 @@
log("releaseLocks", -1, -1, LockMode.NL, txnContext);
int jobId = txnContext.getJobId().getId();
- Integer jobSlot = jobIdSlotMap.get(jobId);
+ Long jobSlot = jobIdSlotMap.get(jobId);
if (jobSlot == null) {
// we don't know the job, so there are no locks for it - we're done
return;
}
- int holder = jobArenaMgr.getLastHolder(jobSlot);
+ long holder = jobArenaMgr.getLastHolder(jobSlot);
while (holder != -1) {
- int resource = reqArenaMgr.getResourceId(holder);
+ long resource = reqArenaMgr.getResourceId(holder);
int dsId = resArenaMgr.getDatasetId(resource);
int pkHashVal = resArenaMgr.getPkHashVal(resource);
unlock(new DatasetId(dsId), pkHashVal, txnContext);
@@ -340,12 +340,12 @@
jobArenaMgr.deallocate(jobSlot);
}
- private int getJobSlot(int jobId) {
- Integer jobSlot = jobIdSlotMap.get(jobId);
+ private long getJobSlot(int jobId) {
+ Long jobSlot = jobIdSlotMap.get(jobId);
if (jobSlot == null) {
- jobSlot = new Integer(jobArenaMgr.allocate());
+ jobSlot = new Long(jobArenaMgr.allocate());
jobArenaMgr.setJobId(jobSlot, jobId);
- Integer oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
+ Long oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
if (oldSlot != null) {
// if another thread allocated a slot for this jobId between
// get(..) and putIfAbsent(..), we'll use that slot and
@@ -358,8 +358,8 @@
return jobSlot;
}
- private int getResourceSlot(ResourceGroup group, int dsId, int entityHashValue) {
- int resSlot = findResourceInGroup(group, dsId, entityHashValue);
+ private long getResourceSlot(ResourceGroup group, int dsId, int entityHashValue) {
+ long resSlot = findResourceInGroup(group, dsId, entityHashValue);
if (resSlot == -1) {
// we don't know about this resource, let's alloc a slot
@@ -372,18 +372,18 @@
return resSlot;
}
- private int getRequestSlot(int resSlot, int jobSlot, byte lockMode) {
- int reqSlot = reqArenaMgr.allocate();
+ private long getRequestSlot(long resSlot, long jobSlot, byte lockMode) {
+ long reqSlot = reqArenaMgr.allocate();
reqArenaMgr.setResourceId(reqSlot, resSlot);
reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
- reqArenaMgr.setJobId(reqSlot, jobSlot);
+ reqArenaMgr.setJobSlot(reqSlot, jobSlot);
return reqSlot;
}
- private int findLastHolderForJob(int resource, int job) {
- int holder = resArenaMgr.getLastHolder(resource);
+ private long findLastHolderForJob(long resource, long job) {
+ long holder = resArenaMgr.getLastHolder(resource);
while (holder != -1) {
- if (job == reqArenaMgr.getJobId(holder)) {
+ if (job == reqArenaMgr.getJobSlot(holder)) {
return holder;
}
holder = reqArenaMgr.getNextRequest(holder);
@@ -402,13 +402,13 @@
* @param lockMode the lock mode that the resource should be locked with
* @return
*/
- private LockAction updateActionForSameJob(int resource, int job, byte lockMode) {
+ private LockAction updateActionForSameJob(long resource, long job, byte lockMode) {
// TODO we can reduce the numer of things we have to look at by carefully
// distinguishing the different lock modes
- int holder = resArenaMgr.getLastHolder(resource);
+ long holder = resArenaMgr.getLastHolder(resource);
LockAction res = LockAction.WAIT;
while (holder != -1) {
- if (job == reqArenaMgr.getJobId(holder)) {
+ if (job == reqArenaMgr.getJobSlot(holder)) {
if (reqArenaMgr.getLockMode(holder) == lockMode) {
return LockAction.GET;
} else {
@@ -420,8 +420,8 @@
return res;
}
- private int findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
- int resSlot = group.firstResourceIndex.get();
+ private long findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
+ long resSlot = group.firstResourceIndex.get();
while (resSlot != -1) {
// either we already have a lock on this resource or we have a
// hash collision
@@ -435,40 +435,44 @@
return -1;
}
- private void addHolder(int request, int resource, int job) {
- int lastHolder = resArenaMgr.getLastHolder(resource);
+ private void addHolder(long request, long resource, long job) {
+ long lastHolder = resArenaMgr.getLastHolder(resource);
reqArenaMgr.setNextRequest(request, lastHolder);
resArenaMgr.setLastHolder(resource, request);
- lastHolder = jobArenaMgr.getLastHolder(job);
- insertIntoJobQueue(request, lastHolder);
- jobArenaMgr.setLastHolder(job, request);
+ synchronized (jobArenaMgr) {
+ lastHolder = jobArenaMgr.getLastHolder(job);
+ insertIntoJobQueue(request, lastHolder);
+ jobArenaMgr.setLastHolder(job, request);
+ }
}
- private int removeLastHolder(int resource, int jobSlot) {
- int holder = resArenaMgr.getLastHolder(resource);
+ private long removeLastHolder(long resource, long jobSlot) {
+ long holder = resArenaMgr.getLastHolder(resource);
if (holder < 0) {
throw new IllegalStateException("no holder for resource " + resource);
}
// remove from the list of holders for a resource
- if (reqArenaMgr.getJobId(holder) == jobSlot) {
+ if (reqArenaMgr.getJobSlot(holder) == jobSlot) {
// if the head of the queue matches, we need to update the resource
- int next = reqArenaMgr.getNextRequest(holder);
+ long next = reqArenaMgr.getNextRequest(holder);
resArenaMgr.setLastHolder(resource, next);
} else {
holder = removeRequestFromQueueForJob(holder, jobSlot);
}
- // remove from the list of requests for a job
- int newHead = removeRequestFromJob(jobSlot, holder);
- jobArenaMgr.setLastHolder(jobSlot, newHead);
+ synchronized (jobArenaMgr) {
+ // remove from the list of requests for a job
+ long newHead = removeRequestFromJob(jobSlot, holder);
+ jobArenaMgr.setLastHolder(jobSlot, newHead);
+ }
return holder;
}
- private int removeRequestFromJob(int jobSlot, int holder) {
- int prevForJob = reqArenaMgr.getPrevJobRequest(holder);
- int nextForJob = reqArenaMgr.getNextJobRequest(holder);
+ private long removeRequestFromJob(long jobSlot, long holder) {
+ long prevForJob = reqArenaMgr.getPrevJobRequest(holder);
+ long nextForJob = reqArenaMgr.getNextJobRequest(holder);
if (nextForJob != -1) {
reqArenaMgr.setPrevJobRequest(nextForJob, prevForJob);
}
@@ -480,8 +484,8 @@
}
}
- private void addWaiter(int request, int resource, int job) {
- int waiter = resArenaMgr.getFirstWaiter(resource);
+ private void addWaiter(long request, long resource, long job) {
+ long waiter = resArenaMgr.getFirstWaiter(resource);
reqArenaMgr.setNextRequest(request, -1);
if (waiter == -1) {
resArenaMgr.setFirstWaiter(resource, request);
@@ -489,52 +493,59 @@
appendToRequestQueue(waiter, request);
}
- waiter = jobArenaMgr.getLastWaiter(job);
- insertIntoJobQueue(request, waiter);
- jobArenaMgr.setLastWaiter(job, request);
+ synchronized (jobArenaMgr) {
+ waiter = jobArenaMgr.getLastWaiter(job);
+ insertIntoJobQueue(request, waiter);
+ jobArenaMgr.setLastWaiter(job, request);
+ }
}
- private void removeWaiter(int request, int resource, int job) {
- int waiter = resArenaMgr.getFirstWaiter(resource);
+ private void removeWaiter(long request, long resource, long job) {
+ long waiter = resArenaMgr.getFirstWaiter(resource);
if (waiter == request) {
- int next = reqArenaMgr.getNextRequest(waiter);
+ long next = reqArenaMgr.getNextRequest(waiter);
resArenaMgr.setFirstWaiter(resource, next);
} else {
waiter = removeRequestFromQueueForSlot(waiter, request);
}
- // remove from the list of requests for a job
- int newHead = removeRequestFromJob(job, waiter);
- jobArenaMgr.setLastWaiter(job, newHead);
+ synchronized (jobArenaMgr) {
+ // remove from the list of requests for a job
+ long newHead = removeRequestFromJob(job, waiter);
+ jobArenaMgr.setLastWaiter(job, newHead);
+ }
}
- private void addUpgrader(int request, int resource, int job) {
- int upgrader = resArenaMgr.getFirstUpgrader(resource);
+ private void addUpgrader(long request, long resource, long job) {
+ long upgrader = resArenaMgr.getFirstUpgrader(resource);
reqArenaMgr.setNextRequest(request, -1);
if (upgrader == -1) {
resArenaMgr.setFirstUpgrader(resource, request);
} else {
appendToRequestQueue(upgrader, request);
}
-
- upgrader = jobArenaMgr.getLastUpgrader(job);
- insertIntoJobQueue(request, upgrader);
- jobArenaMgr.setLastUpgrader(job, request);
+ synchronized (jobArenaMgr) {
+ upgrader = jobArenaMgr.getLastUpgrader(job);
+ insertIntoJobQueue(request, upgrader);
+ jobArenaMgr.setLastUpgrader(job, request);
+ }
}
- private void removeUpgrader(int request, int resource, int job) {
- int upgrader = resArenaMgr.getFirstUpgrader(resource);
+ private void removeUpgrader(long request, long resource, long job) {
+ long upgrader = resArenaMgr.getFirstUpgrader(resource);
if (upgrader == request) {
- int next = reqArenaMgr.getNextRequest(upgrader);
+ long next = reqArenaMgr.getNextRequest(upgrader);
resArenaMgr.setFirstUpgrader(resource, next);
} else {
upgrader = removeRequestFromQueueForSlot(upgrader, request);
}
- // remove from the list of requests for a job
- int newHead = removeRequestFromJob(job, upgrader);
- jobArenaMgr.setLastUpgrader(job, newHead);
+ synchronized (jobArenaMgr) {
+ // remove from the list of requests for a job
+ long newHead = removeRequestFromJob(job, upgrader);
+ jobArenaMgr.setLastUpgrader(job, newHead);
+ }
}
- private void insertIntoJobQueue(int newRequest, int oldRequest) {
+ private void insertIntoJobQueue(long newRequest, long oldRequest) {
reqArenaMgr.setNextJobRequest(newRequest, oldRequest);
reqArenaMgr.setPrevJobRequest(newRequest, -1);
if (oldRequest >= 0) {
@@ -542,8 +553,8 @@
}
}
- private void appendToRequestQueue(int head, int appendee) {
- int next = reqArenaMgr.getNextRequest(head);
+ private void appendToRequestQueue(long head, long appendee) {
+ long next = reqArenaMgr.getNextRequest(head);
while(next != -1) {
head = next;
next = reqArenaMgr.getNextRequest(head);
@@ -557,9 +568,9 @@
* @param reqSlot
* @return
*/
- private int removeRequestFromQueueForSlot(int head, int reqSlot) {
- int cur = head;
- int prev = cur;
+ private long removeRequestFromQueueForSlot(long head, long reqSlot) {
+ long cur = head;
+ long prev = cur;
while (prev != -1) {
cur = reqArenaMgr.getNextRequest(prev);
if (cur == -1) {
@@ -570,7 +581,7 @@
}
prev = cur;
}
- int next = reqArenaMgr.getNextRequest(cur);
+ long next = reqArenaMgr.getNextRequest(cur);
reqArenaMgr.setNextRequest(prev, next);
return cur;
}
@@ -581,27 +592,27 @@
* @param jobSlot the job slot
* @return the slot of the first request that matched the given job
*/
- private int removeRequestFromQueueForJob(int head, int jobSlot) {
- int holder = head;
- int prev = holder;
+ private long removeRequestFromQueueForJob(long head, long jobSlot) {
+ long holder = head;
+ long prev = holder;
while (prev != -1) {
holder = reqArenaMgr.getNextRequest(prev);
if (holder == -1) {
throw new IllegalStateException("no entry for job " + jobSlot + " in queue");
}
- if (jobSlot == reqArenaMgr.getJobId(holder)) {
+ if (jobSlot == reqArenaMgr.getJobSlot(holder)) {
break;
}
prev = holder;
}
- int next = reqArenaMgr.getNextRequest(holder);
+ long next = reqArenaMgr.getNextRequest(holder);
reqArenaMgr.setNextRequest(prev, next);
return holder;
}
- private int determineNewMaxMode(int resource, int oldMaxMode) {
+ private int determineNewMaxMode(long resource, int oldMaxMode) {
int newMaxMode = LockMode.NL;
- int holder = resArenaMgr.getLastHolder(resource);
+ long holder = resArenaMgr.getLastHolder(resource);
while (holder != -1) {
int curLockMode = reqArenaMgr.getLockMode(holder);
if (curLockMode == oldMaxMode) {
@@ -622,7 +633,7 @@
return newMaxMode;
}
- private boolean resourceNotUsed(int resource) {
+ private boolean resourceNotUsed(long resource) {
return resArenaMgr.getLastHolder(resource) == -1
&& resArenaMgr.getFirstUpgrader(resource) == -1
&& resArenaMgr.getFirstWaiter(resource) == -1;
@@ -964,12 +975,12 @@
private static class ResourceGroup {
private ReentrantReadWriteLock latch;
private Condition condition;
- AtomicInteger firstResourceIndex;
+ AtomicLong firstResourceIndex;
ResourceGroup() {
latch = new ReentrantReadWriteLock();
condition = latch.writeLock().newCondition();
- firstResourceIndex = new AtomicInteger(-1);
+ firstResourceIndex = new AtomicLong(-1);
}
void getLatch() {