less buggy work in progress
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ArenaManager.java.txt b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ArenaManager.java.txt
index 332167a..77bb3d5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ArenaManager.java.txt
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ArenaManager.java.txt
@@ -28,11 +28,10 @@
}
public synchronized LocalManager getNext() {
- @E@MemoryManager mgr = arenas.get(nextArena);
- if (mgr == null) {
- mgr = new @E@MemoryManager();
- arenas.set(nextArena, mgr);
+ if (nextArena >= arenas.size()) {
+ arenas.add(new @E@MemoryManager());
}
+ @E@MemoryManager mgr = arenas.get(nextArena);
LocalManager res = new LocalManager();
res.mgr = mgr;
res.arenaId = nextArena;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 7abb71b..6e9ee9b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -123,18 +123,23 @@
reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
reqArenaMgr.setJobId(reqSlot, jobId);
+ int prevHead = -1;
Integer headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
- if (headOfJobReqQueue != null) {
+ while (headOfJobReqQueue != null) {
// TODO make sure this works (even if the job gets removed from the table)
- while (!jobReqMap.replace(jobId, headOfJobReqQueue, reqSlot)) {
- headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
+ if (jobReqMap.replace(jobId, headOfJobReqQueue, reqSlot)) {
+ prevHead = headOfJobReqQueue;
+ break;
}
+ headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
}
+
// this goes across arenas
-
- reqArenaMgr.setNextJobRequest(reqSlot, headOfJobReqQueue);
+ reqArenaMgr.setNextJobRequest(reqSlot, prevHead);
reqArenaMgr.setPrevJobRequest(reqSlot, -1);
- reqArenaMgr.setNextJobRequest(headOfJobReqQueue, reqSlot);
+ if (prevHead >= 0) {
+ reqArenaMgr.setPrevJobRequest(prevHead, reqSlot);
+ }
// 3) check lock compatibility
@@ -715,6 +720,7 @@
private ResourceGroup[] table;
public ResourceGroupTable() {
+ table = new ResourceGroup[TABLE_SIZE];
for (int i = 0; i < TABLE_SIZE; ++i) {
table[i] = new ResourceGroup();
}
@@ -722,7 +728,7 @@
ResourceGroup get(DatasetId dId, int entityHashValue) {
// TODO ensure good properties of hash function
- int h = dId.getId() ^ entityHashValue;
+ int h = Math.abs(dId.getId() ^ entityHashValue);
return table[h % TABLE_SIZE];
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Generator.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Generator.java
index c530883..83bac42 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Generator.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Generator.java
@@ -68,7 +68,7 @@
} else if (line.contains("@INIT_SLOT@")) {
for (int i = 0; i < resource.size(); ++i) {
final Field field = resource.fields.get(i);
- field.appendInitializers(sb, indent, 1);
+ field.appendInitializers(sb, indent, 3);
}
} else {
sb.append(line).append('\n');
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index e61cb55..09d7000 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -62,7 +62,7 @@
ArrayList<LockRequest> requestList;
ArrayList<ArrayList<Integer>> expectedResultList;
int resultListIndex;
- LockManager lockMgr;
+ ILockManager lockMgr;
String requestFileName;
long defaultWaitTime;
@@ -72,7 +72,7 @@
this.workerReadyQueue = new WorkerReadyQueue();
this.requestList = new ArrayList<LockRequest>();
this.expectedResultList = new ArrayList<ArrayList<Integer>>();
- this.lockMgr = (LockManager) txnProvider.getLockManager();
+ this.lockMgr = txnProvider.getLockManager();
this.requestFileName = new String(requestFileName);
this.resultListIndex = 0;
this.defaultWaitTime = 10;
@@ -618,7 +618,7 @@
} catch (InterruptedException e) {
e.printStackTrace();
}
- log(Thread.currentThread().getName() + "Waiting for worker to finish its task...");
+ log(Thread.currentThread().getName() + " Waiting for worker to finish its task...");
queueSize = workerReadyQueue.size();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordType.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordType.java
index dc0f252..1d0b41e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordType.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordType.java
@@ -66,14 +66,13 @@
sb = indent(sb, indent, level + 1);
sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
sb = indent(sb, indent, level + 1);
- sb.append(" b.")
+ sb.append("b.")
.append(bbSetter(type))
.append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
.append(offsetName())
.append(", value);\n");
sb = indent(sb, indent, level);
- sb.append(indent)
- .append("}\n");
+ sb.append("}\n");
return sb;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestArenaManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestArenaManager.java
index aea1f47..23260ba 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestArenaManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestArenaManager.java
@@ -28,11 +28,10 @@
}
public synchronized LocalManager getNext() {
- RequestMemoryManager mgr = arenas.get(nextArena);
- if (mgr == null) {
- mgr = new RequestMemoryManager();
- arenas.set(nextArena, mgr);
+ if (nextArena >= arenas.size()) {
+ arenas.add(new RequestMemoryManager());
}
+ RequestMemoryManager mgr = arenas.get(nextArena);
LocalManager res = new LocalManager();
res.mgr = mgr;
res.arenaId = nextArena;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java
index df139b5..36ca35a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java
@@ -27,14 +27,18 @@
private long shrinkTimer;
private boolean isShrinkTimerOn;
+ public RequestMemoryManager() {
+ buffers = new ArrayList<Buffer>();
+ buffers.add(new Buffer());
+ current = 0;
+ }
+
public int allocate() {
if (buffers.get(current).isFull()) {
int size = buffers.size();
boolean needNewBuffer = true;
- Buffer buffer;
-
for (int i = 0; i < size; i++) {
- buffer = buffers.get(i);
+ Buffer buffer = buffers.get(i);
if (! buffer.isInitialized()) {
buffer.initialize();
current = i;
@@ -151,8 +155,8 @@
public void setResourceId(int slotNum, int value) {
final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + RESOURCE_ID_OFF, value);
- }
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + RESOURCE_ID_OFF, value);
+ }
public int getLockMode(int slotNum) {
final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
@@ -161,8 +165,8 @@
public void setLockMode(int slotNum, int value) {
final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + LOCK_MODE_OFF, value);
- }
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + LOCK_MODE_OFF, value);
+ }
public int getJobId(int slotNum) {
final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
@@ -171,8 +175,8 @@
public void setJobId(int slotNum, int value) {
final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + JOB_ID_OFF, value);
- }
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + JOB_ID_OFF, value);
+ }
public int getPrevJobRequest(int slotNum) {
final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
@@ -181,8 +185,8 @@
public void setPrevJobRequest(int slotNum, int value) {
final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + PREV_JOB_REQUEST_OFF, value);
- }
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + PREV_JOB_REQUEST_OFF, value);
+ }
public int getNextJobRequest(int slotNum) {
final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
@@ -191,8 +195,8 @@
public void setNextJobRequest(int slotNum, int value) {
final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_JOB_REQUEST_OFF, value);
- }
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_JOB_REQUEST_OFF, value);
+ }
public int getNextRequest(int slotNum) {
final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
@@ -201,15 +205,19 @@
public void setNextRequest(int slotNum, int value) {
final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_REQUEST_OFF, value);
- }
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_REQUEST_OFF, value);
+ }
static class Buffer {
private ByteBuffer bb;
private int freeSlotNum;
- private int occupiedSlots; //-1 represents 'deinitialized' state.
+ private int occupiedSlots = -1; //-1 represents 'deinitialized' state.
+
+ Buffer() {
+ initialize();
+ }
void initialize() {
bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceArenaManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceArenaManager.java
index b7bceca..2212b50 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceArenaManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceArenaManager.java
@@ -28,11 +28,10 @@
}
public synchronized LocalManager getNext() {
- ResourceMemoryManager mgr = arenas.get(nextArena);
- if (mgr == null) {
- mgr = new ResourceMemoryManager();
- arenas.set(nextArena, mgr);
+ if (nextArena >= arenas.size()) {
+ arenas.add(new ResourceMemoryManager());
}
+ ResourceMemoryManager mgr = arenas.get(nextArena);
LocalManager res = new LocalManager();
res.mgr = mgr;
res.arenaId = nextArena;
@@ -40,7 +39,7 @@
return res;
}
- public synchronized ResourceMemoryManager get(int i) {
+ public ResourceMemoryManager get(int i) {
return arenas.get(i);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java
index b49c7f8..ffe973f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java
@@ -28,14 +28,18 @@
private long shrinkTimer;
private boolean isShrinkTimerOn;
+ public ResourceMemoryManager() {
+ buffers = new ArrayList<Buffer>();
+ buffers.add(new Buffer());
+ current = 0;
+ }
+
public int allocate() {
if (buffers.get(current).isFull()) {
int size = buffers.size();
boolean needNewBuffer = true;
- Buffer buffer;
-
for (int i = 0; i < size; i++) {
- buffer = buffers.get(i);
+ Buffer buffer = buffers.get(i);
if (! buffer.isInitialized()) {
buffer.initialize();
current = i;
@@ -146,73 +150,73 @@
}
public int getLastHolder(int slotNum) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + LAST_HOLDER_OFF);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + LAST_HOLDER_OFF);
}
public void setLastHolder(int slotNum, int value) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + LAST_HOLDER_OFF, value);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + LAST_HOLDER_OFF, value);
}
public int getFirstWaiter(int slotNum) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_WAITER_OFF);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_WAITER_OFF);
}
public void setFirstWaiter(int slotNum, int value) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_WAITER_OFF, value);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_WAITER_OFF, value);
}
public int getFirstUpgrader(int slotNum) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_UPGRADER_OFF);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_UPGRADER_OFF);
}
public void setFirstUpgrader(int slotNum, int value) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_UPGRADER_OFF, value);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_UPGRADER_OFF, value);
}
public int getMaxMode(int slotNum) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + MAX_MODE_OFF);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + MAX_MODE_OFF);
}
public void setMaxMode(int slotNum, int value) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + MAX_MODE_OFF, value);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + MAX_MODE_OFF, value);
}
public int getDatasetId(int slotNum) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + DATASET_ID_OFF);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + DATASET_ID_OFF);
}
public void setDatasetId(int slotNum, int value) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + DATASET_ID_OFF, value);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + DATASET_ID_OFF, value);
}
public int getPkHashVal(int slotNum) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + PK_HASH_VAL_OFF);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + PK_HASH_VAL_OFF);
}
public void setPkHashVal(int slotNum, int value) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + PK_HASH_VAL_OFF, value);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + PK_HASH_VAL_OFF, value);
}
public int getNext(int slotNum) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_OFF);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_OFF);
}
public void setNext(int slotNum, int value) {
- final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
- b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_OFF, value);
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_OFF, value);
}
@@ -220,7 +224,11 @@
static class Buffer {
private ByteBuffer bb;
private int freeSlotNum;
- private int occupiedSlots; //-1 represents 'deinitialized' state.
+ private int occupiedSlots = -1; //-1 represents 'deinitialized' state.
+
+ Buffer() {
+ initialize();
+ }
void initialize() {
bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt
index d28919d..a0e4516 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt
@@ -20,14 +20,18 @@
private long shrinkTimer;
private boolean isShrinkTimerOn;
+ public @E@MemoryManager() {
+ buffers = new ArrayList<Buffer>();
+ buffers.add(new Buffer());
+ current = 0;
+ }
+
public int allocate() {
if (buffers.get(current).isFull()) {
int size = buffers.size();
boolean needNewBuffer = true;
- Buffer buffer;
-
for (int i = 0; i < size; i++) {
- buffer = buffers.get(i);
+ Buffer buffer = buffers.get(i);
if (! buffer.isInitialized()) {
buffer.initialize();
current = i;
@@ -143,7 +147,11 @@
static class Buffer {
private ByteBuffer bb;
private int freeSlotNum;
- private int occupiedSlots; //-1 represents 'deinitialized' state.
+ private int occupiedSlots = -1; //-1 represents 'deinitialized' state.
+
+ Buffer() {
+ initialize();
+ }
void initialize() {
bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);