Merge branch 'peili/crash_recovery_tests'
diff --git a/asterix-app/src/test/resources/metadata/queries/basic/metadata_compaction_policy/metadata_compaction_policy.1.ddl.aql b/asterix-app/src/test/resources/metadata/queries/basic/metadata_compaction_policy/metadata_compaction_policy.1.ddl.aql
new file mode 100644
index 0000000..a017e75
--- /dev/null
+++ b/asterix-app/src/test/resources/metadata/queries/basic/metadata_compaction_policy/metadata_compaction_policy.1.ddl.aql
@@ -0,0 +1,6 @@
+/*
+ * Description  : query the Metadata dataset CompactionPolicy
+ * Expected Res : Success
+ * Date         : 13 Nov. 2013
+ * Issue        : 646
+ */
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/metadata/queries/basic/metadata_compaction_policy/metadata_compaction_policy.2.update.aql b/asterix-app/src/test/resources/metadata/queries/basic/metadata_compaction_policy/metadata_compaction_policy.2.update.aql
new file mode 100644
index 0000000..a017e75
--- /dev/null
+++ b/asterix-app/src/test/resources/metadata/queries/basic/metadata_compaction_policy/metadata_compaction_policy.2.update.aql
@@ -0,0 +1,6 @@
+/*
+ * Description  : query the Metadata dataset CompactionPolicy
+ * Expected Res : Success
+ * Date         : 13 Nov. 2013
+ * Issue        : 646
+ */
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/metadata/queries/basic/metadata_compaction_policy/metadata_compaction_policy.3.query.aql b/asterix-app/src/test/resources/metadata/queries/basic/metadata_compaction_policy/metadata_compaction_policy.3.query.aql
new file mode 100644
index 0000000..a9c9b19
--- /dev/null
+++ b/asterix-app/src/test/resources/metadata/queries/basic/metadata_compaction_policy/metadata_compaction_policy.3.query.aql
@@ -0,0 +1,9 @@
+/*
+ * Description  : query the Metadata dataset CompactionPolicy
+ * Expected Res : Success
+ * Date         : 13 Nov. 2013
+ * Issue        : 646
+ */
+ 
+for $x in dataset Metadata.CompactionPolicy
+return $x
diff --git a/asterix-app/src/test/resources/metadata/results/basic/meta17/meta17.1.adm b/asterix-app/src/test/resources/metadata/results/basic/meta17/meta17.1.adm
index 1995946..20518dd 100644
--- a/asterix-app/src/test/resources/metadata/results/basic/meta17/meta17.1.adm
+++ b/asterix-app/src/test/resources/metadata/results/basic/meta17/meta17.1.adm
@@ -1,4 +1,4 @@
-{ "DataverseName": "Metadata", "DatatypeName": "CompactionPolicyRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "EnumValues": null, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string" }, { "FieldName": "PolicyName", "FieldType": "string" }, { "FieldName": "Classname", "FieldType": "string" } ] }, "Union": null, "UnorderedList": null, "OrderedList": null }, "Timestamp": "Mon Sep 23 00:04:06 PDT 2013" }
+{ "DataverseName": "Metadata", "DatatypeName": "CompactionPolicyRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "EnumValues": null, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string" }, { "FieldName": "CompactionPolicy", "FieldType": "string" }, { "FieldName": "Classname", "FieldType": "string" } ] }, "Union": null, "UnorderedList": null, "OrderedList": null }, "Timestamp": "Mon Sep 23 00:04:06 PDT 2013" }
 { "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "EnumValues": null, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string" }, { "FieldName": "DatasetName", "FieldType": "string" }, { "FieldName": "DataTypeName", "FieldType": "string" }, { "FieldName": "DatasetType", "FieldType": "string" }, { "FieldName": "InternalDetails", "FieldType": "Field_InternalDetails_in_DatasetRecordType" }, { "FieldName": "ExternalDetails", "FieldType": "Field_ExternalDetails_in_DatasetRecordType" }, { "FieldName": "FeedDetails", "FieldType": "Field_FeedDetails_in_DatasetRecordType" }, { "FieldName": "Hints", "FieldType": "Field_Hints_in_DatasetRecordType" }, { "FieldName": "Timestamp", "FieldType": "string" }, { "FieldName": "DatasetId", "FieldType": "int32" }, { "FieldName": "PendingOp", "FieldType": "int32" } ] }, "Union": null, "UnorderedList": null, "OrderedList": null }, "Timestamp": "Mon Sep 23 00:04:06 PDT 2013" }
 { "DataverseName": "Metadata", "DatatypeName": "DatasourceAdapterRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "EnumValues": null, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string" }, { "FieldName": "Name", "FieldType": "string" }, { "FieldName": "Classname", "FieldType": "string" }, { "FieldName": "Type", "FieldType": "string" }, { "FieldName": "Timestamp", "FieldType": "string" } ] }, "Union": null, "UnorderedList": null, "OrderedList": null }, "Timestamp": "Mon Sep 23 00:04:06 PDT 2013" }
 { "DataverseName": "Metadata", "DatatypeName": "DatatypeRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "EnumValues": null, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string" }, { "FieldName": "DatatypeName", "FieldType": "string" }, { "FieldName": "Derived", "FieldType": "Field_Derived_in_DatatypeRecordType" }, { "FieldName": "Timestamp", "FieldType": "string" } ] }, "Union": null, "UnorderedList": null, "OrderedList": null }, "Timestamp": "Mon Sep 23 00:04:06 PDT 2013" }
diff --git a/asterix-app/src/test/resources/metadata/results/basic/metadata_compaction_policy/metadata_compaction_policy.1.adm b/asterix-app/src/test/resources/metadata/results/basic/metadata_compaction_policy/metadata_compaction_policy.1.adm
new file mode 100644
index 0000000..633e93a
--- /dev/null
+++ b/asterix-app/src/test/resources/metadata/results/basic/metadata_compaction_policy/metadata_compaction_policy.1.adm
@@ -0,0 +1,2 @@
+{ "DataverseName": "Metadata", "CompactionPolicy": "constant", "Classname": "edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory" }
+{ "DataverseName": "Metadata", "CompactionPolicy": "prefix", "Classname": "edu.uci.ics.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory" }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm b/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
index 4c22058..877d4da 100644
--- a/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
+++ b/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
@@ -1,4 +1,4 @@
-{ "DataverseName": "Metadata", "DatatypeName": "CompactionPolicyRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "EnumValues": null, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string" }, { "FieldName": "PolicyName", "FieldType": "string" }, { "FieldName": "Classname", "FieldType": "string" } ] }, "Union": null, "UnorderedList": null, "OrderedList": null }, "Timestamp": "Mon Sep 23 00:25:26 PDT 2013" }
+{ "DataverseName": "Metadata", "DatatypeName": "CompactionPolicyRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "EnumValues": null, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string" }, { "FieldName": "CompactionPolicy", "FieldType": "string" }, { "FieldName": "Classname", "FieldType": "string" } ] }, "Union": null, "UnorderedList": null, "OrderedList": null }, "Timestamp": "Mon Sep 23 00:25:26 PDT 2013" }
 { "DataverseName": "Metadata", "DatatypeName": "DatasetRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "EnumValues": null, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string" }, { "FieldName": "DatasetName", "FieldType": "string" }, { "FieldName": "DataTypeName", "FieldType": "string" }, { "FieldName": "DatasetType", "FieldType": "string" }, { "FieldName": "InternalDetails", "FieldType": "Field_InternalDetails_in_DatasetRecordType" }, { "FieldName": "ExternalDetails", "FieldType": "Field_ExternalDetails_in_DatasetRecordType" }, { "FieldName": "FeedDetails", "FieldType": "Field_FeedDetails_in_DatasetRecordType" }, { "FieldName": "Hints", "FieldType": "Field_Hints_in_DatasetRecordType" }, { "FieldName": "Timestamp", "FieldType": "string" }, { "FieldName": "DatasetId", "FieldType": "int32" }, { "FieldName": "PendingOp", "FieldType": "int32" } ] }, "Union": null, "UnorderedList": null, "OrderedList": null }, "Timestamp": "Mon Sep 23 00:25:26 PDT 2013" }
 { "DataverseName": "Metadata", "DatatypeName": "DatasourceAdapterRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "EnumValues": null, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string" }, { "FieldName": "Name", "FieldType": "string" }, { "FieldName": "Classname", "FieldType": "string" }, { "FieldName": "Type", "FieldType": "string" }, { "FieldName": "Timestamp", "FieldType": "string" } ] }, "Union": null, "UnorderedList": null, "OrderedList": null }, "Timestamp": "Mon Sep 23 00:25:26 PDT 2013" }
 { "DataverseName": "Metadata", "DatatypeName": "DatatypeRecordType", "Derived": { "Tag": "RECORD", "IsAnonymous": false, "EnumValues": null, "Record": { "IsOpen": true, "Fields": [ { "FieldName": "DataverseName", "FieldType": "string" }, { "FieldName": "DatatypeName", "FieldType": "string" }, { "FieldName": "Derived", "FieldType": "Field_Derived_in_DatatypeRecordType" }, { "FieldName": "Timestamp", "FieldType": "string" } ] }, "Union": null, "UnorderedList": null, "OrderedList": null }, "Timestamp": "Mon Sep 23 00:25:26 PDT 2013" }
diff --git a/asterix-app/src/test/resources/metadata/testsuite.xml b/asterix-app/src/test/resources/metadata/testsuite.xml
index a668801..9a69eff 100644
--- a/asterix-app/src/test/resources/metadata/testsuite.xml
+++ b/asterix-app/src/test/resources/metadata/testsuite.xml
@@ -120,6 +120,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="basic">
+      <compilation-unit name="metadata_compaction_policy">
+        <output-dir compare="Text">metadata_compaction_policy</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="basic">
       <compilation-unit name="metadata_dataset">
         <output-dir compare="Text">metadata_dataset</output-dir>
       </compilation-unit>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
index a752afa..410f207 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
@@ -61,12 +61,13 @@
     /**
      * @param datasetId
      * @param entityHashValue
+     * @param lockMode
      * @param txnContext
      * @throws ACIDException
      *             TODO
      * @return 
      */
