Merge branch 'master' into westmann/locks
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 236df53..ccbf68f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -17,6 +17,7 @@
 import java.io.File;
 import java.util.EnumSet;
 
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
 import edu.uci.ics.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
@@ -99,6 +100,7 @@
         nc2.stop();
         nc1.stop();
         cc.stop();
+        AsterixThreadExecutor.INSTANCE.shutdown();
     }
 
     public static void runJob(JobSpecification spec) throws Exception {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
index edd4b2a..fb87f5e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
@@ -36,4 +36,8 @@
     public Future<Object> submit(Callable command) {
         return (Future<Object>) executorService.submit(command);
     }
+    
+    public void shutdown() {
+        executorService.shutdown();
+    }
 }
diff --git a/asterix-maven-plugins/pom.xml b/asterix-maven-plugins/pom.xml
index 56bc697..b26a6d6 100644
--- a/asterix-maven-plugins/pom.xml
+++ b/asterix-maven-plugins/pom.xml
@@ -35,5 +35,6 @@
 
   <modules>
     <module>lexer-generator-maven-plugin</module>
+    <module>record-manager-generator-maven-plugin</module>
   </modules>
 </project>
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml b/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml
new file mode 100644
index 0000000..9ffc8d2
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml
@@ -0,0 +1,56 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ ! 
+ !     http://www.apache.org/licenses/LICENSE-2.0
+ ! 
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.asterix</groupId>
+  <artifactId>record-manager-generator-maven-plugin</artifactId>
+    <parent>
+        <artifactId>asterix-maven-plugins</artifactId>
+        <groupId>edu.uci.ics.asterix</groupId>
+        <version>0.8.1-SNAPSHOT</version>
+    </parent>
+
+  <packaging>maven-plugin</packaging>
+  <name>record-manager-generator-maven-plugin</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.8.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.maven</groupId>
+      <artifactId>maven-plugin-api</artifactId>
+      <version>2.0.2</version>
+    </dependency>
+  </dependencies>    
+</project>
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
new file mode 100644
index 0000000..31c4855
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import edu.uci.ics.asterix.recordmanagergenerator.RecordType.Field;
+
+public class Generator {
+    
+    public enum Manager {
+        RECORD,
+        ARENA
+    }
+    
+    public static void generateSource(
+            Manager mgr, 
+            RecordType rec, 
+            InputStream is, 
+            StringBuilder sb, 
+            boolean debug) {
+        switch (mgr) {
+            case RECORD:
+                generateMemoryManagerSource(rec, is, sb, debug);
+                break;
+            case ARENA:
+                generateArenaManagerSource(rec, is, sb, debug);
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }        
+    }
+
+    private static void generateMemoryManagerSource(
+            RecordType resource, 
+            InputStream is, 
+            StringBuilder sb, 
+            boolean debug) {
+        BufferedReader in = new BufferedReader(new InputStreamReader(is));
+        String line = null;
+
+        try {
+
+            String indent = "    ";
+
+            while((line = in.readLine()) != null) {
+                if (line.contains("@E@")) {
+                    line = line.replace("@E@", resource.name);
+                }
+                if (line.contains("@DEBUG@")) {
+                    line = line.replace("@DEBUG@", Boolean.toString(debug));
+                }
+                if (line.contains("@CONSTS@")) {
+                    resource.appendConstants(sb, indent, 1);
+                    sb.append('\n');
+                } else if (line.contains("@METHODS@")) {
+                    for (int i = 0; i < resource.size(); ++i) {
+                        final Field field = resource.fields.get(i);
+                        if (field.accessible) {
+                            field.appendMemoryManagerGetMethod(sb, indent, 1);
+                            sb.append('\n');
+                            field.appendMemoryManagerSetMethod(sb, indent, 1);
+                            sb.append('\n');
+                        }
+                    }
+                } else if (line.contains("@INIT_SLOT@")) {
+                    for (int i = 0; i < resource.size(); ++i) {                        
+                        final Field field = resource.fields.get(i);
+                        field.appendInitializers(sb, indent, 3);
+                    }
+                } else if (line.contains("@CHECK_SLOT@")) {
+                    for (int i = 0; i < resource.size(); ++i) {                        
+                        final Field field = resource.fields.get(i);
+                        field.appendChecks(sb, indent, 3);
+                    }
+                } else if (line.contains("@PRINT_BUFFER@")) {
+                    resource.appendBufferPrinter(sb, indent, 3);
+                    sb.append('\n');
+                } else {
+                  sb.append(line).append('\n');
+                }
+            }
+
+        } catch (IOException ioe) {
+            ioe.printStackTrace();
+        }
+    }
+
+    private static void generateArenaManagerSource(
+            RecordType resource, 
+            InputStream is, 
+            StringBuilder sb, 
+            boolean debug) {
+        BufferedReader in = new BufferedReader(new InputStreamReader(is));
+        String line = null;
+
+        try {
+
+            String indent = "    ";
+
+            while((line = in.readLine()) != null) {
+                if (line.contains("@E@")) {
+                    line = line.replace("@E@", resource.name);
+                }
+                if (line.contains("@DEBUG@")) {
+                    line = line.replace("@DEBUG@", Boolean.toString(debug));
+                }
+                if (line.contains("@METHODS@")) {
+                    for (int i = 0; i < resource.size(); ++i) {
+                        final Field field = resource.fields.get(i);
+                        if (field.accessible) {
+                            field.appendArenaManagerGetMethod(sb, indent, 1);
+                            sb.append('\n');
+                            field.appendArenaManagerSetMethod(sb, indent, 1);
+                            sb.append('\n');
+                        }
+                    }
+                } else {
+                  sb.append(line).append('\n');
+                }
+            }
+
+        } catch (IOException ioe) {
+            ioe.printStackTrace();
+        }
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
new file mode 100644
index 0000000..8b6b581
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.maven.plugin.MojoFailureException;
+
+/**
+ * @goal generate-record-manager
+ * @phase generate-sources
+ * @requiresDependencyResolution compile
+ */
+public class RecordManagerGeneratorMojo extends AbstractMojo {
+    
+    /**
+     * parameter injected from pom.xml
+     * 
+     * @parameter
+     */
+    private boolean debug;
+    /**
+     * parameter injected from pom.xml
+     * 
+     * @parameter
+     */
+    private String arenaManagerTemplate;
+    /**
+     * parameter injected from pom.xml
+     * 
+     * @parameter
+     */
+    private String recordManagerTemplate;
+    /**
+     * parameter injected from pom.xml
+     * 
+     * @parameter
+     * @required
+     */
+    private String[] recordTypes;
+    /**
+     * parameter injected from pom.xml
+     * 
+     * @parameter
+     * @required
+     */
+    private File outputDir;
+
+    private Map<String, RecordType> typeMap;
+
+    public RecordManagerGeneratorMojo() {        
+    }
+
+    private void defineRecordTypes() {
+        if (debug) {
+            getLog().info("generating debug code");
+        }
+
+        typeMap = new HashMap<String, RecordType>();
+
+        RecordType resource = new RecordType("Resource");
+        resource.addField("last holder", RecordType.Type.GLOBAL, "-1");
+        resource.addField("first waiter", RecordType.Type.GLOBAL, "-1");
+        resource.addField("first upgrader", RecordType.Type.GLOBAL, "-1");
+        resource.addField("next", RecordType.Type.GLOBAL, null);
+        resource.addField("max mode", RecordType.Type.INT, "LockMode.NL");
+        resource.addField("dataset id", RecordType.Type.INT, null);
+        resource.addField("pk hash val", RecordType.Type.INT, null);
+        resource.addField("alloc id", RecordType.Type.SHORT, null);
+
+        resource.addToMap(typeMap);
+
+        RecordType request = new RecordType("Request");
+        request.addField("resource id", RecordType.Type.GLOBAL, null);
+        request.addField("job slot", RecordType.Type.GLOBAL, null);
+        request.addField("prev job request", RecordType.Type.GLOBAL, null);
+        request.addField("next job request", RecordType.Type.GLOBAL, null);
+        request.addField("next request", RecordType.Type.GLOBAL, null);
+        request.addField("lock mode", RecordType.Type.INT, null);
+        request.addField("alloc id", RecordType.Type.SHORT, null);
+
+        request.addToMap(typeMap);
+
+        RecordType job = new RecordType("Job");
+        job.addField("last holder", RecordType.Type.GLOBAL, "-1");
+        job.addField("last waiter", RecordType.Type.GLOBAL, "-1");
+        job.addField("last upgrader", RecordType.Type.GLOBAL, "-1");
+        job.addField("job id", RecordType.Type.INT, null);
+        job.addField("alloc id", RecordType.Type.SHORT, null);
+
+        job.addToMap(typeMap);
+    }
+
+    public void execute() throws MojoExecutionException, MojoFailureException {
+        if (!outputDir.exists()) {
+            outputDir.mkdirs();
+        }
+
+        defineRecordTypes();
+
+        for (int i = 0; i < recordTypes.length; ++i) {
+            final String recordType = recordTypes[i];
+
+            if (recordManagerTemplate != null) {
+                generateSource(Generator.Manager.RECORD, recordManagerTemplate, recordType);
+            }
+
+            if (arenaManagerTemplate != null) {
+                generateSource(Generator.Manager.ARENA, arenaManagerTemplate, recordType);
+            }
+        }
+    }
+
+    private void generateSource(Generator.Manager mgrType, String template, String recordType) throws MojoFailureException {
+        InputStream is = getClass().getClassLoader().getResourceAsStream(template);
+        if (is == null) {
+            throw new MojoFailureException("template '" + template + "' not found in classpath");
+        }
+
+        StringBuilder sb = new StringBuilder();
+        File outputFile = new File(outputDir, recordType + template);
+
+        try {
+            getLog().info("generating " + outputFile.toString());
+
+            Generator.generateSource(mgrType, typeMap.get(recordType), is, sb, debug);
+            is.close();
+
+            FileWriter outWriter = new FileWriter(outputFile);
+            outWriter.write(sb.toString());
+            outWriter.close();
+        } catch (Exception ex) {
+            getLog().error(ex);
+            throw new MojoFailureException("failed to generate " + outputFile.toString());
+        }
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
new file mode 100644
index 0000000..6d31f63
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
@@ -0,0 +1,389 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+
+public class RecordType {
+    
+    enum Type {
+        BYTE,
+        SHORT,
+        INT,
+        GLOBAL
+    }
+    
+    static class Field {
+                
+        String name;
+        Type type;
+        String initial;
+        int offset;
+        boolean accessible = true;
+
+        Field(String name, Type type, String initial, int offset, boolean accessible) {
+            this.name = name;
+            this.type = type;
+            this.initial = initial;
+            this.offset = offset;
+            this.accessible = accessible;
+        }
+        
+        String methodName(String prefix) {
+            String words[] = name.split(" ");
+            assert(words.length > 0);
+            StringBuilder sb = new StringBuilder(prefix);
+            for (int j = 0; j < words.length; ++j) {
+                String word = words[j];
+                sb.append(word.substring(0, 1).toUpperCase());
+                sb.append(word.substring(1));
+            }
+            return sb.toString();        
+        }
+
+        StringBuilder appendMemoryManagerGetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public ")
+              .append(javaType(type))
+              .append(' ')
+              .append(methodName("get"))
+              .append("(int slotNum) {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final Buffer buf = buffers.get(slotNum / NO_SLOTS);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("buf.checkSlot(slotNum % NO_SLOTS);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final ByteBuffer b = buf.bb;\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("return b.")
+              .append(bbGetter(type))
+              .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+              .append(offsetName())
+              .append(");\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+            
+        StringBuilder appendMemoryManagerSetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public void ")
+              .append(methodName("set"))
+              .append("(int slotNum, ")
+              .append(javaType(type))
+              .append(" value) {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("b.")
+              .append(bbSetter(type))
+              .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+              .append(offsetName())
+              .append(", value);\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+
+        StringBuilder appendArenaManagerGetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public ")
+              .append(javaType(type))
+              .append(' ')
+              .append(methodName("get"))
+              .append("(long slotNum) {\n");
+            if (initial != null) {
+              sb = indent(sb, indent, level + 1);
+              sb.append("if (TRACK_ALLOC) checkSlot(slotNum);\n");
+            }
+            sb = indent(sb, indent, level + 1);
+            sb.append("final int arenaId = RecordManagerTypes.Global.arenaId(slotNum);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final int localId = RecordManagerTypes.Global.localId(slotNum);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("return get(arenaId).")
+              .append(methodName("get"))
+              .append("(localId);\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+            
+        StringBuilder appendArenaManagerSetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public void ")
+              .append(methodName("set"))
+              .append("(long slotNum, ")
+              .append(javaType(type))
+              .append(" value) {\n");
+            if (initial != null) {
+              sb = indent(sb, indent, level + 1);
+              sb.append("if (TRACK_ALLOC) checkSlot(slotNum);\n");
+            }
+            sb = indent(sb, indent, level + 1);
+            sb.append("final int arenaId = RecordManagerTypes.Global.arenaId(slotNum);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final int localId = RecordManagerTypes.Global.localId(slotNum);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("get(arenaId).")
+              .append(methodName("set"))
+              .append("(localId, value);\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+        
+        StringBuilder appendInitializers(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("bb.")
+              .append(bbSetter(type))
+              .append("(slotNum * ITEM_SIZE + ")
+              .append(offsetName())
+              .append(", ");
+            if (initial != null) {
+                sb.append(initial);
+            } else {
+                sb.append(deadMemInitializer(type));
+            }
+            sb.append(");\n");
+            return sb;
+        }
+        
+        StringBuilder appendChecks(StringBuilder sb, String indent, int level) {
+            if (initial == null) {
+                return sb;
+            }
+            sb = indent(sb, indent, level);
+            sb.append("if (bb.")
+              .append(bbGetter(type))
+              .append("(itemOffset + ")
+              .append(offsetName())
+              .append(") == ")
+              .append(deadMemInitializer(type))
+              .append(") {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("String msg = \"invalid value in field ")
+              .append(offsetName())
+              .append(" of slot \" + slotNum;\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("throw new IllegalStateException(msg);\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+        
+        String offsetName() {
+            String words[] = name.split(" ");
+            assert(words.length > 0);
+            StringBuilder sb = new StringBuilder(words[0].toUpperCase());
+            for (int j = 1; j < words.length; ++j) {
+                sb.append("_").append(words[j].toUpperCase());
+            }
+            sb.append("_OFF");
+            return sb.toString();
+        }
+        
+        int offset() {
+            return offset;
+        }
+    }
+
+    String name;
+    ArrayList<Field> fields;
+    int totalSize;
+    boolean modifiable = true;
+    
+    static StringBuilder indent(StringBuilder sb, String indent, int level) {
+        for (int i = 0; i < level; ++i) {
+            sb.append(indent);
+        }
+        return sb;
+    }
+    
+    public RecordType(String name) {
+        this.name = name;
+        fields = new ArrayList<Field>();
+        addField("next free slot", Type.INT, "-1", false);
+    }
+    
+    public void addToMap(Map<String, RecordType> map) {
+        modifiable = false;
+        calcOffsetsAndSize();
+        map.put(name, this);
+    }
+
+    public void addField(String name, Type type, String initial) {
+        addField(name, type, initial, true);
+    }    
+    
+    private void addField(String name, Type type, String initial, boolean accessible) {
+        if (! modifiable) {
+            throw new IllegalStateException("cannot modify type anmore");
+        }
+        fields.add(new Field(name, type, initial, -1, accessible));
+    }
+     
+    private void calcOffsetsAndSize() {
+        Collections.sort(fields, new Comparator<Field>() {
+            public int compare(Field left, Field right) {
+                return size(right.type) - size(left.type);
+            }
+        });
+        // sort fields by size and align the items
+        totalSize = 0;
+        int alignment = 0;
+        for (int i = 0; i < fields.size(); ++i) {            
+            final Field field = fields.get(i);
+            assert field.offset == -1;
+            field.offset = totalSize;
+            final int size = size(field.type);
+            totalSize += size;
+            if (size > alignment) alignment = size;
+        }
+        if (totalSize % alignment != 0) {
+            totalSize = ((totalSize / alignment) + 1) * alignment; 
+        }
+    }
+    
+    int size() {
+        return fields.size();
+    }
+    
+    static int size(Type t) {
+        switch(t) {
+            case BYTE:   return 1;
+            case SHORT:  return 2;
+            case INT:    return 4;
+            case GLOBAL: return 8;
+            default:     throw new IllegalArgumentException();
+        }
+    }
+    
+    static String javaType(Type t) {
+        switch(t) {
+            case BYTE:   return "byte";
+            case SHORT:  return "short";
+            case INT:    return "int";
+            case GLOBAL: return "long";
+            default:     throw new IllegalArgumentException();
+        }
+    }
+    
+    static String bbGetter(Type t) {
+        switch(t) {
+            case BYTE:   return "get";
+            case SHORT:  return "getShort";
+            case INT:    return "getInt";
+            case GLOBAL: return "getLong";
+            default:     throw new IllegalArgumentException();
+        }
+    }
+    
+    static String bbSetter(Type t) {
+        switch(t) {
+            case BYTE:   return "put";
+            case SHORT:  return "putShort";
+            case INT:    return "putInt";
+            case GLOBAL: return "putLong";
+            default:     throw new IllegalArgumentException();
+        }
+    }
+    
+    static String deadMemInitializer(Type t) {
+        switch(t) {
+            case BYTE:   return "(byte)0xde";
+            case SHORT:  return "(short)0xdead";
+            case INT:    return "0xdeadbeef";
+            case GLOBAL: return "0xdeadbeefdeadbeefl";
+            default:     throw new IllegalArgumentException();
+        }        
+    }
+    
+    static String appender(Type t) {
+        switch(t) {
+            case BYTE:   return "RecordManagerTypes.Byte.append";
+            case SHORT:  return "RecordManagerTypes.Short.append";
+            case INT:    return "RecordManagerTypes.Int.append";
+            case GLOBAL: return "RecordManagerTypes.Global.append";
+            default:     throw new IllegalArgumentException();
+        }        
+    }
+    
+    static String padRight(String s, int n) {
+        return String.format("%1$-" + n + "s", s);  
+    }
+
+    static String padLeft(String s, int n) {
+        return String.format("%1$" + n + "s", s);  
+    }
+    
+    StringBuilder appendConstants(StringBuilder sb, String indent, int level) {
+        sb = indent(sb, indent, level);
+        sb.append("public static int ITEM_SIZE = ")
+          .append(totalSize)
+          .append(";\n");
+        for (int i = 0; i < fields.size(); ++i) {
+            final Field field = fields.get(i);
+            sb = indent(sb, indent, level);
+            sb.append("public static int ")
+              .append(field.offsetName())
+              .append(" = ")
+              .append(field.offset).append("; // size: ")
+              .append(size(field.type)).append("\n");
+        }
+        return sb;
+    }
+    
+    StringBuilder appendBufferPrinter(StringBuilder sb, String indent, int level) {
+        int maxNameWidth = 0;
+        for (int i = 0; i < fields.size(); ++i) {
+            int width = fields.get(i).name.length();
+            if (width > maxNameWidth) {
+                maxNameWidth = width;
+            }
+        }
+        for (int i = 0; i < fields.size(); ++i) {
+            final Field field = fields.get(i);
+            sb = indent(sb, indent, level);
+            sb.append("sb.append(\"")
+              .append(padRight(field.name, maxNameWidth))
+              .append(" | \");\n");
+            sb = indent(sb, indent, level);
+            sb.append("for (int i = 0; i < NO_SLOTS; ++i) {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append(javaType(field.type))
+              .append(" value = bb.")
+              .append(bbGetter(field.type))
+              .append("(i * ITEM_SIZE + ")
+              .append(field.offsetName())
+              .append(");\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("sb = ")
+              .append(appender(field.type))
+              .append("(sb, value);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("sb.append(\" | \");\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            sb = indent(sb, indent, level);
+            sb.append("sb.append(\"\\n\");\n");
+        }
+        return sb;
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
new file mode 100644
index 0000000..58f8972
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.asterix.transaction.management.service.locking.AllocInfo;
+import edu.uci.ics.asterix.transaction.management.service.locking.RecordManagerTypes;
+
+public class @E@ArenaManager {
+    
+    public static final boolean TRACK_ALLOC = @DEBUG@;
+
+    private final int noArenas;
+    private final long txnShrinkTimer;
+    private ArrayList<@E@RecordManager> arenas;
+    private volatile int nextArena; 
+    private ThreadLocal<LocalManager> local;    
+    
+    public @E@ArenaManager(long txnShrinkTimer) {
+        this.txnShrinkTimer = txnShrinkTimer;
+        noArenas = Runtime.getRuntime().availableProcessors() * 2;
+        arenas = new ArrayList<@E@RecordManager>(noArenas);
+        nextArena = 0;
+        local = new ThreadLocal<LocalManager>() {
+            @Override
+            protected LocalManager initialValue() {
+                return getNext();
+            }
+        };
+    }
+
+    public long allocate() {
+        final LocalManager localManager = local.get();
+        long result = localManager.arenaId;
+        result = result << 48;
+        final int localId = localManager.mgr.allocate();
+        result |= localId;
+        if (TRACK_ALLOC) {
+            final long allocId = (++localManager.mgr.allocCounter % 0x7fff);
+            result |= (allocId << 32);
+            setAllocId(result, (short) allocId);
+            assert RecordManagerTypes.Global.allocId(result) == allocId;
+        }
+        assert RecordManagerTypes.Global.arenaId(result) == localManager.arenaId;
+        assert RecordManagerTypes.Global.localId(result) == localId;
+        return result;
+    }
+    
+    public void deallocate(long slotNum) {
+        if (TRACK_ALLOC) {
+            checkSlot(slotNum);
+        }
+        final int arenaId = RecordManagerTypes.Global.arenaId(slotNum);
+        get(arenaId).deallocate(RecordManagerTypes.Global.localId(slotNum));
+    }
+    
+    public synchronized LocalManager getNext() {
+        if (nextArena >= arenas.size()) { 
+            arenas.add(new @E@RecordManager(txnShrinkTimer));
+        }
+        @E@RecordManager mgr = arenas.get(nextArena);
+        LocalManager res = new LocalManager();
+        res.mgr = mgr;
+        res.arenaId = nextArena;
+        nextArena = (nextArena + 1) % noArenas;
+        return res;
+    }
+    
+    public @E@RecordManager get(int i) {
+        return arenas.get(i);
+    }
+    
+    public @E@RecordManager local() {
+        return local.get().mgr;
+    }
+    
+    @METHODS@
+    
+    private void checkSlot(long slotNum) {
+        final int refAllocId = RecordManagerTypes.Global.allocId(slotNum);
+        final short curAllocId = getAllocId(slotNum);
+        if (refAllocId != curAllocId) {
+            String msg = "reference to slot " + slotNum
+                + " of arena " + RecordManagerTypes.Global.arenaId(slotNum)
+                + " refers to version " + Integer.toHexString(refAllocId)
+                + " current version is " + Integer.toHexString(curAllocId);
+            AllocInfo a = getAllocInfo(slotNum);
+            if (a != null) {
+                msg += "\n" + a.toString();
+            }
+            throw new IllegalStateException(msg);
+        }
+    }
+    
+    public AllocInfo getAllocInfo(long slotNum) {
+        final int arenaId = RecordManagerTypes.Global.arenaId(slotNum);
+        return get(arenaId).getAllocInfo(RecordManagerTypes.Global.localId(slotNum));
+    }
+    
+    static class LocalManager {
+        int arenaId;
+        @E@RecordManager mgr;
+    }
+    
+    public StringBuilder append(StringBuilder sb) {
+        for (int i = 0; i < arenas.size(); ++i) {
+            sb.append("++++ arena ").append(i).append(" ++++\n");
+            arenas.get(i).append(sb);
+        }
+        return sb;
+    }
+    
+    public String toString() {
+        return append(new StringBuilder()).toString();
+    }
+
+    public Stats addTo(Stats s) {
+        final int size = arenas.size();
+        s.arenas += size;
+        for (int i = 0; i < size; ++i) {
+            arenas.get(i).addTo(s);
+        }
+        return s;
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
new file mode 100644
index 0000000..01049d3
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -0,0 +1,304 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.asterix.transaction.management.service.locking.AllocInfo;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+
+public class @E@RecordManager {
+
+    public static final boolean TRACK_ALLOC = @DEBUG@;
+    
+    static final int NO_SLOTS = 10;
+
+    @CONSTS@
+
+    private final long txnShrinkTimer;
+    private long shrinkTimer;
+    private ArrayList<Buffer> buffers;
+    private int current;
+    private int occupiedSlots;
+    private boolean isShrinkTimerOn;
+
+    int allocCounter;
+    
+    public @E@RecordManager(long txnShrinkTimer) {
+        this.txnShrinkTimer = txnShrinkTimer;
+        buffers = new ArrayList<Buffer>();
+        buffers.add(new Buffer());
+        current = 0;
+        
+        allocCounter = 0;
+    }
+    
+    synchronized int allocate() {
+        if (buffers.get(current).isFull()) {
+            int size = buffers.size();
+            boolean needNewBuffer = true;
+            for (int i = 0; i < size; i++) {
+                Buffer buffer = buffers.get(i);
+                if (! buffer.isInitialized()) {
+                    buffer.initialize();
+                    current = i;
+                    needNewBuffer = false;
+                    break;
+                }
+            }
+
+            if (needNewBuffer) {
+                buffers.add(new Buffer());
+                current = buffers.size() - 1;
+            }
+        }
+        ++occupiedSlots;
+        return buffers.get(current).allocate() + current * NO_SLOTS;
+    }
+
+    synchronized void deallocate(int slotNum) {
+        buffers.get(slotNum / NO_SLOTS).deallocate(slotNum % NO_SLOTS);
+        --occupiedSlots;
+
+        if (needShrink()) {
+            shrink();
+        }
+    }
+
+    /**
+     * Shrink policy:
+     * Shrink when the resource under-utilization lasts for a certain amount of time.
+     * TODO Need to figure out which of the policies is better
+     * case1.
+     * buffers status : O x x x x x O (O is initialized, x is deinitialized)
+     * In the above status, 'CURRENT' needShrink() returns 'TRUE'
+     * even if there is nothing to shrink or deallocate.
+     * It doesn't distinguish the deinitialized children from initialized children
+     * by calculating totalNumOfSlots = buffers.size() * ChildEntityLockInfoArrayManager.NUM_OF_SLOTS.
+     * In other words, it doesn't subtract the deinitialized children's slots.
+     * case2.
+     * buffers status : O O x x x x x
+     * However, in the above case, if we subtract the deinitialized children's slots,
+     * needShrink() will return false even if we shrink the buffers at this case.
+     * 
+     * @return
+     */
+    private boolean needShrink() {
+        int size = buffers.size();
+        int usedSlots = occupiedSlots;
+        if (usedSlots == 0) {
+            usedSlots = 1;
+        }
+
+        if (size > 1 && size * NO_SLOTS / usedSlots >= 3) {
+            if (isShrinkTimerOn) {
+                if (System.currentTimeMillis() - shrinkTimer >= txnShrinkTimer) {
+                    isShrinkTimerOn = false;
+                    return true;
+                }
+            } else {
+                //turn on timer
+                isShrinkTimerOn = true;
+                shrinkTimer = System.currentTimeMillis();
+            }
+        } else {
+            //turn off timer
+            isShrinkTimerOn = false;
+        }
+
+        return false;
+    }
+
+    /**
+     * Shrink() may
+     * deinitialize(:deallocates ByteBuffer of child) Children(s) or
+     * shrink buffers according to the deinitialized children's contiguity status.
+     * It doesn't deinitialze or shrink more than half of children at a time.
+     */
+    private void shrink() {
+        int i;
+        int removeCount = 0;
+        int size = buffers.size();
+        int maxDecreaseCount = size / 2;
+        Buffer buffer;
+
+        //The first buffer never be deinitialized.
+        for (i = 1; i < size; i++) {
+            if (buffers.get(i).isEmpty()) {
+                buffers.get(i).deinitialize();
+            }
+        }
+
+        //remove the empty buffers from the end
+        for (i = size - 1; i >= 1; i--) {
+            buffer = buffers.get(i);
+            if (! buffer.isInitialized()) {
+                buffers.remove(i);
+                if (++removeCount == maxDecreaseCount) {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        
+        //reset allocChild to the first buffer
+        current = 0;
+
+        isShrinkTimerOn = false;
+    }
+
+    @METHODS@
+    
+    public AllocInfo getAllocInfo(int slotNum) {
+        final Buffer buf = buffers.get(slotNum / NO_SLOTS);
+        if (buf.allocList == null) {
+            return null;
+        } else {
+            return buf.allocList.get(slotNum % NO_SLOTS);
+        }
+    }
+    
+    StringBuilder append(StringBuilder sb) {
+        sb.append("+++ current: ")
+          .append(current)
+          .append(" no occupied slots: ")
+          .append(occupiedSlots)
+          .append(" +++\n");
+        for (int i = 0; i < buffers.size(); ++i) {
+            buffers.get(i).append(sb);
+            sb.append("\n");
+        }
+        return sb;
+    }
+    
+    public String toString() {
+        return append(new StringBuilder()).toString();
+    }
+
+    public Stats addTo(Stats s) {
+        final int size = buffers.size();
+        s.buffers += size;
+        s.slots += size * NO_SLOTS;
+        s.size += size * NO_SLOTS * ITEM_SIZE;
+        for (int i = 0; i < size; ++i) {
+            buffers.get(i).addTo(s);
+        }
+        return s;
+    }
+    
+    static class Buffer {
+        private ByteBuffer bb;
+        private int freeSlotNum;
+        private int occupiedSlots = -1; //-1 represents 'deinitialized' state.
+        
+        ArrayList<AllocInfo> allocList;
+        
+        Buffer() {
+            initialize();
+        }
+
+        void initialize() {
+            bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
+            freeSlotNum = 0;
+            occupiedSlots = 0;
+
+            for (int i = 0; i < NO_SLOTS - 1; i++) {
+                setNextFreeSlot(i, i + 1);
+            }
+            setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+            
+            if (TRACK_ALLOC) {
+                allocList = new ArrayList<AllocInfo>(NO_SLOTS);
+                for (int i = 0; i < NO_SLOTS; ++i) {
+                    allocList.add(new AllocInfo());
+                }
+            }
+        }
+        
+        public void deinitialize() {
+            bb = null;
+            occupiedSlots = -1;
+        }
+        
+        public boolean isInitialized() {
+            return occupiedSlots >= 0;
+        }
+
+        public boolean isFull() {
+            return occupiedSlots == NO_SLOTS;
+        }
+
+        public boolean isEmpty() {
+            return occupiedSlots == 0;
+        }
+        
+        public int allocate() {
+            int slotNum = freeSlotNum;
+            freeSlotNum = getNextFreeSlot(slotNum);
+            @INIT_SLOT@
+            occupiedSlots++;
+            if (TRACK_ALLOC) allocList.get(slotNum).alloc();
+            return slotNum;
+        }
+    
+        public void deallocate(int slotNum) {
+            @INIT_SLOT@
+            setNextFreeSlot(slotNum, freeSlotNum);
+            freeSlotNum = slotNum;
+            occupiedSlots--;
+            if (TRACK_ALLOC) allocList.get(slotNum).free();
+        }
+
+        public int getNextFreeSlot(int slotNum) {
+            return bb.getInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFF);
+        }    
+            
+        public void setNextFreeSlot(int slotNum, int nextFreeSlot) {
+            bb.putInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFF, nextFreeSlot);
+        }
+
+        StringBuilder append(StringBuilder sb) {
+            sb.append("++ free slot: ")
+              .append(freeSlotNum)
+              .append(" no occupied slots: ")
+              .append(occupiedSlots)
+              .append(" ++\n");
+            @PRINT_BUFFER@
+            return sb;
+        }
+        
+        public String toString() {
+            return append(new StringBuilder()).toString();
+        }
+        
+        public void addTo(Stats s) {
+            if (isInitialized()) {
+                s.items += occupiedSlots;
+            }
+        }
+        
+        private void checkSlot(int slotNum) {
+            if (! TRACK_ALLOC) {
+                return;
+            }
+            final int itemOffset = (slotNum % NO_SLOTS) * ITEM_SIZE;
+            // @CHECK_SLOT@
+        }
+    }
+    
+}
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index 3123008..13a01a6 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -33,6 +33,49 @@
 					<fork>true</fork>
 				</configuration>
 			</plugin>
+            <plugin>
+                <groupId>edu.uci.ics.asterix</groupId>
+                <artifactId>record-manager-generator-maven-plugin</artifactId>
+                <version>0.8.1-SNAPSHOT</version>
+                <configuration>
+                    <debug>false</debug>
+                    <arenaManagerTemplate>ArenaManager.java</arenaManagerTemplate>
+                    <recordManagerTemplate>RecordManager.java</recordManagerTemplate>
+                    <recordTypes>
+                        <param>Job</param>
+                        <param>Request</param>
+                        <param>Resource</param>
+                    </recordTypes>
+                    <outputDir>${project.build.directory}/generated-sources/java/edu/uci/ics/asterix/transaction/management/service/locking</outputDir>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>generate-record-manager</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>generate-record-manager</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources/java/</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
 		</plugins>
 
 	</build>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/AllocInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/AllocInfo.java
new file mode 100644
index 0000000..dfba774
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/AllocInfo.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class AllocInfo {
+    String alloc;
+    String free;
+    
+    void alloc() {
+        alloc = getStackTrace();
+    }
+    
+    void free() {
+        free = getStackTrace();
+    }
+
+    private String getStackTrace() {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+        new Exception().printStackTrace(pw);
+        pw.close();
+        String res = sw.toString();
+        // remove first 3 lines
+        int nlPos = 0;
+        for (int i = 0; i < 3; ++i) {
+            nlPos = res.indexOf('\n', nlPos) + 1; 
+        }
+        return res.substring(nlPos);
+    }
+    
+    public String toString() {
+        return "allocation stack:\n" + alloc + "\nfree stack\n" + free;
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
new file mode 100644
index 0000000..80da1ab
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -0,0 +1,845 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILockManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+
+/**
+ * An implementation of the ILockManager interface.
+ * 
+ * @author tillw
+ */
+
+public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
+
+    public static final boolean IS_DEBUG_MODE = false;//true
+
+    private TransactionSubsystem txnSubsystem;
+
+    private ResourceGroupTable table;
+    private ResourceArenaManager resArenaMgr;
+    private RequestArenaManager reqArenaMgr;
+    private JobArenaManager jobArenaMgr;
+    private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
+    private ThreadLocal<DatasetLockCache> dsLockCache;
+        
+    enum LockAction {
+        GET,
+        UPD, // special version of GET that updates the max lock mode
+        WAIT,
+        CONV // convert (upgrade) a lock (e.g. from S to X)
+    }
+    
+    static LockAction[][] ACTION_MATRIX = {
+        // new    NL              IS               IX                S                X
+        { LockAction.GET, LockAction.UPD,  LockAction.UPD,  LockAction.UPD,  LockAction.UPD  }, // NL
+        { LockAction.GET, LockAction.GET,  LockAction.UPD,  LockAction.UPD,  LockAction.WAIT }, // IS
+        { LockAction.GET, LockAction.GET,  LockAction.GET,  LockAction.WAIT, LockAction.WAIT }, // IX
+        { LockAction.GET, LockAction.GET,  LockAction.WAIT, LockAction.GET,  LockAction.WAIT }, // S
+        { LockAction.GET, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT }  // X
+    };
+        
+    public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
+        this.txnSubsystem = txnSubsystem;
+        
+        this.table = new ResourceGroupTable();
+        
+        final int lockManagerShrinkTimer = txnSubsystem.getTransactionProperties()
+                .getLockManagerShrinkTimer();
+
+        resArenaMgr = new ResourceArenaManager(lockManagerShrinkTimer);
+        reqArenaMgr = new RequestArenaManager(lockManagerShrinkTimer);
+        jobArenaMgr = new JobArenaManager(lockManagerShrinkTimer);
+        jobIdSlotMap = new ConcurrentHashMap<>();
+        dsLockCache = new ThreadLocal<DatasetLockCache>() {
+            protected DatasetLockCache initialValue() {
+                return new DatasetLockCache();
+            }
+        };
+    }
+
+    public AsterixTransactionProperties getTransactionProperties() {
+        return this.txnSubsystem.getTransactionProperties();
+    }
+
+    @Override
+    public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException {
+        
+        log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        
+        final int dsId = datasetId.getId();        
+        final int jobId = txnContext.getJobId().getId();
+        
+        if (entityHashValue != -1) {
+            // get the intention lock on the dataset, if we want to lock an individual item
+            final byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                lock(datasetId, -1, dsLockMode, txnContext);
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            }
+        }
+
+        long jobSlot = findOrAllocJobSlot(jobId);
+        
+        ResourceGroup group = table.get(datasetId, entityHashValue);
+        group.getLatch();
+
+        try {
+            validateJob(txnContext);
+            
+            // 1) Find the resource in the hash table
+            long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+            // 2) create a request entry
+            long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
+            // 3) check lock compatibility
+            boolean locked = false;
+
+            while (! locked) {
+                int curLockMode = resArenaMgr.getMaxMode(resSlot);
+                LockAction act = ACTION_MATRIX[curLockMode][lockMode];
+                if (act == LockAction.WAIT) {
+                    act = updateActionForSameJob(resSlot, jobSlot, lockMode);
+                }
+                switch (act) {
+                    case UPD:
+                        resArenaMgr.setMaxMode(resSlot, lockMode);
+                        // no break
+                    case GET:
+                        addHolder(reqSlot, resSlot, jobSlot);
+                        locked = true;
+                        break;
+                    case WAIT:
+                        if (! introducesDeadlock(resSlot, jobSlot)) {
+                            addWaiter(reqSlot, resSlot, jobSlot);
+                        } else {
+                            requestAbort(txnContext);
+                        }
+                        group.await(txnContext);
+                        removeWaiter(reqSlot, resSlot, jobSlot);
+                        break;
+                    case CONV:
+                        // TODO can we have more than on upgrader? Or do we need to
+                        // abort if we get a second upgrader?
+                        addUpgrader(reqSlot, resSlot, jobSlot);
+                        group.await(txnContext);
+                        removeUpgrader(reqSlot, resSlot, jobSlot);
+                        break;                        
+                    default:
+                        throw new IllegalStateException();
+                }
+            }
+        } finally {
+            group.releaseLatch();
+        }
+    }
+
+    /**
+     * determine if adding a job to the waiters of a resource will introduce a 
+     * cycle in the wait-graph where the job waits on itself
+     * @param resSlot the slot that contains the information about the resource
+     * @param jobSlot the slot that contains the information about the job
+     * @return true if a cycle would be introduced, false otherwise
+     */
+    private boolean introducesDeadlock(long resSlot, long jobSlot) {
+        long reqSlot = resArenaMgr.getLastHolder(resSlot);
+        while (reqSlot >= 0) {
+            long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
+            if (holderJobSlot == jobSlot) {
+                return true;
+            }
+            long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
+            while (waiter >= 0) {
+                long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
+                if (introducesDeadlock(watingOnResSlot, jobSlot)) {
+                    return true;
+                }
+                waiter = reqArenaMgr.getNextJobRequest(waiter);
+            }
+            reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+        }
+        return false;
+    }
+
+    @Override
+    public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException {
+        log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+        lock(datasetId, entityHashValue, lockMode, txnContext);
+        unlock(datasetId, entityHashValue, txnContext);
+    }
+
+    @Override
+    public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException {
+        log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        
+        final int dsId = datasetId.getId();
+        final int jobId = txnContext.getJobId().getId();
+
+        if (entityHashValue != -1) {
+            // get the intention lock on the dataset, if we want to lock an individual item
+            byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
+                    return false;
+                }
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            }
+        }
+
+        long jobSlot = findOrAllocJobSlot(jobId);
+        
+        boolean locked = false;
+
+        ResourceGroup group = table.get(datasetId, entityHashValue);
+        group.getLatch();
+
+        try {
+            validateJob(txnContext);
+
+            // 1) Find the resource in the hash table
+            long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+            // 2) create a request entry
+            long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
+            // 3) check lock compatibility
+            
+            int curLockMode = resArenaMgr.getMaxMode(resSlot);
+            LockAction act = ACTION_MATRIX[curLockMode][lockMode];
+            if (act == LockAction.WAIT) {
+                act = updateActionForSameJob(resSlot, jobSlot, lockMode);
+            }
+            switch (act) {
+                case UPD:
+                    resArenaMgr.setMaxMode(resSlot, lockMode);
+                    // no break
+                case GET:
+                    addHolder(reqSlot, resSlot, jobSlot);
+                    locked = true;
+                    break;
+                case WAIT:
+                case CONV:
+                    locked = false;
+                    break;
+                default:
+                    throw new IllegalStateException();
+            }
+        } finally {
+            group.releaseLatch();
+        }
+        
+        // if we did acquire the dataset lock, but not the entity lock, we keep
+        // it anyway and clean it up at the end of the job
+        
+        return locked;
+    }
+
+    @Override
+    public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+            ITransactionContext txnContext) throws ACIDException {
+        log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        
+        if (tryLock(datasetId, entityHashValue, lockMode, txnContext)) {
+            unlock(datasetId, entityHashValue, txnContext);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
+        log("unlock", datasetId.getId(), entityHashValue, LockMode.NL, txnContext);
+
+        ResourceGroup group = table.get(datasetId, entityHashValue);
+        group.getLatch();
+
+        try {
+
+            int dsId = datasetId.getId();
+            long resource = findResourceInGroup(group, dsId, entityHashValue);
+
+            if (resource < 0) {
+                throw new IllegalStateException("resource (" + dsId + ",  " + entityHashValue + ") not found");
+            }
+
+            int jobId = txnContext.getJobId().getId();
+            long jobSlot = findOrAllocJobSlot(jobId);
+
+            long holder = removeLastHolder(resource, jobSlot);
+
+            // deallocate request
+            reqArenaMgr.deallocate(holder);
+            // deallocate resource or fix max lock mode
+            if (resourceNotUsed(resource)) {
+                long prev = group.firstResourceIndex.get();
+                if (prev == resource) {
+                    group.firstResourceIndex.set(resArenaMgr.getNext(resource));
+                } else {
+                    while (resArenaMgr.getNext(prev) != resource) {
+                        prev = resArenaMgr.getNext(prev);
+                    }
+                    resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
+                }
+                resArenaMgr.deallocate(resource);
+            } else {
+                final int oldMaxMode = resArenaMgr.getMaxMode(resource);
+                final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
+                resArenaMgr.setMaxMode(resource, newMaxMode);
+                if (oldMaxMode != newMaxMode) {
+                    // the locking mode didn't change, current waiters won't be
+                    // able to acquire the lock, so we do not need to signal them
+                    group.wakeUp();
+                }
+            }
+        } finally {
+            group.releaseLatch();
+        }
+
+        // dataset intention locks are cleaned up at the end of the job
+    }
+    
+    @Override
+    public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
+        log("releaseLocks", -1, -1, LockMode.NL, txnContext);
+
+        int jobId = txnContext.getJobId().getId();
+        Long jobSlot = jobIdSlotMap.get(jobId);
+        if (jobSlot == null) {
+            // we don't know the job, so there are no locks for it - we're done
+            return;
+        }
+        long holder = jobArenaMgr.getLastHolder(jobSlot);
+        while (holder != -1) {
+            long resource = reqArenaMgr.getResourceId(holder);
+            int dsId = resArenaMgr.getDatasetId(resource);
+            int pkHashVal = resArenaMgr.getPkHashVal(resource);
+            unlock(new DatasetId(dsId), pkHashVal, txnContext);
+            holder = jobArenaMgr.getLastHolder(jobSlot);
+        }
+        jobArenaMgr.deallocate(jobSlot);
+        
+        //System.err.println(table.append(new StringBuilder(), true).toString());        
+        //System.out.println("jobArenaMgr " + jobArenaMgr.addTo(new Stats()).toString());
+        //System.out.println("resArenaMgr " + resArenaMgr.addTo(new Stats()).toString());
+        //System.out.println("reqArenaMgr " + reqArenaMgr.addTo(new Stats()).toString());
+    }
+        
+    private long findOrAllocJobSlot(int jobId) {
+        Long jobSlot = jobIdSlotMap.get(jobId);
+        if (jobSlot == null) {
+            jobSlot = new Long(jobArenaMgr.allocate());
+            jobArenaMgr.setJobId(jobSlot, jobId);
+            Long oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
+            if (oldSlot != null) {
+                // if another thread allocated a slot for this jobId between
+                // get(..) and putIfAbsent(..), we'll use that slot and
+                // deallocate the one we allocated
+                jobArenaMgr.deallocate(jobSlot);
+                jobSlot = oldSlot;
+            }
+        }
+        assert(jobSlot >= 0);
+        return jobSlot;
+    }
+
+    private long findOrAllocResourceSlot(ResourceGroup group, int dsId, int entityHashValue) {
+        long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+        
+        if (resSlot == -1) {
+            // we don't know about this resource, let's alloc a slot
+            resSlot = resArenaMgr.allocate();
+            resArenaMgr.setDatasetId(resSlot, dsId);
+            resArenaMgr.setPkHashVal(resSlot, entityHashValue);
+            resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
+            group.firstResourceIndex.set(resSlot);
+        }
+        return resSlot;
+    }
+
+    private long allocRequestSlot(long resSlot, long jobSlot, byte lockMode) {
+        long reqSlot = reqArenaMgr.allocate();
+        reqArenaMgr.setResourceId(reqSlot, resSlot);
+        reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
+        reqArenaMgr.setJobSlot(reqSlot, jobSlot);
+        return reqSlot;
+    }
+
+    /**
+     * when we've got a lock conflict for a different job, we always have to
+     * wait, if it is for the same job we either have to
+     * a) (wait and) convert the lock once conversion becomes viable or
+     * b) acquire the lock if we want to lock the same resource with the same 
+     * lock mode for the same job.
+     * @param resource the resource slot that's being locked
+     * @param job the job slot of the job locking the resource
+     * @param lockMode the lock mode that the resource should be locked with
+     * @return
+     */
+    private LockAction updateActionForSameJob(long resource, long job, byte lockMode) {
+        // TODO we can reduce the number of things we have to look at by
+        // carefully distinguishing the different lock modes
+        long holder = resArenaMgr.getLastHolder(resource);
+        LockAction res = LockAction.WAIT;
+        while (holder != -1) {
+            if (job == reqArenaMgr.getJobSlot(holder)) {
+                if (reqArenaMgr.getLockMode(holder) == lockMode) {
+                    return LockAction.GET;
+                } else {
+                    res = LockAction.CONV;
+                }
+            }
+            holder = reqArenaMgr.getNextRequest(holder);
+        }
+        return res;
+    }
+    
+    private long findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
+        long resSlot = group.firstResourceIndex.get();
+        while (resSlot != -1) {
+            // either we already have a lock on this resource or we have a 
+            // hash collision
+            if (resArenaMgr.getDatasetId(resSlot) == dsId && 
+                    resArenaMgr.getPkHashVal(resSlot) == entityHashValue) {
+                return resSlot;
+            } else {
+                resSlot = resArenaMgr.getNext(resSlot);
+            }
+        }
+        return -1;        
+    }
+    
+    private void addHolder(long request, long resource, long job) {
+        long lastHolder = resArenaMgr.getLastHolder(resource);
+        reqArenaMgr.setNextRequest(request, lastHolder);
+        resArenaMgr.setLastHolder(resource, request);
+        
+        synchronized (jobArenaMgr) {
+            long lastJobHolder = jobArenaMgr.getLastHolder(job);
+            insertIntoJobQueue(request, lastJobHolder);
+            jobArenaMgr.setLastHolder(job, request);
+        }
+    }
+    
+    private long removeLastHolder(long resource, long jobSlot) {
+        long holder = resArenaMgr.getLastHolder(resource);
+        if (holder < 0) {
+            throw new IllegalStateException("no holder for resource " + resource);
+        }
+        
+        // remove from the list of holders for a resource
+        if (reqArenaMgr.getJobSlot(holder) == jobSlot) {
+            // if the head of the queue matches, we need to update the resource
+            long next = reqArenaMgr.getNextRequest(holder);
+            resArenaMgr.setLastHolder(resource, next);
+        } else {
+            holder = removeRequestFromQueueForJob(holder, jobSlot);
+        }
+        
+        synchronized (jobArenaMgr) {
+            // remove from the list of requests for a job
+            long newHead = removeRequestFromJob(jobSlot, holder);
+            jobArenaMgr.setLastHolder(jobSlot, newHead);            
+        }
+        return holder;
+    }
+
+    private long removeRequestFromJob(long jobSlot, long holder) {
+        long prevForJob = reqArenaMgr.getPrevJobRequest(holder);
+        long nextForJob = reqArenaMgr.getNextJobRequest(holder);
+        if (nextForJob != -1) {
+            reqArenaMgr.setPrevJobRequest(nextForJob, prevForJob);
+        }
+        if (prevForJob == -1) {
+            return nextForJob;
+        } else {
+            reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
+            return -1;
+        }
+    }
+
+    private void addWaiter(long request, long resource, long job) {
+        long waiter = resArenaMgr.getFirstWaiter(resource);
+        reqArenaMgr.setNextRequest(request, -1);
+        if (waiter == -1) {
+            resArenaMgr.setFirstWaiter(resource, request);
+        } else {
+            appendToRequestQueue(waiter, request);
+        }
+        
+        synchronized (jobArenaMgr) {
+            waiter = jobArenaMgr.getLastWaiter(job);
+            insertIntoJobQueue(request, waiter);
+            jobArenaMgr.setLastWaiter(job, request);
+        }
+    }
+
+    private void removeWaiter(long request, long resource, long job) {
+        long waiter = resArenaMgr.getFirstWaiter(resource);
+        if (waiter == request) {
+            long next = reqArenaMgr.getNextRequest(waiter);
+            resArenaMgr.setFirstWaiter(resource, next);
+        } else {
+            waiter = removeRequestFromQueueForSlot(waiter, request);
+        }
+        synchronized (jobArenaMgr) {
+            // remove from the list of requests for a job
+            long newHead = removeRequestFromJob(job, waiter);
+            jobArenaMgr.setLastWaiter(job, newHead);            
+        }
+    }
+
+    private void addUpgrader(long request, long resource, long job) {
+        long upgrader = resArenaMgr.getFirstUpgrader(resource);
+        reqArenaMgr.setNextRequest(request, -1);
+        if (upgrader == -1) {
+            resArenaMgr.setFirstUpgrader(resource, request);
+        } else {
+            appendToRequestQueue(upgrader, request);
+        }
+        synchronized (jobArenaMgr) {
+            upgrader = jobArenaMgr.getLastUpgrader(job);
+            insertIntoJobQueue(request, upgrader);
+            jobArenaMgr.setLastUpgrader(job, request);            
+        }
+    }
+
+    private void removeUpgrader(long request, long resource, long job) {
+        long upgrader = resArenaMgr.getFirstUpgrader(resource);
+        if (upgrader == request) {
+            long next = reqArenaMgr.getNextRequest(upgrader);
+            resArenaMgr.setFirstUpgrader(resource, next);
+        } else {
+            upgrader = removeRequestFromQueueForSlot(upgrader, request);
+        }
+        synchronized (jobArenaMgr) {
+            // remove from the list of requests for a job
+            long newHead = removeRequestFromJob(job, upgrader);
+            jobArenaMgr.setLastUpgrader(job, newHead);            
+        }
+    }
+
+    private void insertIntoJobQueue(long newRequest, long oldRequest) {
+        reqArenaMgr.setNextJobRequest(newRequest, oldRequest);
+        reqArenaMgr.setPrevJobRequest(newRequest, -1);
+        if (oldRequest >= 0) {
+            reqArenaMgr.setPrevJobRequest(oldRequest, newRequest);
+        }
+    }
+
+    private void appendToRequestQueue(long head, long appendee) {
+        long next = reqArenaMgr.getNextRequest(head);
+        while(next != -1) {
+            head = next;
+            next = reqArenaMgr.getNextRequest(head);
+        }
+        reqArenaMgr.setNextRequest(head, appendee);        
+    }
+        
+    /**
+     * 
+     * @param head
+     * @param reqSlot
+     * @return
+     */
+    private long removeRequestFromQueueForSlot(long head, long reqSlot) {
+        long cur = head;
+        long prev = cur;
+        while (prev != -1) {
+            cur = reqArenaMgr.getNextRequest(prev);
+            if (cur == -1) {
+                throw new IllegalStateException("request " + reqSlot+ " not in queue");
+            }
+            if (cur == reqSlot) {
+                break;
+            }
+            prev = cur;
+        }
+        long next = reqArenaMgr.getNextRequest(cur);
+        reqArenaMgr.setNextRequest(prev, next);
+        return cur;        
+    }
+    
+    /**
+     * remove the first request for a given job from a request queue
+     * @param head the head of the request queue
+     * @param jobSlot the job slot
+     * @return the slot of the first request that matched the given job
+     */
+    private long removeRequestFromQueueForJob(long head, long jobSlot) {
+        long holder = head;
+        long prev = holder;
+        while (prev != -1) {
+            holder = reqArenaMgr.getNextRequest(prev);
+            if (holder == -1) {
+                throw new IllegalStateException("no entry for job " + jobSlot + " in queue");
+            }
+            if (jobSlot == reqArenaMgr.getJobSlot(holder)) {
+                break;
+            }
+            prev = holder;
+        }
+        long next = reqArenaMgr.getNextRequest(holder);
+        reqArenaMgr.setNextRequest(prev, next);
+        return holder;
+    }
+    
+    private int determineNewMaxMode(long resource, int oldMaxMode) {
+        int newMaxMode = LockMode.NL;
+        long holder = resArenaMgr.getLastHolder(resource);
+        while (holder != -1) {
+            int curLockMode = reqArenaMgr.getLockMode(holder);
+            if (curLockMode == oldMaxMode) {
+                // we have another lock of the same mode - we're done
+                return oldMaxMode;
+            }
+            switch (ACTION_MATRIX[newMaxMode][curLockMode]) {
+                case UPD:
+                    newMaxMode = curLockMode;
+                    break;
+                case GET:
+                    break;
+                case WAIT:
+                    throw new IllegalStateException("incompatible locks in holder queue");
+            }
+            holder = reqArenaMgr.getNextRequest(holder);
+        }
+        return newMaxMode;        
+    }
+    
+    private boolean resourceNotUsed(long resource) {
+        return resArenaMgr.getLastHolder(resource) == -1
+                && resArenaMgr.getFirstUpgrader(resource) == -1
+                && resArenaMgr.getFirstWaiter(resource) == -1;
+    }
+
+    private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ op : ").append(string);
+        if (id != -1) {
+            sb.append(" , dataset : ").append(id);
+        }
+        if (entityHashValue != -1) {
+            sb.append(" , entity : ").append(entityHashValue);
+        }
+        if (lockMode != LockMode.NL) {
+            sb.append(" , mode : ").append(LockMode.toString(lockMode));
+        }
+        if (txnContext != null) {
+            sb.append(" , jobId : ").append(txnContext.getJobId());            
+        }
+        sb.append(" }");
+        //System.err.println("XXX" + sb.toString());
+    }
+
+    private void validateJob(ITransactionContext txnContext) throws ACIDException {
+        if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
+            throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
+        } else if (txnContext.isTimeout()) {
+            requestAbort(txnContext);
+        }
+    }
+
+    private void requestAbort(ITransactionContext txnContext) throws ACIDException {
+        txnContext.setTimeout(true);
+        throw new ACIDException("Transaction " + txnContext.getJobId()
+                + " should abort (requested by the Lock Manager)");
+    }
+
+    public StringBuilder append(StringBuilder sb) {
+        sb.append(">>dump_begin\t>>----- [resTable] -----\n");
+        table.append(sb);
+        sb.append(">>dump_end\t>>----- [resTable] -----\n");
+
+        sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
+        resArenaMgr.append(sb);
+        sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
+        
+        sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
+        reqArenaMgr.append(sb);
+        sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
+        
+        sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
+        for(Integer i : jobIdSlotMap.keySet()) {
+            sb.append(i).append(" : ");
+            RecordManagerTypes.Global.append(sb, jobIdSlotMap.get(i));
+            sb.append("\n");
+        }
+        sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
+
+        sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
+        jobArenaMgr.append(sb);
+        sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
+
+        return sb;
+    }
+    
+    public String toString() {
+        return append(new StringBuilder()).toString();
+    }
+    
+    @Override
+    public String prettyPrint() throws ACIDException {
+        StringBuilder s = new StringBuilder("\n########### LockManager Status #############\n");
+        return append(s).toString() + "\n";
+    }
+
+    @Override
+    public void start() {
+        //no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (dumpState) {
+            try {
+                os.write(toString().getBytes());
+                os.flush();
+            } catch (IOException e) {
+                //ignore
+            }
+        }
+    }
+    
+    private static class DatasetLockCache {
+        private long jobId = -1;
+        private HashMap<Integer,Byte> lockCache =  new HashMap<Integer,Byte>();
+        
+        public boolean contains(final int jobId, final int dsId, byte dsLockMode) {
+            if (this.jobId == jobId) {
+                final Byte cachedLockMode = this.lockCache.get(dsId);
+                if (cachedLockMode != null && cachedLockMode == dsLockMode) {
+                    return true;
+                }            
+            } else {
+                this.jobId = -1;
+                this.lockCache.clear();
+            }
+            return false;
+        }
+        
+        public void put(final int jobId, final int dsId, byte dsLockMode) {
+            this.jobId = jobId;
+            this.lockCache.put(dsId, dsLockMode);
+        }
+        
+        public String toString() {
+            return "[ " + jobId + " : " + lockCache.toString() + "]";
+        }
+    }
+
+    private static class ResourceGroupTable {
+        public static final int TABLE_SIZE = 1024; // TODO increase?
+
+        private ResourceGroup[] table;
+        
+        public ResourceGroupTable() {
+            table = new ResourceGroup[TABLE_SIZE];
+            for (int i = 0; i < TABLE_SIZE; ++i) {
+                table[i] = new ResourceGroup();
+            }
+        }
+        
+        ResourceGroup get(DatasetId dId, int entityHashValue) {
+            // TODO ensure good properties of hash function
+            int h = Math.abs(dId.getId() ^ entityHashValue);
+            return table[h % TABLE_SIZE];
+        }
+        
+        public StringBuilder append(StringBuilder sb) {
+            return append(sb, false);
+        }
+
+        public StringBuilder append(StringBuilder sb, boolean detail) {
+            for (int i = 0; i < table.length; ++i) {
+                sb.append(i).append(" : ");
+                if (detail) {
+                    sb.append(table[i]);
+                } else {
+                    sb.append(table[i].firstResourceIndex);
+                }
+                sb.append('\n');
+            }
+            return sb;
+        }
+    }
+    
+    private static class ResourceGroup {
+        private ReentrantReadWriteLock latch;
+        private Condition condition;
+        AtomicLong firstResourceIndex;
+
+        ResourceGroup() {
+            latch = new ReentrantReadWriteLock();
+            condition = latch.writeLock().newCondition();
+            firstResourceIndex = new AtomicLong(-1);
+        }
+        
+        void getLatch() {
+            log("latch " + toString());
+            latch.writeLock().lock();
+        }
+        
+        void releaseLatch() {
+            log("release " + toString());
+            latch.writeLock().unlock();
+        }
+        
+        boolean hasWaiters() {
+            return latch.hasQueuedThreads();
+        }
+        
+        void await(ITransactionContext txnContext) throws ACIDException {
+            log("wait for " + toString());
+            try {
+                condition.await();
+            } catch (InterruptedException e) {
+                throw new ACIDException(txnContext, "interrupted", e);
+            }
+        }
+        
+        void wakeUp() {
+            log("notify " + toString());
+            condition.signalAll();
+        }
+        
+        void log(String s) {
+            //System.out.println("XXXX " + s);
+        }
+        
+        public String toString() {
+            return "{ id : " + hashCode()
+                    + ", first : " + firstResourceIndex.toString() 
+                    + ", waiters : " + (hasWaiters() ? "true" : "false") + " }";
+        }
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index e61cb55..ea2d4de 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -24,6 +24,7 @@
 
 import org.apache.commons.io.FileUtils;
 
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
 import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@
 import edu.uci.ics.asterix.common.transactions.ILockManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -62,7 +64,7 @@
     ArrayList<LockRequest> requestList;
     ArrayList<ArrayList<Integer>> expectedResultList;
     int resultListIndex;
-    LockManager lockMgr;
+    ILockManager lockMgr;
     String requestFileName;
     long defaultWaitTime;
 
@@ -72,7 +74,7 @@
         this.workerReadyQueue = new WorkerReadyQueue();
         this.requestList = new ArrayList<LockRequest>();
         this.expectedResultList = new ArrayList<ArrayList<Integer>>();
-        this.lockMgr = (LockManager) txnProvider.getLockManager();
+        this.lockMgr = txnProvider.getLockManager();
         this.requestFileName = new String(requestFileName);
         this.resultListIndex = 0;
         this.defaultWaitTime = 10;
@@ -151,6 +153,8 @@
         if (isSuccess) {
             log("\n*** Test Passed ***");
         }
+        ((LogManager) txnProvider.getLogManager()).stop(false, null);
+        AsterixThreadExecutor.INSTANCE.shutdown();
     }
 
     public boolean handleRequest(LockRequest request) throws ACIDException {
@@ -511,6 +515,18 @@
     public void log(String s) {
         System.out.println(s);
     }
+    
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ t : \"").append(threadName).append("\", r : ");
+        if (lockRequest == null) {
+            sb.append("null");
+        } else {
+            sb.append("\"").append(lockRequest.toString()).append("\""); 
+        }
+        sb.append(" }");
+        return sb.toString();
+    }
 }
 
 class WorkerReadyQueue {
@@ -618,7 +634,7 @@
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
-            log(Thread.currentThread().getName() + "Waiting for worker to finish its task...");
+            log(Thread.currentThread().getName() + " Waiting for worker to finish its task...");
             queueSize = workerReadyQueue.size();
         }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
index e6f2798..11c48a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
@@ -14,9 +14,14 @@
  */
 package edu.uci.ics.asterix.transaction.management.service.locking;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Random;
 
+import org.apache.commons.io.FileUtils;
+
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
 import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -26,6 +31,7 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -45,9 +51,16 @@
     private static int jobId = 0;
     private static Random rand;
 
-    public static void main(String args[]) throws ACIDException, AsterixException {
+    public static void main(String args[]) throws ACIDException, AsterixException, IOException {
         int i;
-        TransactionSubsystem txnProvider = new TransactionSubsystem("LockManagerRandomUnitTest", null,
+        //prepare configuration file
+        File cwd = new File(System.getProperty("user.dir"));
+        File asterixdbDir = cwd.getParentFile();
+        File srcFile = new File(asterixdbDir.getAbsoluteFile(), "asterix-app/src/main/resources/asterix-build-configuration.xml");
+        File destFile = new File(cwd, "target/classes/asterix-configuration.xml");
+        FileUtils.copyFile(srcFile, destFile);
+
+        TransactionSubsystem txnProvider = new TransactionSubsystem("nc1", null,
                 new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
         rand = new Random(System.currentTimeMillis());
         for (i = 0; i < MAX_NUM_OF_ENTITY_LOCK_JOB; i++) {
@@ -64,6 +77,8 @@
             System.out.println("Creating " + i + "th EntityLockUpgradeJob..");
             generateEntityLockUpgradeThread(txnProvider);
         }
+        ((LogManager) txnProvider.getLogManager()).stop(false, null);
+        AsterixThreadExecutor.INSTANCE.shutdown();
     }
 
     private static void generateEntityLockThread(TransactionSubsystem txnProvider) {
@@ -555,6 +570,11 @@
         this.entityHashValue = waitTime;
     }
 
+    @Override
+    public String toString() {
+        return prettyPrint();
+    }
+    
     public String prettyPrint() {
         StringBuilder s = new StringBuilder();
         //s.append(threadName.charAt(7)).append("\t").append("\t");
@@ -595,23 +615,7 @@
         }
         s.append("\tJ").append(txnContext.getJobId().getId()).append("\tD").append(datasetIdObj.getId()).append("\tE")
                 .append(entityHashValue).append("\t");
-        switch (lockMode) {
-            case LockMode.S:
-                s.append("S");
-                break;
-            case LockMode.X:
-                s.append("X");
-                break;
-            case LockMode.IS:
-                s.append("IS");
-                break;
-            case LockMode.IX:
-                s.append("IX");
-                break;
-            default:
-                throw new UnsupportedOperationException("Unsupported lock mode");
-        }
-        s.append("\n");
+        s.append(LockMode.toString(lockMode)).append("\n");
         return s.toString();
     }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordManagerTypes.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordManagerTypes.java
new file mode 100644
index 0000000..b467cc3
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordManagerTypes.java
@@ -0,0 +1,51 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+public class RecordManagerTypes {
+    
+    public static class Byte {
+        public static StringBuilder append(StringBuilder sb, byte b) {
+            return sb.append(String.format("%1$18x", b));
+        }
+    }
+
+    public static class Short {
+        public static StringBuilder append(StringBuilder sb, short s) {
+            return sb.append(String.format("%1$18x", s));
+        }
+    }
+
+    public static class Int {
+        public static StringBuilder append(StringBuilder sb, int i) {
+            return sb.append(String.format("%1$18x", i));
+        }
+    }
+
+    public static class Global {
+
+        public static int arenaId(long l) {
+            return (int)((l >>> 48) & 0xffff);
+        }
+
+        public static int allocId(long l) {
+            return (int)((l >>> 32) & 0xffff);
+        }
+
+        public static int localId(long l) {
+            return (int) (l & 0xffffffffL);
+        }
+        
+        public static StringBuilder append(StringBuilder sb, long l) {
+            sb.append(String.format("%1$4x", RecordManagerTypes.Global.arenaId(l)));
+            sb.append(':');
+            sb.append(String.format("%1$4x", RecordManagerTypes.Global.allocId(l)));
+            sb.append(':');
+            sb.append(String.format("%1$8x", RecordManagerTypes.Global.localId(l)));
+            return sb;
+        }
+        
+        public static String toString(long l) {
+            return append(new StringBuilder(), l).toString();
+        }
+        
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Stats.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Stats.java
new file mode 100644
index 0000000..ff2c92a
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Stats.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+public class Stats {
+    int arenas  = 0;
+    int buffers = 0;
+    int slots   = 0;
+    int items   = 0;
+    int size    = 0;
+    
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ arenas : ").append(arenas);
+        sb.append(", buffers : ").append(buffers);
+        sb.append(", slots : ").append(slots);
+        sb.append(", items : ").append(items);
+        sb.append(", size : ").append(size).append(" }");
+        return sb.toString();
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 933afcd..4f13b9f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -41,7 +41,6 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
 import edu.uci.ics.asterix.common.transactions.MutableLong;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
@@ -82,7 +81,7 @@
         emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages);
         flushQ = new LinkedBlockingQueue<LogPage>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
-            emptyQ.offer(new LogPage((LockManager) txnSubsystem.getLockManager(), logPageSize, flushLSN));
+            emptyQ.offer(new LogPage(txnSubsystem, logPageSize, flushLSN));
         }
         appendLSN = initializeLogAnchor(nextLogFileId);
         flushLSN.set(appendLSN);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index a3f42a7..75a74c9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -22,16 +22,20 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
 import edu.uci.ics.asterix.common.transactions.ILogPage;
 import edu.uci.ics.asterix.common.transactions.ILogRecord;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.common.transactions.MutableLong;
 import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 
 public class LogPage implements ILogPage {
 
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(LogPage.class.getName());
-    private final LockManager lockMgr;
+    private final TransactionSubsystem txnSubsystem;
     private final LogPageReader logPageReader;
     private final int logPageSize;
     private final MutableLong flushLSN;
@@ -46,8 +50,8 @@
     private FileChannel fileChannel;
     private boolean stop;
 
-    public LogPage(LockManager lockMgr, int logPageSize, MutableLong flushLSN) {
-        this.lockMgr = lockMgr;
+    public LogPage(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) {
+        this.txnSubsystem = txnSubsystem;
         this.logPageSize = logPageSize;
         this.flushLSN = flushLSN;
         appendBuffer = ByteBuffer.allocate(logPageSize);
@@ -187,7 +191,27 @@
     private void batchUnlock(int beginOffset, int endOffset) throws ACIDException {
         if (endOffset > beginOffset) {
             logPageReader.initializeScan(beginOffset, endOffset);
-            lockMgr.batchUnlock(this, logPageReader);
+
+            DatasetId dsId = new DatasetId(-1);
+            JobId jId = new JobId(-1);
+            ITransactionContext txnCtx = null;
+
+            LogRecord logRecord = logPageReader.next();
+            while (logRecord != null) {
+                if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
+                    dsId.setId(logRecord.getDatasetId());
+                    jId.setId(logRecord.getJobId());
+                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+                    txnSubsystem.getLockManager().unlock(dsId, logRecord.getPKHashValue(), txnCtx);
+                    txnCtx.notifyOptracker(false);
+                } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
+                    jId.setId(logRecord.getJobId());
+                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+                    txnCtx.notifyOptracker(true);
+                    notifyJobTerminator();
+                }
+                logRecord = logPageReader.next();
+            }
         }
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 78bca42..1a73fe0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -30,16 +30,23 @@
     }
 
     public static class LockManagerConstants {
-        public static final String LOCK_CONF_DIR = "lock_conf";
-        public static final String LOCK_CONF_FILE = "lock.conf";
-        public static final int[] LOCK_CONFLICT_MATRIX = new int[] { 2, 3 };
-        public static final int[] LOCK_CONVERT_MATRIX = new int[] { 2, 0 };
-
         public static class LockMode {
-            public static final byte S = 0;
-            public static final byte X = 1;
-            public static final byte IS = 2;
-            public static final byte IX = 3;
+            public static final byte NL = 0;
+            public static final byte IS = 1;
+            public static final byte IX = 2;
+            public static final byte S  = 3;
+            public static final byte X  = 4;
+            
+            public static String toString(byte mode) {
+                switch (mode) {
+                    case NL: return "NL";
+                    case IS: return "IS";
+                    case IX: return "IX";
+                    case S:  return "S";
+                    case X:  return "X";
+                    default: throw new IllegalArgumentException("no such lock mode");
+                }
+            }
         }
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index aceeb82..4cb5fac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -22,7 +22,7 @@
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
+import edu.uci.ics.asterix.transaction.management.service.locking.ConcurrentLockManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
 import edu.uci.ics.asterix.transaction.management.service.recovery.CheckpointThread;
 import edu.uci.ics.asterix.transaction.management.service.recovery.RecoveryManager;
@@ -46,7 +46,7 @@
         this.id = id;
         this.txnProperties = txnProperties;
         this.transactionManager = new TransactionManager(this);
-        this.lockManager = new LockManager(this);
+        this.lockManager = new ConcurrentLockManager(this);
         this.logManager = new LogManager(this);
         this.recoveryManager = new RecoveryManager(this);
         this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;