-    public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
+    public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException;
 
     /**
diff --git a/asterix-maven-plugins/pom.xml b/asterix-maven-plugins/pom.xml
index 56bc697..b26a6d6 100644
--- a/asterix-maven-plugins/pom.xml
+++ b/asterix-maven-plugins/pom.xml
@@ -35,5 +35,6 @@
 
   <modules>
     <module>lexer-generator-maven-plugin</module>
+    <module>record-manager-generator-maven-plugin</module>
   </modules>
 </project>
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml b/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml
new file mode 100644
index 0000000..a4470d6
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml
@@ -0,0 +1,72 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ ! 
+ !     http://www.apache.org/licenses/LICENSE-2.0
+ ! 
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.asterix</groupId>
+  <artifactId>record-manager-generator-maven-plugin</artifactId>
+    <parent>
+        <artifactId>asterix-maven-plugins</artifactId>
+        <groupId>edu.uci.ics.asterix</groupId>
+        <version>0.8.1-SNAPSHOT</version>
+    </parent>
+
+  <packaging>maven-plugin</packaging>
+  <name>record-manager-generator-maven-plugin</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.8.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.maven</groupId>
+      <artifactId>maven-plugin-api</artifactId>
+      <version>2.0.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.maven</groupId>
+      <artifactId>maven-artifact</artifactId>
+      <version>2.0.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.maven</groupId>
+      <artifactId>maven-project</artifactId>
+      <version>2.0.2</version>
+    </dependency>
+	<dependency>
+      <groupId>org.json</groupId>
+      <artifactId>json</artifactId>
+      <version>20090211</version>
+      <type>jar</type>
+    </dependency>
+  </dependencies>    
+</project>
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
new file mode 100644
index 0000000..14d8a7e
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import edu.uci.ics.asterix.recordmanagergenerator.RecordType.Field;
+
+public class Generator {
+    
+    public enum TemplateType {
+        RECORD_MANAGER,
+        ARENA_MANAGER,
+        SUPPORT
+    }
+    
+    public static void generateSource(
+            TemplateType tmplType, 
+            String packageName,
+            RecordType rec, 
+            InputStream is, 
+            StringBuilder sb, 
+            boolean debug) {
+        try {
+            BufferedReader in = new BufferedReader(new InputStreamReader(is));
+
+            switch (tmplType) {
+                case RECORD_MANAGER:
+                    generateMemoryManagerSource(packageName, rec, in, sb, debug);
+                    break;
+                case ARENA_MANAGER:
+                    generateArenaManagerSource(packageName, rec, in, sb, debug);
+                    break;
+                case SUPPORT:
+                    generateSupportFileSource(packageName, in, sb, debug);
+                    break;
+                default:
+                    throw new IllegalArgumentException();
+            }        
+        } catch (IOException ioe) {
+            ioe.printStackTrace();
+        }
+
+    }
+
+    private static void generateMemoryManagerSource(
+            String packageName,
+            RecordType resource, 
+            BufferedReader in, 
+            StringBuilder sb, 
+            boolean debug) throws IOException {
+        String line = null;
+        String indent = "    ";
+
+        while((line = in.readLine()) != null) {
+            if (line.contains("@PACKAGE@")) {
+                line = line.replace("@PACKAGE@", packageName);
+            }
+            if (line.contains("@E@")) {
+                line = line.replace("@E@", resource.name);
+            }
+            if (line.contains("@DEBUG@")) {
+                line = line.replace("@DEBUG@", Boolean.toString(debug));
+            }
+            if (line.contains("@CONSTS@")) {
+                resource.appendConstants(sb, indent, 1);
+                sb.append('\n');
+            } else if (line.contains("@METHODS@")) {
+                for (int i = 0; i < resource.size(); ++i) {
+                    final Field field = resource.fields.get(i);
+                    if (field.accessible) {
+                        field.appendMemoryManagerGetMethod(sb, indent, 1);
+                        sb.append('\n');
+                        field.appendMemoryManagerSetMethod(sb, indent, 1);
+                        sb.append('\n');
+                    }
+                }
+            } else if (line.contains("@INIT_SLOT@")) {
+                for (int i = 0; i < resource.size(); ++i) {                        
+                    final Field field = resource.fields.get(i);
+                    field.appendInitializers(sb, indent, 3);
+                }
+            } else if (line.contains("@CHECK_SLOT@")) {
+                for (int i = 0; i < resource.size(); ++i) {                        
+                    final Field field = resource.fields.get(i);
+                    field.appendChecks(sb, indent, 3);
+                }
+            } else if (line.contains("@PRINT_BUFFER@")) {
+                resource.appendBufferPrinter(sb, indent, 3);
+                sb.append('\n');
+            } else {
+                sb.append(line).append('\n');
+            }
+        }
+    }
+
+    private static void generateArenaManagerSource(
+            String packageName,
+            RecordType resource, 
+            BufferedReader in, 
+            StringBuilder sb, 
+            boolean debug) throws IOException {
+        String line = null;
+        String indent = "    ";
+
+        while((line = in.readLine()) != null) {
+            if (line.contains("@PACKAGE@")) {
+                line = line.replace("@PACKAGE@", packageName);
+            }
+            if (line.contains("@E@")) {
+                line = line.replace("@E@", resource.name);
+            }
+            if (line.contains("@DEBUG@")) {
+                line = line.replace("@DEBUG@", Boolean.toString(debug));
+            }
+            if (line.contains("@METHODS@")) {
+                for (int i = 0; i < resource.size(); ++i) {
+                    final Field field = resource.fields.get(i);
+                    if (field.accessible) {
+                        field.appendArenaManagerGetMethod(sb, indent, 1);
+                        sb.append('\n');
+                        field.appendArenaManagerSetMethod(sb, indent, 1);
+                        sb.append('\n');
+                    }
+                }
+            } else {
+                sb.append(line).append('\n');
+            }
+        }
+    }
+
+    private static void generateSupportFileSource(
+            String packageName,
+            BufferedReader in, 
+            StringBuilder sb,
+            boolean debug) throws IOException {
+        String line = null;
+        while((line = in.readLine()) != null) {
+            if (line.contains("@PACKAGE@")) {
+                line = line.replace("@PACKAGE@", packageName);
+            }
+            sb.append(line).append('\n');
+        }
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
new file mode 100644
index 0000000..79221c1
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.InputStream;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.maven.plugin.MojoFailureException;
+import org.apache.maven.project.MavenProject;
+import org.json.JSONException;
+
+/**
+ * @goal generate-record-manager
+ * @phase generate-sources
+ * @requiresDependencyResolution compile
+ */
+public class RecordManagerGeneratorMojo extends AbstractMojo {
+    
+    /**
+     * parameter injected from pom.xml
+     * 
+     * @parameter
+     */
+    private boolean debug;
+    /**
+     * parameter injected from pom.xml
+     * 
+     * @parameter
+     * @required
+     */
+    private String packageName;
+    /**
+     * parameter injected from pom.xml
+     * 
+     * @parameter
+     * @required
+     */
+    private File[] inputFiles;
+    /**
+     * @parameter default-value="${project}"
+     * @required
+     * @readonly
+     */
+    MavenProject project;
+    
+    
+    String recordManagerTemplate = "RecordManager.java"; 
+    String arenaManagerTemplate = "ArenaManager.java";
+    String[] supportTemplates = { "Stats.java", "AllocInfo.java", "TypeUtil.java" };
+    
+    private Map<String, RecordType> typeMap;
+
+    public RecordManagerGeneratorMojo() {        
+    }
+
+    private void readRecordTypes() throws MojoExecutionException {
+        if (debug) {
+            getLog().info("generating debug code");
+        }
+
+        typeMap = new HashMap<String, RecordType>();
+
+        for (int i = 0; i < inputFiles.length; ++i) {
+            try {
+                getLog().info("reading " + inputFiles[i].toString());
+                Reader read = new FileReader(inputFiles[i]);
+                RecordType type = RecordType.read(read);
+                // always add allocId to enable tracking of allocations 
+                type.addField("alloc id", RecordType.Type.SHORT, null);
+                type.addToMap(typeMap);
+            } catch (FileNotFoundException fnfe) {
+                throw new MojoExecutionException("cound not find type description file " + inputFiles[i], fnfe);
+            } catch (JSONException jse) {
+                throw new MojoExecutionException("cound not parse type description file " + inputFiles[i], jse);
+            }
+        }
+    }
+
+    public void execute() throws MojoExecutionException, MojoFailureException {
+        String outputPath = project.getBuild().getDirectory() + File.separator 
+                + "generated-sources" + File.separator
+                + "java" + File.separator
+                + packageName.replace('.', File.separatorChar);
+        File dir = new File(outputPath);
+        if (!dir.exists()) {
+            dir.mkdirs();
+        }
+
+        readRecordTypes();
+        
+        for (String recordType : typeMap.keySet()) {
+            generateSource(Generator.TemplateType.RECORD_MANAGER, recordManagerTemplate, recordType, outputPath);
+            generateSource(Generator.TemplateType.ARENA_MANAGER, arenaManagerTemplate, recordType, outputPath);
+        }
+        
+        for (int i = 0; i < supportTemplates.length; ++i) {
+            generateSource(Generator.TemplateType.SUPPORT, supportTemplates[i], "", outputPath);
+        }
+    }
+
+    private void generateSource(Generator.TemplateType mgrType, String template, String recordType, String outputPath) throws MojoFailureException {
+        InputStream is = getClass().getClassLoader().getResourceAsStream(template);
+        if (is == null) {
+            throw new MojoFailureException("template '" + template + "' not found in classpath");
+        }
+
+        StringBuilder sb = new StringBuilder();
+        File outputFile = new File(outputPath + File.separator + recordType + template);
+
+        try {
+            getLog().info("generating " + outputFile.toString());
+
+            Generator.generateSource(mgrType, packageName, typeMap.get(recordType), is, sb, debug);
+            is.close();
+
+            FileWriter outWriter = new FileWriter(outputFile);
+            outWriter.write(sb.toString());
+            outWriter.close();
+        } catch (Exception ex) {
+            getLog().error(ex);
+            throw new MojoFailureException("failed to generate " + outputFile.toString());
+        }
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
new file mode 100644
index 0000000..a0f6c61
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
@@ -0,0 +1,388 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+
+public class RecordType {
+    
+    enum Type {
+        BYTE  (1, "byte",  "get",      "put",      "(byte)0xde",          "TypeUtil.Byte.append"),
+        SHORT (2, "short", "getShort", "putShort", "(short)0xdead",       "TypeUtil.Short.append"),
+        INT   (4, "int",   "getInt",   "putInt",   "0xdeadbeef",          "TypeUtil.Int.append"),
+        GLOBAL(8, "long",  "getLong",  "putLong",  "0xdeadbeefdeadbeefl", "TypeUtil.Global.append");
+        
+        Type(int size, String javaType, String bbGetter, String bbSetter, String deadMemInitializer, String appender) {
+            this.size = size;
+            this.javaType = javaType;
+            this.bbGetter = bbGetter;
+            this.bbSetter = bbSetter;
+            this.deadMemInitializer = deadMemInitializer;
+            this.appender = appender;
+        }
+        
+        int size;
+        String javaType;
+        String bbGetter;
+        String bbSetter;
+        String deadMemInitializer;
+        String appender;
+    }
+    
+    static class Field {
+                
+        String name;
+        Type type;
+        String initial;
+        int offset;
+        boolean accessible = true;
+
+        Field(String name, Type type, String initial, int offset, boolean accessible) {
+            this.name = name;
+            this.type = type;
+            this.initial = initial;
+            this.offset = offset;
+            this.accessible = accessible;
+        }
+        
+        public static Field fromJSON(JSONObject obj) throws JSONException {
+            String name = obj.getString("name");
+            Type type = parseType(obj.getString("type"));
+            String initial = obj.optString("initial", null);
+            return new Field(name, type, initial, -1, true);
+        }
+        
+        private static Type parseType(String string) {
+            string = string.toUpperCase();
+            if (string.equals("GLOBAL")) {
+                return Type.GLOBAL;
+            } else if (string.equals("INT")) {
+                return Type.INT;
+            } else if (string.equals("SHORT")) {
+                return Type.SHORT;
+            } else if (string.equals("BYTE")) {
+                return Type.BYTE;
+            }
+            throw new IllegalArgumentException("Unknown type \"" + string + "\"");
+        }
+
+        String methodName(String prefix) {
+            String words[] = name.split(" ");
+            assert(words.length > 0);
+            StringBuilder sb = new StringBuilder(prefix);
+            for (int j = 0; j < words.length; ++j) {
+                String word = words[j];
+                sb.append(word.substring(0, 1).toUpperCase());
+                sb.append(word.substring(1));
+            }
+            return sb.toString();        
+        }
+
+        StringBuilder appendMemoryManagerGetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public ")
+              .append(type.javaType)
+              .append(' ')
+              .append(methodName("get"))
+              .append("(int slotNum) {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final Buffer buf = buffers.get(slotNum / NO_SLOTS);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("buf.checkSlot(slotNum % NO_SLOTS);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final ByteBuffer b = buf.bb;\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("return b.")
+              .append(type.bbGetter)
+              .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+              .append(offsetName())
+              .append(");\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+            
+        StringBuilder appendMemoryManagerSetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public void ")
+              .append(methodName("set"))
+              .append("(int slotNum, ")
+              .append(type.javaType)
+              .append(" value) {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("b.")
+              .append(type.bbSetter)
+              .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+              .append(offsetName())
+              .append(", value);\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+
+        StringBuilder appendArenaManagerGetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public ")
+              .append(type.javaType)
+              .append(' ')
+              .append(methodName("get"))
+              .append("(long slotNum) {\n");
+            if (initial != null) {
+              sb = indent(sb, indent, level + 1);
+              sb.append("if (TRACK_ALLOC_ID) checkAllocId(slotNum);\n");
+            }
+            sb = indent(sb, indent, level + 1);
+            sb.append("final int arenaId = TypeUtil.Global.arenaId(slotNum);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final int localId = TypeUtil.Global.localId(slotNum);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("return get(arenaId).")
+              .append(methodName("get"))
+              .append("(localId);\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+            
+        StringBuilder appendArenaManagerSetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public void ")
+              .append(methodName("set"))
+              .append("(long slotNum, ")
+              .append(type.javaType)
+              .append(" value) {\n");
+            if (initial != null) {
+              sb = indent(sb, indent, level + 1);
+              sb.append("if (TRACK_ALLOC_ID) checkAllocId(slotNum);\n");
+            }
+            sb = indent(sb, indent, level + 1);
+            sb.append("final int arenaId = TypeUtil.Global.arenaId(slotNum);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final int localId = TypeUtil.Global.localId(slotNum);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("get(arenaId).")
+              .append(methodName("set"))
+              .append("(localId, value);\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+        
+        StringBuilder appendInitializers(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("bb.")
+              .append(type.bbSetter)
+              .append("(slotNum * ITEM_SIZE + ")
+              .append(offsetName())
+              .append(", ");
+            if (initial != null) {
+                sb.append(initial);
+            } else {
+                sb.append(type.deadMemInitializer);
+            }
+            sb.append(");\n");
+            return sb;
+        }
+        
+        StringBuilder appendChecks(StringBuilder sb, String indent, int level) {
+            if (initial == null) {
+                return sb;
+            }
+            sb = indent(sb, indent, level);
+            sb.append("if (bb.")
+              .append(type.bbGetter)
+              .append("(itemOffset + ")
+              .append(offsetName())
+              .append(") == ")
+              .append(type.deadMemInitializer)
+              .append(") {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("String msg = \"invalid value in field ")
+              .append(offsetName())
+              .append(" of slot \" + slotNum;\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("throw new IllegalStateException(msg);\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+        
+        String offsetName() {
+            String words[] = name.split(" ");
+            assert(words.length > 0);
+            StringBuilder sb = new StringBuilder(words[0].toUpperCase());
+            for (int j = 1; j < words.length; ++j) {
+                sb.append("_").append(words[j].toUpperCase());
+            }
+            sb.append("_OFF");
+            return sb.toString();
+        }
+        
+        int offset() {
+            return offset;
+        }
+    }
+
+    String name;
+    ArrayList<Field> fields;
+    int totalSize;
+    boolean modifiable = true;
+    
+    static StringBuilder indent(StringBuilder sb, String indent, int level) {
+        for (int i = 0; i < level; ++i) {
+            sb.append(indent);
+        }
+        return sb;
+    }
+    
+    public RecordType(String name) {
+        this.name = name;
+        fields = new ArrayList<Field>();
+        addField("next free slot", Type.INT, "-1", false);
+    }
+    
+    public static RecordType read(Reader reader) throws JSONException {
+        JSONTokener tok = new JSONTokener(reader);
+        JSONObject obj = new JSONObject(tok);
+        return fromJSON(obj);
+    }
+    
+    public static RecordType fromJSON(JSONObject obj) throws JSONException {
+        RecordType result = new RecordType(obj.getString("name"));
+        JSONArray fields = obj.getJSONArray("fields");
+        for (int i = 0; i < fields.length(); ++i) {
+            JSONObject field = fields.getJSONObject(i);
+            result.fields.add(Field.fromJSON(field));
+        }
+        return result;        
+    }
+    
+    public void addToMap(Map<String, RecordType> map) {
+        modifiable = false;
+        calcOffsetsAndSize();
+        map.put(name, this);
+    }
+
+    public void addField(String name, Type type, String initial) {
+        addField(name, type, initial, true);
+    }    
+    
+    private void addField(String name, Type type, String initial, boolean accessible) {
+        if (! modifiable) {
+            throw new IllegalStateException("cannot modify type anmore");
+        }
+        fields.add(new Field(name, type, initial, -1, accessible));
+    }
+     
+    private void calcOffsetsAndSize() {
+        Collections.sort(fields, new Comparator<Field>() {
+            public int compare(Field left, Field right) {
+                return right.type.size - left.type.size;
+            }
+        });
+        // sort fields by size and align the items
+        totalSize = 0;
+        int alignment = 0;
+        for (int i = 0; i < fields.size(); ++i) {            
+            final Field field = fields.get(i);
+            assert field.offset == -1;
+            field.offset = totalSize;
+            final int size = field.type.size;
+            totalSize += size;
+            if (size > alignment) alignment = size;
+        }
+        if (totalSize % alignment != 0) {
+            totalSize = ((totalSize / alignment) + 1) * alignment; 
+        }
+    }
+    
+    int size() {
+        return fields.size();
+    }
+    
+    static String padRight(String s, int n) {
+        return String.format("%1$-" + n + "s", s);  
+    }
+
+    static String padLeft(String s, int n) {
+        return String.format("%1$" + n + "s", s);  
+    }
+    
+    StringBuilder appendConstants(StringBuilder sb, String indent, int level) {
+        sb = indent(sb, indent, level);
+        sb.append("public static int ITEM_SIZE = ")
+          .append(totalSize)
+          .append(";\n");
+        for (int i = 0; i < fields.size(); ++i) {
+            final Field field = fields.get(i);
+            sb = indent(sb, indent, level);
+            sb.append("public static int ")
+              .append(field.offsetName())
+              .append(" = ")
+              .append(field.offset).append("; // size: ")
+              .append(field.type.size).append("\n");
+        }
+        return sb;
+    }
+    
+    StringBuilder appendBufferPrinter(StringBuilder sb, String indent, int level) {
+        int maxNameWidth = 0;
+        for (int i = 0; i < fields.size(); ++i) {
+            int width = fields.get(i).name.length();
+            if (width > maxNameWidth) {
+                maxNameWidth = width;
+            }
+        }
+        for (int i = 0; i < fields.size(); ++i) {
+            final Field field = fields.get(i);
+            sb = indent(sb, indent, level);
+            sb.append("sb.append(\"")
+              .append(padRight(field.name, maxNameWidth))
+              .append(" | \");\n");
+            sb = indent(sb, indent, level);
+            sb.append("for (int i = 0; i < NO_SLOTS; ++i) {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append(field.type.javaType)
+              .append(" value = bb.")
+              .append(field.type.bbGetter)
+              .append("(i * ITEM_SIZE + ")
+              .append(field.offsetName())
+              .append(");\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("sb = ")
+              .append(field.type.appender)
+              .append("(sb, value);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("sb.append(\" | \");\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            sb = indent(sb, indent, level);
+            sb.append("sb.append(\"\\n\");\n");
+        }
+        return sb;
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/AllocInfo.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/AllocInfo.java
new file mode 100644
index 0000000..ef8415f
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/AllocInfo.java
@@ -0,0 +1,35 @@
+package @PACKAGE@;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class AllocInfo {
+    String alloc;
+    String free;
+    
+    void alloc() {
+        alloc = getStackTrace();
+    }
+    
+    void free() {
+        free = getStackTrace();
+    }
+
+    private String getStackTrace() {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+        new Exception().printStackTrace(pw);
+        pw.close();
+        String res = sw.toString();
+        // remove first 3 lines
+        int nlPos = 0;
+        for (int i = 0; i < 3; ++i) {
+            nlPos = res.indexOf('\n', nlPos) + 1; 
+        }
+        return res.substring(nlPos);
+    }
+    
+    public String toString() {
+        return "allocation stack:\n" + alloc + "\nfree stack\n" + free;
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
new file mode 100644
index 0000000..4abb3af
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package @PACKAGE@;
+
+public class @E@ArenaManager {
+    
+    public static final boolean TRACK_ALLOC_ID = @DEBUG@;
+
+    private final int noArenas;
+    private final @E@RecordManager[] arenas;
+    private ThreadLocal<LocalManager> local;
+    
+    static class LocalManager {
+        int arenaId;
+        @E@RecordManager mgr;
+    }
+    
+    public @E@ArenaManager(final int noArenas, final long txnShrinkTimer) {
+        this.noArenas = noArenas;
+        arenas = new @E@RecordManager[noArenas];
+        for (int i = 0; i < noArenas; ++i) {
+            arenas[i] = new @E@RecordManager(txnShrinkTimer);
+        }
+        local = new ThreadLocal<LocalManager>() {
+            private int nextArena = 0; 
+
+            @Override
+            protected synchronized LocalManager initialValue() {
+                @E@RecordManager mgr = arenas[nextArena];
+                LocalManager res = new LocalManager();
+                res.mgr = mgr;
+                res.arenaId = nextArena;
+                nextArena = (nextArena + 1) % noArenas;
+                return res;
+            }
+        };
+    }
+
+    public long allocate() {
+        final LocalManager localManager = local.get();
+        final @E@RecordManager recMgr = localManager.mgr;
+        final int allocId = TRACK_ALLOC_ID ? (++recMgr.allocCounter % 0x7fff) : 0;
+        final int localId = recMgr.allocate();
+        
+        long result = TypeUtil.Global.build(localManager.arenaId, allocId, localId);
+
+        if (TRACK_ALLOC_ID) setAllocId(result, (short) allocId);
+
+        assert TypeUtil.Global.allocId(result) == allocId;
+        assert TypeUtil.Global.arenaId(result) == localManager.arenaId;
+        assert TypeUtil.Global.localId(result) == localId;
+        return result;
+    }
+    
+    public void deallocate(long slotNum) {
+        if (TRACK_ALLOC_ID) checkAllocId(slotNum);
+        final int arenaId = TypeUtil.Global.arenaId(slotNum);
+        get(arenaId).deallocate(TypeUtil.Global.localId(slotNum));
+    }
+    
+    public @E@RecordManager get(int i) {
+        return arenas[i];
+    }
+    
+    public @E@RecordManager local() {
+        return local.get().mgr;
+    }
+    
+    @METHODS@
+    
+    private void checkAllocId(long slotNum) {
+        final int refAllocId = TypeUtil.Global.allocId(slotNum);
+        final short curAllocId = getAllocId(slotNum);
+        if (refAllocId != curAllocId) {
+            String msg = "reference to slot " + slotNum
+                + " of arena " + TypeUtil.Global.arenaId(slotNum)
+                + " refers to version " + Integer.toHexString(refAllocId)
+                + " current version is " + Integer.toHexString(curAllocId);
+            AllocInfo a = getAllocInfo(slotNum);
+            if (a != null) {
+                msg += "\n" + a.toString();
+            }
+            throw new IllegalStateException(msg);
+        }
+    }
+    
+    public AllocInfo getAllocInfo(long slotNum) {
+        final int arenaId = TypeUtil.Global.arenaId(slotNum);
+        return get(arenaId).getAllocInfo(TypeUtil.Global.localId(slotNum));
+    }
+    
+    public StringBuilder append(StringBuilder sb) {
+        for (int i = 0; i < noArenas; ++i) {
+            sb.append("++++ arena ").append(i).append(" ++++\n");
+            arenas[i].append(sb);
+        }
+        return sb;
+    }
+    
+    public String toString() {
+        return append(new StringBuilder()).toString();
+    }
+
+    public Stats addTo(Stats s) {
+        s.arenas += noArenas;
+        for (int i = 0; i < noArenas; ++i) {
+            arenas[i].addTo(s);
+        }
+        return s;
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
new file mode 100644
index 0000000..9afc673
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package @PACKAGE@;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+public class @E@RecordManager {
+
+    public static final boolean CHECK_SLOTS = @DEBUG@;
+    public static final boolean TRACK_ALLOC_LOC = @DEBUG@;
+    
+    static final int NO_SLOTS = 1000;
+
+    @CONSTS@
+
+    private final long txnShrinkTimer;
+    private long shrinkTimer;
+    private ArrayList<Buffer> buffers;
+    private int current;
+    private int occupiedSlots;
+    private boolean isShrinkTimerOn;
+
+    int allocCounter;
+    
+    public @E@RecordManager(long txnShrinkTimer) {
+        this.txnShrinkTimer = txnShrinkTimer;
+        buffers = new ArrayList<Buffer>();
+        buffers.add(new Buffer());
+        current = 0;
+        
+        allocCounter = 0;
+    }
+    
+    enum SlotSource {
+        NON_FULL,
+        UNINITIALIZED,
+        NEW
+    }
+    
+    synchronized int allocate() {
+        if (buffers.get(current).isFull()) {
+            final int size = buffers.size();
+            final int start = current + 1;
+            SlotSource source = SlotSource.NEW;
+            for (int j = start; j < start + size; ++j) {
+                // If we find a buffer with space, we use it. Otherwise we
+                // remember the first uninitialized one and use that one.
+                final int i = j % size;
+                final Buffer buffer = buffers.get(i);
+                if (buffer.isInitialized() && ! buffer.isFull()) {
+                    source = SlotSource.NON_FULL;
+                    current = i;
+                    break;
+                } else if (! buffer.isInitialized() && source == SlotSource.NEW) {
+                    source = SlotSource.UNINITIALIZED;
+                    current = i;
+                }
+            }
+            
+            switch (source) {
+                case NEW:
+                    buffers.add(new Buffer());
+                    current = buffers.size() - 1;
+                    break;
+                case UNINITIALIZED:
+                    buffers.get(current).initialize();
+                case NON_FULL:
+                    break;
+            }
+        }
+        ++occupiedSlots;
+        return buffers.get(current).allocate() + current * NO_SLOTS;
+    }
+
+    synchronized void deallocate(int slotNum) {
+        buffers.get(slotNum / NO_SLOTS).deallocate(slotNum % NO_SLOTS);
+        --occupiedSlots;
+
+        if (needShrink()) {
+            shrink();
+        }
+    }
+
+    /**
+     * Shrink policy:
+     * Shrink when the resource under-utilization lasts for a certain amount of time.
+     * TODO Need to figure out which of the policies is better
+     * case1.
+     * buffers status : O x x x x x O (O is initialized, x is deinitialized)
+     * In the above status, 'CURRENT' needShrink() returns 'TRUE'
+     * even if there is nothing to shrink or deallocate.
+     * It doesn't distinguish the deinitialized children from initialized children
+     * by calculating totalNumOfSlots = buffers.size() * ChildEntityLockInfoArrayManager.NUM_OF_SLOTS.
+     * In other words, it doesn't subtract the deinitialized children's slots.
+     * case2.
+     * buffers status : O O x x x x x
+     * However, in the above case, if we subtract the deinitialized children's slots,
+     * needShrink() will return false even if we shrink the buffers at this case.
+     * 
+     * @return
+     */
+    private boolean needShrink() {
+        int size = buffers.size();
+        int usedSlots = occupiedSlots;
+        if (usedSlots == 0) {
+            usedSlots = 1;
+        }
+
+        if (size > 1 && size * NO_SLOTS / usedSlots >= 3) {
+            if (isShrinkTimerOn) {
+                if (System.currentTimeMillis() - shrinkTimer >= txnShrinkTimer) {
+                    isShrinkTimerOn = false;
+                    return true;
+                }
+            } else {
+                //turn on timer
+                isShrinkTimerOn = true;
+                shrinkTimer = System.currentTimeMillis();
+            }
+        } else {
+            //turn off timer
+            isShrinkTimerOn = false;
+        }
+
+        return false;
+    }
+
+    /**
+     * Shrink() may
+     * deinitialize(:deallocates ByteBuffer of child) Children(s) or
+     * shrink buffers according to the deinitialized children's contiguity status.
+     * It doesn't deinitialze or shrink more than half of children at a time.
+     */
+    private void shrink() {
+        int i;
+        int removeCount = 0;
+        int size = buffers.size();
+        int maxDecreaseCount = size / 2;
+        Buffer buffer;
+
+        //The first buffer never be deinitialized.
+        for (i = 1; i < size; i++) {
+            if (buffers.get(i).isEmpty()) {
+                buffers.get(i).deinitialize();
+            }
+        }
+
+        //remove the empty buffers from the end
+        for (i = size - 1; i >= 1; i--) {
+            buffer = buffers.get(i);
+            if (! buffer.isInitialized()) {
+                buffers.remove(i);
+                if (++removeCount == maxDecreaseCount) {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        
+        //reset allocChild to the first buffer
+        current = 0;
+
+        isShrinkTimerOn = false;
+    }
+
+    @METHODS@
+    
+    public AllocInfo getAllocInfo(int slotNum) {
+        final Buffer buf = buffers.get(slotNum / NO_SLOTS);
+        if (buf.allocList == null) {
+            return null;
+        } else {
+            return buf.allocList.get(slotNum % NO_SLOTS);
+        }
+    }
+    
+    StringBuilder append(StringBuilder sb) {
+        sb.append("+++ current: ")
+          .append(current)
+          .append(" no occupied slots: ")
+          .append(occupiedSlots)
+          .append(" +++\n");
+        for (int i = 0; i < buffers.size(); ++i) {
+            buffers.get(i).append(sb);
+            sb.append("\n");
+        }
+        return sb;
+    }
+    
+    public String toString() {
+        return append(new StringBuilder()).toString();
+    }
+
+    public Stats addTo(Stats s) {
+        final int size = buffers.size();
+        s.buffers += size;
+        s.slots += size * NO_SLOTS;
+        s.size += size * NO_SLOTS * ITEM_SIZE;
+        for (int i = 0; i < size; ++i) {
+            buffers.get(i).addTo(s);
+        }
+        return s;
+    }
+    
+    static class Buffer {
+        private ByteBuffer bb = null; // null represents 'deinitialized' state.
+        private int freeSlotNum;
+        private int occupiedSlots;
+        
+        ArrayList<AllocInfo> allocList;
+        
+        Buffer() {
+            initialize();
+        }
+
+        void initialize() {
+            bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
+            freeSlotNum = 0;
+            occupiedSlots = 0;
+
+            for (int i = 0; i < NO_SLOTS - 1; i++) {
+                setNextFreeSlot(i, i + 1);
+            }
+            setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+            
+            if (TRACK_ALLOC_LOC) {
+                allocList = new ArrayList<AllocInfo>(NO_SLOTS);
+                for (int i = 0; i < NO_SLOTS; ++i) {
+                    allocList.add(new AllocInfo());
+                }
+            }
+        }
+        
+        public void deinitialize() {
+            if (TRACK_ALLOC_LOC) allocList = null;
+            bb = null;
+        }
+        
+        public boolean isInitialized() {
+            return bb != null;
+        }
+
+        public boolean isFull() {
+            return freeSlotNum == -1;
+        }
+
+        public boolean isEmpty() {
+            return occupiedSlots == 0;
+        }
+        
+        public int allocate() {
+            int slotNum = freeSlotNum;
+            freeSlotNum = getNextFreeSlot(slotNum);
+            @INIT_SLOT@
+            occupiedSlots++;
+            if (TRACK_ALLOC_LOC) allocList.get(slotNum).alloc();
+            return slotNum;
+        }
+    
+        public void deallocate(int slotNum) {
+            @INIT_SLOT@
+            setNextFreeSlot(slotNum, freeSlotNum);
+            freeSlotNum = slotNum;
+            occupiedSlots--;
+            if (TRACK_ALLOC_LOC) allocList.get(slotNum).free();
+        }
+
+        public int getNextFreeSlot(int slotNum) {
+            return bb.getInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFF);
+        }    
+            
+        public void setNextFreeSlot(int slotNum, int nextFreeSlot) {
+            bb.putInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFF, nextFreeSlot);
+        }
+
+        StringBuilder append(StringBuilder sb) {
+            sb.append("++ free slot: ")
+              .append(freeSlotNum)
+              .append(" no occupied slots: ")
+              .append(occupiedSlots)
+              .append(" ++\n");
+            @PRINT_BUFFER@
+            return sb;
+        }
+        
+        public String toString() {
+            return append(new StringBuilder()).toString();
+        }
+        
+        public void addTo(Stats s) {
+            if (isInitialized()) {
+                s.items += occupiedSlots;
+            }
+        }
+        
+        private void checkSlot(int slotNum) {
+            if (! CHECK_SLOTS) {
+                return;
+            }
+            final int itemOffset = (slotNum % NO_SLOTS) * ITEM_SIZE;
+            // @CHECK_SLOT@
+        }
+    }
+    
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java
new file mode 100644
index 0000000..c136101
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java
@@ -0,0 +1,19 @@
+package @PACKAGE@;
+
+public class Stats {
+    int arenas  = 0;
+    int buffers = 0;
+    int slots   = 0;
+    int items   = 0;
+    int size    = 0;
+    
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ arenas : ").append(arenas);
+        sb.append(", buffers : ").append(buffers);
+        sb.append(", slots : ").append(slots);
+        sb.append(", items : ").append(items);
+        sb.append(", size : ").append(size).append(" }");
+        return sb.toString();
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/TypeUtil.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/TypeUtil.java
new file mode 100644
index 0000000..9571156
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/TypeUtil.java
@@ -0,0 +1,59 @@
+package @PACKAGE@;
+
+public class TypeUtil {
+    
+    public static class Byte {
+        public static StringBuilder append(StringBuilder sb, byte b) {
+            return sb.append(String.format("%1$18x", b));
+        }
+    }
+
+    public static class Short {
+        public static StringBuilder append(StringBuilder sb, short s) {
+            return sb.append(String.format("%1$18x", s));
+        }
+    }
+
+    public static class Int {
+        public static StringBuilder append(StringBuilder sb, int i) {
+            return sb.append(String.format("%1$18x", i));
+        }
+    }
+
+    public static class Global {
+
+        public static long build(int arenaId, int allocId, int localId) {
+            long result = arenaId;
+            result = result << 48;
+            result |= (allocId << 32);
+            result |= localId;
+            return result;
+        }
+
+        public static int arenaId(long l) {
+            return (int)((l >>> 48) & 0xffff);
+        }
+
+        public static int allocId(long l) {
+            return (int)((l >>> 32) & 0xffff);
+        }
+
+        public static int localId(long l) {
+            return (int) (l & 0xffffffffL);
+        }
+        
+        public static StringBuilder append(StringBuilder sb, long l) {
+            sb.append(String.format("%1$4x", TypeUtil.Global.arenaId(l)));
+            sb.append(':');
+            sb.append(String.format("%1$4x", TypeUtil.Global.allocId(l)));
+            sb.append(':');
+            sb.append(String.format("%1$8x", TypeUtil.Global.localId(l)));
+            return sb;
+        }
+        
+        public static String toString(long l) {
+            return append(new StringBuilder(), l).toString();
+        }
+        
+    }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index c616fbf..1f65f7f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -154,8 +154,8 @@
     }
 
     @Override
-    public void unlock(MetadataTransactionContext ctx) throws RemoteException, ACIDException {
-        metadataNode.unlock(ctx.getJobId());
+    public void unlock(MetadataTransactionContext ctx, byte lockMode) throws RemoteException, ACIDException {
+        metadataNode.unlock(ctx.getJobId(), lockMode);
     }
 
     @Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 29452f3..a2492fa 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -140,9 +140,9 @@
     }
 
     @Override
-    public void unlock(JobId jobId) throws ACIDException, RemoteException {
+    public void unlock(JobId jobId, byte lockMode) throws ACIDException, RemoteException {
         ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-        transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, txnCtx);
+        transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
     }
 
     @Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 392c8a1..5fbd06d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -105,7 +105,7 @@
      * @throws ACIDException
      * @throws RemoteException
      */
-    public void unlock(MetadataTransactionContext ctx) throws ACIDException, RemoteException;
+    public void unlock(MetadataTransactionContext ctx, byte lockMode) throws ACIDException, RemoteException;
 
     /**
      * Inserts a new dataverse into the metadata.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index f27268f..fce3f25 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -84,7 +84,7 @@
      * @throws ACIDException
      * @throws RemoteException
      */
-    public void unlock(JobId jobId) throws ACIDException, RemoteException;
+    public void unlock(JobId jobId, byte lockMode) throws ACIDException, RemoteException;
 
     /**
      * Inserts a new dataverse into the metadata, acquiring local locks on
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 8452340..f83afbb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -149,7 +149,7 @@
     public static final int COMPACTION_POLICY_ARECORD_CLASSNAME_FIELD_INDEX = 2;
 
     private static ARecordType createCompactionPolicyRecordType() throws AsterixException {
-        String[] fieldNames = { "DataverseName", "PolicyName", "Classname" };
+        String[] fieldNames = { "DataverseName", "CompactionPolicy", "Classname" };
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
         return new ARecordType("CompactionPolicyRecordType", fieldNames, fieldTypes, true);
     }
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index 3123008..ef06acc 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -33,6 +33,47 @@
 					<fork>true</fork>
 				</configuration>
 			</plugin>
+            <plugin>
+                <groupId>edu.uci.ics.asterix</groupId>
+                <artifactId>record-manager-generator-maven-plugin</artifactId>
+                <version>0.8.1-SNAPSHOT</version>
+                <configuration>
+                    <debug>false</debug>
+                    <inputFiles>
+                        <param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Job.json</param>
+                        <param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Resource.json</param>
+                        <param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json</param>
+                    </inputFiles>
+                    <packageName>edu.uci.ics.asterix.transaction.management.service.locking</packageName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>generate-record-manager</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>generate-record-manager</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources/java/</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
 		</plugins>
 
 	</build>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
index dcc4d40..e16e106 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -64,7 +64,7 @@
     public void complete(ITupleReference tuple) throws HyracksDataException {
         int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
         try {
-            lockManager.unlock(datasetId, pkHash, txnCtx);
+            lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index efe1daa..9957edf 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -58,7 +58,7 @@
     public void cancel(ITupleReference tuple) throws HyracksDataException {
         int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
         try {
-            lockManager.unlock(datasetId, pkHash, txnCtx);
+            lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
new file mode 100644
index 0000000..c4eeb9d
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -0,0 +1,1000 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILockManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+
+/**
+ * An implementation of the ILockManager interface.
+ * 
+ * @author tillw
+ */
+public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
+
+    private static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName());
+
+    public static final boolean IS_DEBUG_MODE = false;//true
+
+    private TransactionSubsystem txnSubsystem;
+    private ResourceGroupTable table;
+    private ResourceArenaManager resArenaMgr;
+    private RequestArenaManager reqArenaMgr;
+    private JobArenaManager jobArenaMgr;
+    private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
+    private ThreadLocal<DatasetLockCache> dsLockCache;
+        
+    enum LockAction {
+        ERR(false, false),
+        GET(false, false),
+        UPD(false, true), // special version of GET that updates the max lock mode
+        WAIT(true, false),
+        CONV(true, true) // convert (upgrade) a lock (e.g. from S to X)
+        ;
+        boolean wait;
+        boolean modify;
+        LockAction(boolean wait, boolean modify) {
+            this.wait = wait;
+            this.modify = modify;
+        }
+    }
+    
+    static LockAction[][] ACTION_MATRIX = {
+        // new    NL              IS               IX                S                X
+        { LockAction.ERR, LockAction.UPD,  LockAction.UPD,  LockAction.UPD,  LockAction.UPD  }, // NL
+        { LockAction.ERR, LockAction.GET,  LockAction.UPD,  LockAction.UPD,  LockAction.WAIT }, // IS
+        { LockAction.ERR, LockAction.GET,  LockAction.GET,  LockAction.WAIT, LockAction.WAIT }, // IX
+        { LockAction.ERR, LockAction.GET,  LockAction.WAIT, LockAction.GET,  LockAction.WAIT }, // S
+        { LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT }  // X
+    };
+        
+    public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
+        this.txnSubsystem = txnSubsystem;
+        
+        this.table = new ResourceGroupTable();
+        
+        final int lockManagerShrinkTimer = txnSubsystem.getTransactionProperties()
+                .getLockManagerShrinkTimer();
+
+        int noArenas = Runtime.getRuntime().availableProcessors() * 2;
+
+        resArenaMgr = new ResourceArenaManager(noArenas, lockManagerShrinkTimer);
+        reqArenaMgr = new RequestArenaManager(noArenas, lockManagerShrinkTimer);
+        jobArenaMgr = new JobArenaManager(noArenas, lockManagerShrinkTimer);
+        jobIdSlotMap = new ConcurrentHashMap<>();
+        dsLockCache = new ThreadLocal<DatasetLockCache>() {
+            protected DatasetLockCache initialValue() {
+                return new DatasetLockCache();
+            }
+        };
+    }
+
+    public AsterixTransactionProperties getTransactionProperties() {
+        return this.txnSubsystem.getTransactionProperties();
+    }
+
+    @Override
+    public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException {
+        log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        
+        final int dsId = datasetId.getId();        
+        final int jobId = txnContext.getJobId().getId();
+        
+        if (entityHashValue != -1) {
+            // get the intention lock on the dataset, if we want to lock an individual item
+            final byte dsLockMode = LockMode.intentionMode(lockMode);
+            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                lock(datasetId, -1, dsLockMode, txnContext);
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            }
+        }
+
+        final long jobSlot = findOrAllocJobSlot(jobId);
+        
+        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        group.getLatch();
+        try {
+            validateJob(txnContext);
+            
+            final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+            final long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
+            boolean locked = false;
+            while (! locked) {
+                final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+                switch (act) {
+                    case UPD:
+                        resArenaMgr.setMaxMode(resSlot, lockMode);
+                        // no break
+                    case GET:
+                        addHolder(reqSlot, resSlot, jobSlot);
+                        locked = true;
+                        break;
+                    case WAIT:
+                    case CONV:
+                        enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
+                        break;
+                    case ERR:
+                    default:
+                        throw new IllegalStateException();
+                }
+            }
+        } finally {
+            group.releaseLatch();
+        }
+    }
+
+    private void enqueueWaiter(final ResourceGroup group, final long reqSlot,
+            final long resSlot, final long jobSlot, final LockAction act,
+            ITransactionContext txnContext) throws ACIDException {
+        final Queue queue = act.modify ? upgrader : waiter;
+        if (! introducesDeadlock(resSlot, jobSlot)) {
+            queue.add(reqSlot, resSlot, jobSlot);
+        } else {
+            requestAbort(txnContext);
+        }
+        try {
+            group.await(txnContext);
+        } finally {
+            queue.remove(reqSlot, resSlot, jobSlot);
+        }
+    }
+
+    /**
+     * determine if adding a job to the waiters of a resource will introduce a 
+     * cycle in the wait-graph where the job waits on itself
+     * @param resSlot the slot that contains the information about the resource
+     * @param jobSlot the slot that contains the information about the job
+     * @return true if a cycle would be introduced, false otherwise
+     */
+    private boolean introducesDeadlock(final long resSlot, final long jobSlot) {
+        synchronized (jobArenaMgr) {
+            long reqSlot = resArenaMgr.getLastHolder(resSlot);
+            while (reqSlot >= 0) {
+                long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
+                if (holderJobSlot == jobSlot) {
+                    return true;
+                }
+                boolean scanWaiters = true;
+                long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
+                while (waiter >= 0) {
+                    long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
+                    if (introducesDeadlock(watingOnResSlot, jobSlot)) {
+                        return true;
+                    }
+                    waiter = reqArenaMgr.getNextJobRequest(waiter);
+                    if (waiter < 0 && scanWaiters) {
+                        scanWaiters = false;
+                        waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
+                    }
+                }
+                reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+            }
+            return false;
+        }
+    }
+
+    @Override
+    public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException {
+        log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        
+        final int dsId = datasetId.getId();        
+        final int jobId = txnContext.getJobId().getId();
+        
+        if (entityHashValue != -1) {
+            // get the intention lock on the dataset, if we want to lock an individual item
+            final byte dsLockMode = LockMode.intentionMode(lockMode);
+            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                lock(datasetId, -1, dsLockMode, txnContext);
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            }
+        }
+
+        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        if (group.firstResourceIndex.get() == -1l) {
+            validateJob(txnContext);
+            // if we do not have a resource in the group, we know that the
+            // resource that we are looking for is not locked 
+            return;
+        }
+        
+        // we only allocate a request slot if we actually have to wait
+        long reqSlot = -1;
+
+        group.getLatch();
+        try {
+            validateJob(txnContext);
+            
+            final long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+            if (resSlot < 0) {
+                // if we don't find the resource, there are no locks on it.
+                return;
+            }
+
+            final long jobSlot = findOrAllocJobSlot(jobId);
+            
+            while (true) {
+                final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+                switch (act) {
+                    case UPD:
+                    case GET:
+                        return;
+                    case WAIT:
+                    case CONV:
+                        if (reqSlot == -1) {
+                            reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
+                        }
+                        enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
+                        break;
+                    case ERR:
+                    default:
+                        throw new IllegalStateException();
+                }
+            }
+        } finally {
+            if (reqSlot != -1) {
+                // deallocate request, if we allocated one earlier
+                reqArenaMgr.deallocate(reqSlot);
+            }
+            group.releaseLatch();
+        }
+    }
+
+    @Override
+    public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException {
+        log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        
+        final int dsId = datasetId.getId();
+        final int jobId = txnContext.getJobId().getId();
+
+        if (entityHashValue != -1) {
+            // get the intention lock on the dataset, if we want to lock an individual item
+            final byte dsLockMode = LockMode.intentionMode(lockMode);
+            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
+                    return false;
+                }
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            }
+        }
+
+        final long jobSlot = findOrAllocJobSlot(jobId);
+        
+        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        group.getLatch();
+
+        try {
+            validateJob(txnContext);
+
+            final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+            final long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
+            
+            final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+            switch (act) {
+                case UPD:
+                    resArenaMgr.setMaxMode(resSlot, lockMode);
+                    // no break
+                case GET:
+                    addHolder(reqSlot, resSlot, jobSlot);
+                    return true;
+                case WAIT:
+                case CONV:
+                    return false;
+                default:
+                    throw new IllegalStateException();
+            }
+        } finally {
+            group.releaseLatch();
+        }
+        
+        // if we did acquire the dataset lock, but not the entity lock, we keep
+        // it anyway and clean it up at the end of the job
+    }
+
+    @Override
+    public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+            ITransactionContext txnContext) throws ACIDException {
+        log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        
+        final int dsId = datasetId.getId();
+        final int jobId = txnContext.getJobId().getId();
+
+        if (entityHashValue != -1) {
+            // get the intention lock on the dataset, if we want to lock an individual item
+            final byte dsLockMode = LockMode.intentionMode(lockMode);
+            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
+                    return false;
+                }
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            }
+        }
+
+        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        if (group.firstResourceIndex.get() == -1l) {
+            validateJob(txnContext);
+            // if we do not have a resource in the group, we know that the
+            // resource that we are looking for is not locked 
+            return true;
+        }
+        
+        group.getLatch();
+        try {
+            validateJob(txnContext);
+            
+            final long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+            if (resSlot < 0) {
+                // if we don't find the resource, there are no locks on it.
+                return true;
+            }
+
+            final long jobSlot = findOrAllocJobSlot(jobId);
+            
+            LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+            switch (act) {
+                case UPD:
+                case GET:
+                    return true;
+                case WAIT:
+                case CONV:
+                    return false;
+                case ERR:
+                default:
+                    throw new IllegalStateException();
+                }
+        } finally {
+            group.releaseLatch();
+        }
+    }
+
+    @Override
+    public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext) throws ACIDException {
+        log("unlock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+        ResourceGroup group = table.get(datasetId, entityHashValue);
+        group.getLatch();
+        try {
+
+            int dsId = datasetId.getId();
+            long resource = findResourceInGroup(group, dsId, entityHashValue);
+            if (resource < 0) {
+                throw new IllegalStateException("resource (" + dsId + ",  " + entityHashValue + ") not found");
+            }
+
+            int jobId = txnContext.getJobId().getId();
+            long jobSlot = findOrAllocJobSlot(jobId);
+
+            long holder = removeLastHolder(resource, jobSlot, lockMode);
+
+            // deallocate request
+            reqArenaMgr.deallocate(holder);
+            // deallocate resource or fix max lock mode
+            if (resourceNotUsed(resource)) {
+                long prev = group.firstResourceIndex.get();
+                if (prev == resource) {
+                    group.firstResourceIndex.set(resArenaMgr.getNext(resource));
+                } else {
+                    while (resArenaMgr.getNext(prev) != resource) {
+                        prev = resArenaMgr.getNext(prev);
+                    }
+                    resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
+                }
+                resArenaMgr.deallocate(resource);
+            } else {
+                final int oldMaxMode = resArenaMgr.getMaxMode(resource);
+                final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
+                resArenaMgr.setMaxMode(resource, newMaxMode);
+                if (oldMaxMode != newMaxMode) {
+                    // the locking mode didn't change, current waiters won't be
+                    // able to acquire the lock, so we do not need to signal them
+                    group.wakeUp();
+                }
+            }
+        } finally {
+            group.releaseLatch();
+        }
+
+        // dataset intention locks are cleaned up at the end of the job
+    }
+    
+    @Override
+    public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
+        log("releaseLocks", -1, -1, LockMode.ANY, txnContext);
+
+        int jobId = txnContext.getJobId().getId();
+        Long jobSlot = jobIdSlotMap.get(jobId);
+        if (jobSlot == null) {
+            // we don't know the job, so there are no locks for it - we're done
+            return;
+        }
+        synchronized (jobArenaMgr) {
+            long holder = jobArenaMgr.getLastHolder(jobSlot);
+            while (holder != -1) {
+                long resource = reqArenaMgr.getResourceId(holder);
+                int dsId = resArenaMgr.getDatasetId(resource);
+                int pkHashVal = resArenaMgr.getPkHashVal(resource);
+                unlock(new DatasetId(dsId), pkHashVal, LockMode.ANY, txnContext);
+                holder = jobArenaMgr.getLastHolder(jobSlot);
+            }
+            jobArenaMgr.deallocate(jobSlot);
+        }        
+        //System.err.println(table.append(new StringBuilder(), true).toString());        
+        //System.out.println("jobArenaMgr " + jobArenaMgr.addTo(new Stats()).toString());
+        //System.out.println("resArenaMgr " + resArenaMgr.addTo(new Stats()).toString());
+        //System.out.println("reqArenaMgr " + reqArenaMgr.addTo(new Stats()).toString());
+    }
+        
+    private long findOrAllocJobSlot(int jobId) {
+        Long jobSlot = jobIdSlotMap.get(jobId);
+        if (jobSlot == null) {
+            jobSlot = new Long(jobArenaMgr.allocate());
+            jobArenaMgr.setJobId(jobSlot, jobId);
+            Long oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
+            if (oldSlot != null) {
+                // if another thread allocated a slot for this jobId between
+                // get(..) and putIfAbsent(..), we'll use that slot and
+                // deallocate the one we allocated
+                jobArenaMgr.deallocate(jobSlot);
+                jobSlot = oldSlot;
+            }
+        }
+        assert(jobSlot >= 0);
+        return jobSlot;
+    }
+
+    private long findOrAllocResourceSlot(ResourceGroup group, int dsId, int entityHashValue) {
+        long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+        
+        if (resSlot == -1) {
+            // we don't know about this resource, let's alloc a slot
+            resSlot = resArenaMgr.allocate();
+            resArenaMgr.setDatasetId(resSlot, dsId);
+            resArenaMgr.setPkHashVal(resSlot, entityHashValue);
+            resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
+            group.firstResourceIndex.set(resSlot);
+        }
+        return resSlot;
+    }
+
+    private long allocRequestSlot(long resSlot, long jobSlot, byte lockMode) {
+        long reqSlot = reqArenaMgr.allocate();
+        reqArenaMgr.setResourceId(reqSlot, resSlot);
+        reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
+        reqArenaMgr.setJobSlot(reqSlot, jobSlot);
+        return reqSlot;
+    }
+
+    private LockAction determineLockAction(long resSlot, long jobSlot, byte lockMode) {
+        final int curLockMode = resArenaMgr.getMaxMode(resSlot);
+        final LockAction act = ACTION_MATRIX[curLockMode][lockMode];
+        if (act == LockAction.WAIT) {
+            return updateActionForSameJob(resSlot, jobSlot, lockMode);
+        }
+        return act;
+    }
+
+    /**
+     * when we've got a lock conflict for a different job, we always have to
+     * wait, if it is for the same job we either have to
+     * a) (wait and) convert the lock once conversion becomes viable or
+     * b) acquire the lock if we want to lock the same resource with the same 
+     * lock mode for the same job.
+     * @param resource the resource slot that's being locked
+     * @param job the job slot of the job locking the resource
+     * @param lockMode the lock mode that the resource should be locked with
+     * @return
+     */
+    private LockAction updateActionForSameJob(long resource, long job, byte lockMode) {
+        // TODO we can reduce the number of things we have to look at by
+        // carefully distinguishing the different lock modes
+        long holder = resArenaMgr.getLastHolder(resource);
+        LockAction res = LockAction.WAIT;
+        while (holder != -1) {
+            if (job == reqArenaMgr.getJobSlot(holder)) {
+                if (reqArenaMgr.getLockMode(holder) == lockMode) {
+                    return LockAction.GET;
+                } else {
+                    res = LockAction.CONV;
+                }
+            }
+            holder = reqArenaMgr.getNextRequest(holder);
+        }
+        return res;
+    }
+    
+    private long findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
+        long resSlot = group.firstResourceIndex.get();
+        while (resSlot != -1) {
+            // either we already have a lock on this resource or we have a 
+            // hash collision
+            if (resArenaMgr.getDatasetId(resSlot) == dsId && 
+                    resArenaMgr.getPkHashVal(resSlot) == entityHashValue) {
+                return resSlot;
+            } else {
+                resSlot = resArenaMgr.getNext(resSlot);
+            }
+        }
+        return -1;        
+    }
+    
+    private void addHolder(long request, long resource, long job) {
+        long lastHolder = resArenaMgr.getLastHolder(resource);
+        reqArenaMgr.setNextRequest(request, lastHolder);
+        resArenaMgr.setLastHolder(resource, request);
+        
+        synchronized (jobArenaMgr) {
+            long lastJobHolder = jobArenaMgr.getLastHolder(job);
+            insertIntoJobQueue(request, lastJobHolder);
+            jobArenaMgr.setLastHolder(job, request);
+        }
+    }
+    
+    private long removeLastHolder(long resource, long jobSlot, byte lockMode) {
+        long holder = resArenaMgr.getLastHolder(resource);
+        if (holder < 0) {
+            throw new IllegalStateException("no holder for resource " + resource);
+        }
+        
+        // remove from the list of holders for a resource
+        if (requestMatches(holder, jobSlot, lockMode)) {
+            // if the head of the queue matches, we need to update the resource
+            long next = reqArenaMgr.getNextRequest(holder);
+            resArenaMgr.setLastHolder(resource, next);
+        } else {
+            holder = removeRequestFromQueueForJob(holder, jobSlot, lockMode);
+        }
+        
+        synchronized (jobArenaMgr) {
+            // remove from the list of requests for a job
+            long newHead = removeRequestFromJob(jobSlot, holder);
+            jobArenaMgr.setLastHolder(jobSlot, newHead);            
+        }
+        return holder;
+    }
+
+    private boolean requestMatches(long holder, long jobSlot, byte lockMode) {
+        return jobSlot == reqArenaMgr.getJobSlot(holder) 
+                && (lockMode == LockMode.ANY
+                || lockMode == reqArenaMgr.getLockMode(holder));
+    }
+
+    private long removeRequestFromJob(long jobSlot, long holder) {
+        long prevForJob = reqArenaMgr.getPrevJobRequest(holder);
+        long nextForJob = reqArenaMgr.getNextJobRequest(holder);
+        if (nextForJob != -1) {
+            reqArenaMgr.setPrevJobRequest(nextForJob, prevForJob);
+        }
+        if (prevForJob == -1) {
+            return nextForJob;
+        } else {
+            reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
+            return -1;
+        }
+    }
+
+    interface Queue {
+        void add(long request, long resource, long job);
+        void remove(long request, long resource, long job);
+    }
+    
+    final Queue waiter = new Queue() {
+        public void add(long request, long resource, long job) {
+            long waiter = resArenaMgr.getFirstWaiter(resource);
+            reqArenaMgr.setNextRequest(request, -1);
+            if (waiter == -1) {
+                resArenaMgr.setFirstWaiter(resource, request);
+            } else {
+                appendToRequestQueue(waiter, request);
+            }
+            synchronized (jobArenaMgr) {
+                waiter = jobArenaMgr.getLastWaiter(job);
+                insertIntoJobQueue(request, waiter);
+                jobArenaMgr.setLastWaiter(job, request);
+            }            
+        }
+        public void remove(long request, long resource, long job) {
+            long waiter = resArenaMgr.getFirstWaiter(resource);
+            if (waiter == request) {
+                long next = reqArenaMgr.getNextRequest(waiter);
+                resArenaMgr.setFirstWaiter(resource, next);
+            } else {
+                waiter = removeRequestFromQueueForSlot(waiter, request);
+            }
+            synchronized (jobArenaMgr) {
+                // remove from the list of requests for a job
+                long newHead = removeRequestFromJob(job, waiter);
+                jobArenaMgr.setLastWaiter(job, newHead);            
+            }            
+        }
+    };
+        
+    final Queue upgrader = new Queue() {
+        public void add(long request, long resource, long job) {
+            long upgrader = resArenaMgr.getFirstUpgrader(resource);
+            reqArenaMgr.setNextRequest(request, -1);
+            if (upgrader == -1) {
+                resArenaMgr.setFirstUpgrader(resource, request);
+            } else {
+                appendToRequestQueue(upgrader, request);
+            }
+            synchronized (jobArenaMgr) {
+                upgrader = jobArenaMgr.getLastUpgrader(job);
+                insertIntoJobQueue(request, upgrader);
+                jobArenaMgr.setLastUpgrader(job, request);            
+            }
+        }
+        public void remove(long request, long resource, long job) {
+            long upgrader = resArenaMgr.getFirstUpgrader(resource);
+            if (upgrader == request) {
+                long next = reqArenaMgr.getNextRequest(upgrader);
+                resArenaMgr.setFirstUpgrader(resource, next);
+            } else {
+                upgrader = removeRequestFromQueueForSlot(upgrader, request);
+            }
+            synchronized (jobArenaMgr) {
+                // remove from the list of requests for a job
+                long newHead = removeRequestFromJob(job, upgrader);
+                jobArenaMgr.setLastUpgrader(job, newHead);            
+            }
+        }
+    };
+        
+    private void insertIntoJobQueue(long newRequest, long oldRequest) {
+        reqArenaMgr.setNextJobRequest(newRequest, oldRequest);
+        reqArenaMgr.setPrevJobRequest(newRequest, -1);
+        if (oldRequest >= 0) {
+            reqArenaMgr.setPrevJobRequest(oldRequest, newRequest);
+        }
+    }
+
+    private void appendToRequestQueue(long head, long appendee) {
+        long next = reqArenaMgr.getNextRequest(head);
+        while(next != -1) {
+            head = next;
+            next = reqArenaMgr.getNextRequest(head);
+        }
+        reqArenaMgr.setNextRequest(head, appendee);        
+    }
+        
+    private long removeRequestFromQueueForSlot(long head, long reqSlot) {
+        long cur = head;
+        long prev = cur;
+        while (prev != -1) {
+            cur = reqArenaMgr.getNextRequest(prev);
+            if (cur == -1) {
+                throw new IllegalStateException("request " + reqSlot+ " not in queue");
+            }
+            if (cur == reqSlot) {
+                break;
+            }
+            prev = cur;
+        }
+        long next = reqArenaMgr.getNextRequest(cur);
+        reqArenaMgr.setNextRequest(prev, next);
+        return cur;        
+    }
+    
+    /**
+     * remove the first request for a given job and lock mode from a request queue.
+     * If the value of the parameter lockMode is LockMode.NL the first request
+     * for the job is removed - independent of the LockMode.
+     * @param head the head of the request queue
+     * @param jobSlot the job slot
+     * @param lockMode the lock mode 
+     * @return the slot of the first request that matched the given job
+     */
+    private long removeRequestFromQueueForJob(long head, long jobSlot, byte lockMode) {
+        long holder = head;
+        long prev = holder;
+        while (prev != -1) {
+            holder = reqArenaMgr.getNextRequest(prev);
+            if (holder == -1) {
+                throw new IllegalStateException("no entry for job " + jobSlot + " in queue");
+            }
+            if (requestMatches(holder, jobSlot, lockMode)) {
+                break;
+            }
+            prev = holder;
+        }
+        long next = reqArenaMgr.getNextRequest(holder);
+        reqArenaMgr.setNextRequest(prev, next);
+        return holder;
+    }
+    
+    private int determineNewMaxMode(long resource, int oldMaxMode) {
+        int newMaxMode = LockMode.NL;
+        long holder = resArenaMgr.getLastHolder(resource);
+        while (holder != -1) {
+            int curLockMode = reqArenaMgr.getLockMode(holder);
+            if (curLockMode == oldMaxMode) {
+                // we have another lock of the same mode - we're done
+                return oldMaxMode;
+            }
+            switch (ACTION_MATRIX[newMaxMode][curLockMode]) {
+                case UPD:
+                    newMaxMode = curLockMode;
+                    break;
+                case GET:
+                    break;
+                case WAIT:
+                    throw new IllegalStateException("incompatible locks in holder queue");
+            }
+            holder = reqArenaMgr.getNextRequest(holder);
+        }
+        return newMaxMode;        
+    }
+    
+    private boolean resourceNotUsed(long resource) {
+        return resArenaMgr.getLastHolder(resource) == -1
+                && resArenaMgr.getFirstUpgrader(resource) == -1
+                && resArenaMgr.getFirstWaiter(resource) == -1;
+    }
+
+    private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
+        if (! LOGGER.isLoggable(Level.FINEST)) {
+            return;
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ op : ").append(string);
+        if (id != -1) {
+            sb.append(" , dataset : ").append(id);
+        }
+        if (entityHashValue != -1) {
+            sb.append(" , entity : ").append(entityHashValue);
+        }
+        if (lockMode != LockMode.NL) {
+            sb.append(" , mode : ").append(LockMode.toString(lockMode));
+        }
+        if (txnContext != null) {
+            sb.append(" , jobId : ").append(txnContext.getJobId());            
+        }
+        sb.append(" }");
+        LOGGER.finest(sb.toString());
+    }
+
+    private void validateJob(ITransactionContext txnContext) throws ACIDException {
+        if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
+            throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
+        } else if (txnContext.isTimeout()) {
+            requestAbort(txnContext);
+        }
+    }
+
+    private void requestAbort(ITransactionContext txnContext) throws ACIDException {
+        txnContext.setTimeout(true);
+        throw new ACIDException("Transaction " + txnContext.getJobId()
+                + " should abort (requested by the Lock Manager)");
+    }
+
+    public StringBuilder append(StringBuilder sb) {
+        table.getAllLatches();
+        try {
+            sb.append(">>dump_begin\t>>----- [resTable] -----\n");
+            table.append(sb);
+            sb.append(">>dump_end\t>>----- [resTable] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
+            resArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
+            reqArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
+            for(Integer i : jobIdSlotMap.keySet()) {
+                sb.append(i).append(" : ");
+                TypeUtil.Global.append(sb, jobIdSlotMap.get(i));
+                sb.append("\n");
+            }
+            sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
+            jobArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
+        } finally {
+            table.releaseAllLatches();
+        }
+        return sb;
+    }
+    
+    public String toString() {
+        return append(new StringBuilder()).toString();
+    }
+    
+    @Override
+    public String prettyPrint() throws ACIDException {
+        StringBuilder s = new StringBuilder("\n########### LockManager Status #############\n");
+        return append(s).toString() + "\n";
+    }
+
+    @Override
+    public void start() {
+        //no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (dumpState) {
+            try {
+                os.write(toString().getBytes());
+                os.flush();
+            } catch (IOException e) {
+                //ignore
+            }
+        }
+    }
+    
+    private static class DatasetLockCache {
+        private long jobId = -1;
+        private HashMap<Integer,Byte> lockCache =  new HashMap<Integer,Byte>();
+        // size 1 cache to avoid the boxing/unboxing that comes with the 
+        // access to the HashMap
+        private int cDsId = -1;
+        private byte cDsLockMode = -1;
+        
+        public boolean contains(final int jobId, final int dsId, byte dsLockMode) {
+            if (this.jobId == jobId) {
+                if (this.cDsId == dsId && this.cDsLockMode == dsLockMode) {
+                    return true;
+                }
+                final Byte cachedLockMode = this.lockCache.get(dsId);
+                if (cachedLockMode != null && cachedLockMode == dsLockMode) {
+                    this.cDsId = dsId;
+                    this.cDsLockMode = dsLockMode;
+                    return true;
+                }            
+            } else {
+                this.jobId = -1;
+                this.cDsId = -1;
+                this.cDsLockMode = -1;
+                this.lockCache.clear();
+            }
+            return false;
+        }
+        
+        public void put(final int jobId, final int dsId, byte dsLockMode) {
+            this.jobId = jobId;
+            this.cDsId = dsId;
+            this.cDsLockMode = dsLockMode;
+            this.lockCache.put(dsId, dsLockMode);
+        }
+        
+        public String toString() {
+            return "[ " + jobId + " : " + lockCache.toString() + "]";
+        }
+    }
+
+    private static class ResourceGroupTable {
+        public static final int TABLE_SIZE = 1024; // TODO increase?
+
+        private ResourceGroup[] table;
+        
+        public ResourceGroupTable() {
+            table = new ResourceGroup[TABLE_SIZE];
+            for (int i = 0; i < TABLE_SIZE; ++i) {
+                table[i] = new ResourceGroup();
+            }
+        }
+        
+        ResourceGroup get(DatasetId dId, int entityHashValue) {
+            // TODO ensure good properties of hash function
+            int h = Math.abs(dId.getId() ^ entityHashValue);
+            if (h < 0) h = 0;
+            return table[h % TABLE_SIZE];
+        }
+        
+        public void getAllLatches() {
+            for (int i = 0; i < TABLE_SIZE; ++i) {
+                table[i].getLatch();
+            }
+        }
+        
+        public void releaseAllLatches() {
+            for (int i = 0; i < TABLE_SIZE; ++i) {
+                table[i].releaseLatch();
+            }
+        }
+        
+        public StringBuilder append(StringBuilder sb) {
+            return append(sb, false);
+        }
+
+        public StringBuilder append(StringBuilder sb, boolean detail) {
+            for (int i = 0; i < table.length; ++i) {
+                sb.append(i).append(" : ");
+                if (detail) {
+                    sb.append(table[i]);
+                } else {
+                    sb.append(table[i].firstResourceIndex);
+                }
+                sb.append('\n');
+            }
+            return sb;
+        }
+    }
+    
+    private static class ResourceGroup {
+        private ReentrantReadWriteLock latch;
+        private Condition condition;
+        AtomicLong firstResourceIndex;
+
+        ResourceGroup() {
+            latch = new ReentrantReadWriteLock();
+            condition = latch.writeLock().newCondition();
+            firstResourceIndex = new AtomicLong(-1);
+        }
+        
+        void getLatch() {
+            log("latch");
+            latch.writeLock().lock();
+        }
+        
+        void releaseLatch() {
+            log("release");
+            latch.writeLock().unlock();
+        }
+        
+        boolean hasWaiters() {
+            return latch.hasQueuedThreads();
+        }
+        
+        void await(ITransactionContext txnContext) throws ACIDException {
+            log("wait for");
+            try {
+                condition.await();
+            } catch (InterruptedException e) {
+                throw new ACIDException(txnContext, "interrupted", e);
+            }
+        }
+        
+        void wakeUp() {
+            log("notify");
+            condition.signalAll();
+        }
+        
+        void log(String s) {
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest(s + " " + toString());
+            }            
+        }
+        
+        public String toString() {
+            return "{ id : " + hashCode()
+                    + ", first : " + firstResourceIndex.toString() 
+                    + ", waiters : " + (hasWaiters() ? "true" : "false") + " }";
+        }
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Job.json b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Job.json
new file mode 100644
index 0000000..a649b7c
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Job.json
@@ -0,0 +1,24 @@
+{ 
+    "name" : "Job",
+    "fields" : [
+        {
+            "name" : "last holder",
+            "type" : "GLOBAL",
+            "initial" : "-1"
+        },
+        {
+            "name" : "last waiter",
+            "type" : "GLOBAL",
+            "initial" : "-1"
+        },
+        {
+            "name" : "last upgrader",
+            "type" : "GLOBAL",
+            "initial" : "-1"
+        },
+        {
+            "name" : "job id",
+            "type" : "INT"
+        }
+    ]
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index c7df2f2..98b16c0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -278,7 +278,7 @@
             did = entityInfoManager.getDatasetId(entityInfo);
             entityHashValue = entityInfoManager.getPKHashVal(entityInfo);
             if (did == datasetId.getId() && entityHashValue != -1) {
-                this.unlock(datasetId, entityHashValue, txnContext);
+                this.unlock(datasetId, entityHashValue, LockMode.ANY, txnContext);
             }
 
             entityInfo = prevEntityInfo;
@@ -638,7 +638,7 @@
     }
 
     @Override
-    public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
+    public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext) throws ACIDException {
         internalUnlock(datasetId, entityHashValue, txnContext, false);
     }
 
@@ -2211,7 +2211,7 @@
                     tempDatasetIdObj.setId(logRecord.getDatasetId());
                     tempJobIdObj.setId(logRecord.getJobId());
                     txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
-                    unlock(tempDatasetIdObj, logRecord.getPKHashValue(), txnCtx);
+                    unlock(tempDatasetIdObj, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
                     txnCtx.notifyOptracker(false);
                 } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
                     tempJobIdObj.setId(logRecord.getJobId());
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index e61cb55..79625a6 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -24,6 +24,7 @@
 
 import org.apache.commons.io.FileUtils;
 
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
 import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@
 import edu.uci.ics.asterix.common.transactions.ILockManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -62,7 +64,7 @@
     ArrayList<LockRequest> requestList;
     ArrayList<ArrayList<Integer>> expectedResultList;
     int resultListIndex;
-    LockManager lockMgr;
+    ILockManager lockMgr;
     String requestFileName;
     long defaultWaitTime;
 
@@ -72,7 +74,7 @@
         this.workerReadyQueue = new WorkerReadyQueue();
         this.requestList = new ArrayList<LockRequest>();
         this.expectedResultList = new ArrayList<ArrayList<Integer>>();
-        this.lockMgr = (LockManager) txnProvider.getLockManager();
+        this.lockMgr = txnProvider.getLockManager();
         this.requestFileName = new String(requestFileName);
         this.resultListIndex = 0;
         this.defaultWaitTime = 10;
@@ -151,6 +153,7 @@
         if (isSuccess) {
             log("\n*** Test Passed ***");
         }
+        ((LogManager) txnProvider.getLogManager()).stop(false, null);
     }
 
     public boolean handleRequest(LockRequest request) throws ACIDException {
@@ -482,7 +485,7 @@
                         request.txnContext);
                 break;
             case RequestType.UNLOCK:
-                lockMgr.unlock(request.datasetIdObj, request.entityHashValue, request.txnContext);
+                lockMgr.unlock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
                 break;
             case RequestType.RELEASE_LOCKS:
                 lockMgr.releaseLocks(request.txnContext);
@@ -511,6 +514,18 @@
     public void log(String s) {
         System.out.println(s);
     }
+    
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ t : \"").append(threadName).append("\", r : ");
+        if (lockRequest == null) {
+            sb.append("null");
+        } else {
+            sb.append("\"").append(lockRequest.toString()).append("\""); 
+        }
+        sb.append(" }");
+        return sb.toString();
+    }
 }
 
 class WorkerReadyQueue {
@@ -618,7 +633,7 @@
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
-            log(Thread.currentThread().getName() + "Waiting for worker to finish its task...");
+            log(Thread.currentThread().getName() + " Waiting for worker to finish its task...");
             queueSize = workerReadyQueue.size();
         }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
index e6f2798..214aa35 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
@@ -14,9 +14,14 @@
  */
 package edu.uci.ics.asterix.transaction.management.service.locking;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Random;
 
+import org.apache.commons.io.FileUtils;
+
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
 import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -26,6 +31,7 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -45,9 +51,16 @@
     private static int jobId = 0;
     private static Random rand;
 
-    public static void main(String args[]) throws ACIDException, AsterixException {
+    public static void main(String args[]) throws ACIDException, AsterixException, IOException {
         int i;
-        TransactionSubsystem txnProvider = new TransactionSubsystem("LockManagerRandomUnitTest", null,
+        //prepare configuration file
+        File cwd = new File(System.getProperty("user.dir"));
+        File asterixdbDir = cwd.getParentFile();
+        File srcFile = new File(asterixdbDir.getAbsoluteFile(), "asterix-app/src/main/resources/asterix-build-configuration.xml");
+        File destFile = new File(cwd, "target/classes/asterix-configuration.xml");
+        FileUtils.copyFile(srcFile, destFile);
+
+        TransactionSubsystem txnProvider = new TransactionSubsystem("nc1", null,
                 new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
         rand = new Random(System.currentTimeMillis());
         for (i = 0; i < MAX_NUM_OF_ENTITY_LOCK_JOB; i++) {
@@ -64,6 +77,7 @@
             System.out.println("Creating " + i + "th EntityLockUpgradeJob..");
             generateEntityLockUpgradeThread(txnProvider);
         }
+        ((LogManager) txnProvider.getLogManager()).stop(false, null);
     }
 
     private static void generateEntityLockThread(TransactionSubsystem txnProvider) {
@@ -496,7 +510,7 @@
                         request.txnContext);
                 break;
             case RequestType.UNLOCK:
-                lockMgr.unlock(request.datasetIdObj, request.entityHashValue, request.txnContext);
+                lockMgr.unlock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
                 break;
             case RequestType.RELEASE_LOCKS:
                 lockMgr.releaseLocks(request.txnContext);
@@ -555,6 +569,11 @@
         this.entityHashValue = waitTime;
     }
 
+    @Override
+    public String toString() {
+        return prettyPrint();
+    }
+    
     public String prettyPrint() {
         StringBuilder s = new StringBuilder();
         //s.append(threadName.charAt(7)).append("\t").append("\t");
@@ -595,23 +614,7 @@
         }
         s.append("\tJ").append(txnContext.getJobId().getId()).append("\tD").append(datasetIdObj.getId()).append("\tE")
                 .append(entityHashValue).append("\t");
-        switch (lockMode) {
-            case LockMode.S:
-                s.append("S");
-                break;
-            case LockMode.X:
-                s.append("X");
-                break;
-            case LockMode.IS:
-                s.append("IS");
-                break;
-            case LockMode.IX:
-                s.append("IX");
-                break;
-            default:
-                throw new UnsupportedOperationException("Unsupported lock mode");
-        }
-        s.append("\n");
+        s.append(LockMode.toString(lockMode)).append("\n");
         return s.toString();
     }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json
new file mode 100644
index 0000000..0c4fa71
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json
@@ -0,0 +1,29 @@
+{
+    "name" : "Request",
+    "fields" : [
+        {
+            "name" : "resource id",
+            "type" : "GLOBAL"
+        },
+        {
+            "name" : "job slot",
+            "type" : "GLOBAL"
+        },
+        {
+            "name" : "prev job request",
+            "type" : "GLOBAL"
+        },
+        {
+            "name" : "next job request",
+            "type" : "GLOBAL"
+        },
+        {
+            "name" : "next request",
+            "type" : "GLOBAL"
+        },
+        {
+            "name" : "lock mode",
+            "type" : "INT"
+        }
+    ]
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Resource.json b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Resource.json
new file mode 100644
index 0000000..d0d553a
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Resource.json
@@ -0,0 +1,37 @@
+{
+    "name" : "Resource",
+    "fields" : [
+        {
+            "name" : "last holder",
+            "type" : "GLOBAL",
+            "initial" : "-1"
+        },
+        {
+            "name" : "first waiter",
+            "type" : "GLOBAL",
+            "initial" : "-1"
+        },
+        {
+            "name" : "first upgrader",
+            "type" : "GLOBAL",
+            "initial" : "-1"
+        },
+        {
+            "name" : "next",
+            "type" : "GLOBAL"
+        },
+        {
+            "name" : "max mode",
+            "type" : "INT",
+            "initial" : "edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode.NL"
+        },
+        {
+            "name" : "dataset id",
+            "type" : "INT"
+        },
+        {
+            "name" : "pk hash val",
+            "type" : "INT"
+        }
+    ]
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 933afcd..4f13b9f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -41,7 +41,6 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
 import edu.uci.ics.asterix.common.transactions.MutableLong;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
@@ -82,7 +81,7 @@
         emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages);
         flushQ = new LinkedBlockingQueue<LogPage>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
-            emptyQ.offer(new LogPage((LockManager) txnSubsystem.getLockManager(), logPageSize, flushLSN));
+            emptyQ.offer(new LogPage(txnSubsystem, logPageSize, flushLSN));
         }
         appendLSN = initializeLogAnchor(nextLogFileId);
         flushLSN.set(appendLSN);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index a3f42a7..1ed6fba 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -22,16 +22,21 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
 import edu.uci.ics.asterix.common.transactions.ILogPage;
 import edu.uci.ics.asterix.common.transactions.ILogRecord;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.common.transactions.MutableLong;
 import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 
 public class LogPage implements ILogPage {
 
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(LogPage.class.getName());
-    private final LockManager lockMgr;
+    private final TransactionSubsystem txnSubsystem;
     private final LogPageReader logPageReader;
     private final int logPageSize;
     private final MutableLong flushLSN;
@@ -46,8 +51,8 @@
     private FileChannel fileChannel;
     private boolean stop;
 
-    public LogPage(LockManager lockMgr, int logPageSize, MutableLong flushLSN) {
-        this.lockMgr = lockMgr;
+    public LogPage(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) {
+        this.txnSubsystem = txnSubsystem;
         this.logPageSize = logPageSize;
         this.flushLSN = flushLSN;
         appendBuffer = ByteBuffer.allocate(logPageSize);
@@ -187,7 +192,27 @@
     private void batchUnlock(int beginOffset, int endOffset) throws ACIDException {
         if (endOffset > beginOffset) {
             logPageReader.initializeScan(beginOffset, endOffset);
-            lockMgr.batchUnlock(this, logPageReader);
+
+            DatasetId dsId = new DatasetId(-1);
+            JobId jId = new JobId(-1);
+            ITransactionContext txnCtx = null;
+
+            LogRecord logRecord = logPageReader.next();
+            while (logRecord != null) {
+                if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
+                    dsId.setId(logRecord.getDatasetId());
+                    jId.setId(logRecord.getJobId());
+                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+                    txnSubsystem.getLockManager().unlock(dsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
+                    txnCtx.notifyOptracker(false);
+                } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
+                    jId.setId(logRecord.getJobId());
+                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+                    txnCtx.notifyOptracker(true);
+                    notifyJobTerminator();
+                }
+                logRecord = logPageReader.next();
+            }
         }
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 78bca42..91f2535 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -30,17 +30,34 @@
     }
 
     public static class LockManagerConstants {
-        public static final String LOCK_CONF_DIR = "lock_conf";
-        public static final String LOCK_CONF_FILE = "lock.conf";
-        public static final int[] LOCK_CONFLICT_MATRIX = new int[] { 2, 3 };
-        public static final int[] LOCK_CONVERT_MATRIX = new int[] { 2, 0 };
-
         public static class LockMode {
-            public static final byte S = 0;
-            public static final byte X = 1;
-            public static final byte IS = 2;
-            public static final byte IX = 3;
+            public static final byte ANY = -1;
+            public static final byte NL  =  0;
+            public static final byte IS  =  1;
+            public static final byte IX  =  2;
+            public static final byte S   =  3;
+            public static final byte X   =  4;
+            
+            public static byte intentionMode(byte mode) {
+                switch (mode) {
+                    case S:  return IS;
+                    case X:  return IX;
+                    default: throw new IllegalArgumentException(
+                            "no intention lock mode for " + toString(mode));
+                }                
+            }
+            
+            public static String toString(byte mode) {
+                switch (mode) {
+                    case ANY: return "ANY";
+                    case NL:  return "NL";
+                    case IS:  return "IS";
+                    case IX:  return "IX";
+                    case S:   return "S";
+                    case X:   return "X";
+                    default:  throw new IllegalArgumentException("no such lock mode");
+                }
+            }
         }
     }
-
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index aceeb82..4cb5fac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -22,7 +22,7 @@
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
+import edu.uci.ics.asterix.transaction.management.service.locking.ConcurrentLockManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
 import edu.uci.ics.asterix.transaction.management.service.recovery.CheckpointThread;
 import edu.uci.ics.asterix.transaction.management.service.recovery.RecoveryManager;
@@ -46,7 +46,7 @@
         this.id = id;
         this.txnProperties = txnProperties;
         this.transactionManager = new TransactionManager(this);
-        this.lockManager = new LockManager(this);
+        this.lockManager = new ConcurrentLockManager(this);
         this.logManager = new LogManager(this);
         this.recoveryManager = new RecoveryManager(this);
         this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;