Merge branch 'master' into westmann/locks
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 236df53..ccbf68f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.util.EnumSet;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
import edu.uci.ics.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
@@ -99,6 +100,7 @@
nc2.stop();
nc1.stop();
cc.stop();
+ AsterixThreadExecutor.INSTANCE.shutdown();
}
public static void runJob(JobSpecification spec) throws Exception {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
index edd4b2a..fb87f5e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
@@ -36,4 +36,8 @@
public Future<Object> submit(Callable command) {
return (Future<Object>) executorService.submit(command);
}
+
+ public void shutdown() {
+ executorService.shutdown();
+ }
}
diff --git a/asterix-maven-plugins/pom.xml b/asterix-maven-plugins/pom.xml
index 56bc697..b26a6d6 100644
--- a/asterix-maven-plugins/pom.xml
+++ b/asterix-maven-plugins/pom.xml
@@ -35,5 +35,6 @@
<modules>
<module>lexer-generator-maven-plugin</module>
+ <module>record-manager-generator-maven-plugin</module>
</modules>
</project>
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml b/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml
new file mode 100644
index 0000000..9ffc8d2
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml
@@ -0,0 +1,56 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>record-manager-generator-maven-plugin</artifactId>
+ <parent>
+ <artifactId>asterix-maven-plugins</artifactId>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <version>0.8.1-SNAPSHOT</version>
+ </parent>
+
+ <packaging>maven-plugin</packaging>
+ <name>record-manager-generator-maven-plugin</name>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-plugin-api</artifactId>
+ <version>2.0.2</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
new file mode 100644
index 0000000..31c4855
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import edu.uci.ics.asterix.recordmanagergenerator.RecordType.Field;
+
+public class Generator {
+
+ public enum Manager {
+ RECORD,
+ ARENA
+ }
+
+ public static void generateSource(
+ Manager mgr,
+ RecordType rec,
+ InputStream is,
+ StringBuilder sb,
+ boolean debug) {
+ switch (mgr) {
+ case RECORD:
+ generateMemoryManagerSource(rec, is, sb, debug);
+ break;
+ case ARENA:
+ generateArenaManagerSource(rec, is, sb, debug);
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private static void generateMemoryManagerSource(
+ RecordType resource,
+ InputStream is,
+ StringBuilder sb,
+ boolean debug) {
+ BufferedReader in = new BufferedReader(new InputStreamReader(is));
+ String line = null;
+
+ try {
+
+ String indent = " ";
+
+ while((line = in.readLine()) != null) {
+ if (line.contains("@E@")) {
+ line = line.replace("@E@", resource.name);
+ }
+ if (line.contains("@DEBUG@")) {
+ line = line.replace("@DEBUG@", Boolean.toString(debug));
+ }
+ if (line.contains("@CONSTS@")) {
+ resource.appendConstants(sb, indent, 1);
+ sb.append('\n');
+ } else if (line.contains("@METHODS@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ if (field.accessible) {
+ field.appendMemoryManagerGetMethod(sb, indent, 1);
+ sb.append('\n');
+ field.appendMemoryManagerSetMethod(sb, indent, 1);
+ sb.append('\n');
+ }
+ }
+ } else if (line.contains("@INIT_SLOT@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ field.appendInitializers(sb, indent, 3);
+ }
+ } else if (line.contains("@CHECK_SLOT@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ field.appendChecks(sb, indent, 3);
+ }
+ } else if (line.contains("@PRINT_BUFFER@")) {
+ resource.appendBufferPrinter(sb, indent, 3);
+ sb.append('\n');
+ } else {
+ sb.append(line).append('\n');
+ }
+ }
+
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ }
+
+ private static void generateArenaManagerSource(
+ RecordType resource,
+ InputStream is,
+ StringBuilder sb,
+ boolean debug) {
+ BufferedReader in = new BufferedReader(new InputStreamReader(is));
+ String line = null;
+
+ try {
+
+ String indent = " ";
+
+ while((line = in.readLine()) != null) {
+ if (line.contains("@E@")) {
+ line = line.replace("@E@", resource.name);
+ }
+ if (line.contains("@DEBUG@")) {
+ line = line.replace("@DEBUG@", Boolean.toString(debug));
+ }
+ if (line.contains("@METHODS@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ if (field.accessible) {
+ field.appendArenaManagerGetMethod(sb, indent, 1);
+ sb.append('\n');
+ field.appendArenaManagerSetMethod(sb, indent, 1);
+ sb.append('\n');
+ }
+ }
+ } else {
+ sb.append(line).append('\n');
+ }
+ }
+
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
new file mode 100644
index 0000000..8b6b581
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.maven.plugin.MojoFailureException;
+
+/**
+ * @goal generate-record-manager
+ * @phase generate-sources
+ * @requiresDependencyResolution compile
+ */
+public class RecordManagerGeneratorMojo extends AbstractMojo {
+
+ /**
+ * parameter injected from pom.xml
+ *
+ * @parameter
+ */
+ private boolean debug;
+ /**
+ * parameter injected from pom.xml
+ *
+ * @parameter
+ */
+ private String arenaManagerTemplate;
+ /**
+ * parameter injected from pom.xml
+ *
+ * @parameter
+ */
+ private String recordManagerTemplate;
+ /**
+ * parameter injected from pom.xml
+ *
+ * @parameter
+ * @required
+ */
+ private String[] recordTypes;
+ /**
+ * parameter injected from pom.xml
+ *
+ * @parameter
+ * @required
+ */
+ private File outputDir;
+
+ private Map<String, RecordType> typeMap;
+
+ public RecordManagerGeneratorMojo() {
+ }
+
+ private void defineRecordTypes() {
+ if (debug) {
+ getLog().info("generating debug code");
+ }
+
+ typeMap = new HashMap<String, RecordType>();
+
+ RecordType resource = new RecordType("Resource");
+ resource.addField("last holder", RecordType.Type.GLOBAL, "-1");
+ resource.addField("first waiter", RecordType.Type.GLOBAL, "-1");
+ resource.addField("first upgrader", RecordType.Type.GLOBAL, "-1");
+ resource.addField("next", RecordType.Type.GLOBAL, null);
+ resource.addField("max mode", RecordType.Type.INT, "LockMode.NL");
+ resource.addField("dataset id", RecordType.Type.INT, null);
+ resource.addField("pk hash val", RecordType.Type.INT, null);
+ resource.addField("alloc id", RecordType.Type.SHORT, null);
+
+ resource.addToMap(typeMap);
+
+ RecordType request = new RecordType("Request");
+ request.addField("resource id", RecordType.Type.GLOBAL, null);
+ request.addField("job slot", RecordType.Type.GLOBAL, null);
+ request.addField("prev job request", RecordType.Type.GLOBAL, null);
+ request.addField("next job request", RecordType.Type.GLOBAL, null);
+ request.addField("next request", RecordType.Type.GLOBAL, null);
+ request.addField("lock mode", RecordType.Type.INT, null);
+ request.addField("alloc id", RecordType.Type.SHORT, null);
+
+ request.addToMap(typeMap);
+
+ RecordType job = new RecordType("Job");
+ job.addField("last holder", RecordType.Type.GLOBAL, "-1");
+ job.addField("last waiter", RecordType.Type.GLOBAL, "-1");
+ job.addField("last upgrader", RecordType.Type.GLOBAL, "-1");
+ job.addField("job id", RecordType.Type.INT, null);
+ job.addField("alloc id", RecordType.Type.SHORT, null);
+
+ job.addToMap(typeMap);
+ }
+
+ public void execute() throws MojoExecutionException, MojoFailureException {
+ if (!outputDir.exists()) {
+ outputDir.mkdirs();
+ }
+
+ defineRecordTypes();
+
+ for (int i = 0; i < recordTypes.length; ++i) {
+ final String recordType = recordTypes[i];
+
+ if (recordManagerTemplate != null) {
+ generateSource(Generator.Manager.RECORD, recordManagerTemplate, recordType);
+ }
+
+ if (arenaManagerTemplate != null) {
+ generateSource(Generator.Manager.ARENA, arenaManagerTemplate, recordType);
+ }
+ }
+ }
+
+ private void generateSource(Generator.Manager mgrType, String template, String recordType) throws MojoFailureException {
+ InputStream is = getClass().getClassLoader().getResourceAsStream(template);
+ if (is == null) {
+ throw new MojoFailureException("template '" + template + "' not found in classpath");
+ }
+
+ StringBuilder sb = new StringBuilder();
+ File outputFile = new File(outputDir, recordType + template);
+
+ try {
+ getLog().info("generating " + outputFile.toString());
+
+ Generator.generateSource(mgrType, typeMap.get(recordType), is, sb, debug);
+ is.close();
+
+ FileWriter outWriter = new FileWriter(outputFile);
+ outWriter.write(sb.toString());
+ outWriter.close();
+ } catch (Exception ex) {
+ getLog().error(ex);
+ throw new MojoFailureException("failed to generate " + outputFile.toString());
+ }
+ }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
new file mode 100644
index 0000000..6d31f63
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
@@ -0,0 +1,389 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+
+public class RecordType {
+
+ enum Type {
+ BYTE,
+ SHORT,
+ INT,
+ GLOBAL
+ }
+
+ static class Field {
+
+ String name;
+ Type type;
+ String initial;
+ int offset;
+ boolean accessible = true;
+
+ Field(String name, Type type, String initial, int offset, boolean accessible) {
+ this.name = name;
+ this.type = type;
+ this.initial = initial;
+ this.offset = offset;
+ this.accessible = accessible;
+ }
+
+ String methodName(String prefix) {
+ String words[] = name.split(" ");
+ assert(words.length > 0);
+ StringBuilder sb = new StringBuilder(prefix);
+ for (int j = 0; j < words.length; ++j) {
+ String word = words[j];
+ sb.append(word.substring(0, 1).toUpperCase());
+ sb.append(word.substring(1));
+ }
+ return sb.toString();
+ }
+
+ StringBuilder appendMemoryManagerGetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public ")
+ .append(javaType(type))
+ .append(' ')
+ .append(methodName("get"))
+ .append("(int slotNum) {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final Buffer buf = buffers.get(slotNum / NO_SLOTS);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("buf.checkSlot(slotNum % NO_SLOTS);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final ByteBuffer b = buf.bb;\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("return b.")
+ .append(bbGetter(type))
+ .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+ .append(offsetName())
+ .append(");\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendMemoryManagerSetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public void ")
+ .append(methodName("set"))
+ .append("(int slotNum, ")
+ .append(javaType(type))
+ .append(" value) {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("b.")
+ .append(bbSetter(type))
+ .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+ .append(offsetName())
+ .append(", value);\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendArenaManagerGetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public ")
+ .append(javaType(type))
+ .append(' ')
+ .append(methodName("get"))
+ .append("(long slotNum) {\n");
+ if (initial != null) {
+ sb = indent(sb, indent, level + 1);
+ sb.append("if (TRACK_ALLOC) checkSlot(slotNum);\n");
+ }
+ sb = indent(sb, indent, level + 1);
+ sb.append("final int arenaId = RecordManagerTypes.Global.arenaId(slotNum);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final int localId = RecordManagerTypes.Global.localId(slotNum);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("return get(arenaId).")
+ .append(methodName("get"))
+ .append("(localId);\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendArenaManagerSetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public void ")
+ .append(methodName("set"))
+ .append("(long slotNum, ")
+ .append(javaType(type))
+ .append(" value) {\n");
+ if (initial != null) {
+ sb = indent(sb, indent, level + 1);
+ sb.append("if (TRACK_ALLOC) checkSlot(slotNum);\n");
+ }
+ sb = indent(sb, indent, level + 1);
+ sb.append("final int arenaId = RecordManagerTypes.Global.arenaId(slotNum);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final int localId = RecordManagerTypes.Global.localId(slotNum);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("get(arenaId).")
+ .append(methodName("set"))
+ .append("(localId, value);\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendInitializers(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("bb.")
+ .append(bbSetter(type))
+ .append("(slotNum * ITEM_SIZE + ")
+ .append(offsetName())
+ .append(", ");
+ if (initial != null) {
+ sb.append(initial);
+ } else {
+ sb.append(deadMemInitializer(type));
+ }
+ sb.append(");\n");
+ return sb;
+ }
+
+ StringBuilder appendChecks(StringBuilder sb, String indent, int level) {
+ if (initial == null) {
+ return sb;
+ }
+ sb = indent(sb, indent, level);
+ sb.append("if (bb.")
+ .append(bbGetter(type))
+ .append("(itemOffset + ")
+ .append(offsetName())
+ .append(") == ")
+ .append(deadMemInitializer(type))
+ .append(") {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("String msg = \"invalid value in field ")
+ .append(offsetName())
+ .append(" of slot \" + slotNum;\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("throw new IllegalStateException(msg);\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ String offsetName() {
+ String words[] = name.split(" ");
+ assert(words.length > 0);
+ StringBuilder sb = new StringBuilder(words[0].toUpperCase());
+ for (int j = 1; j < words.length; ++j) {
+ sb.append("_").append(words[j].toUpperCase());
+ }
+ sb.append("_OFF");
+ return sb.toString();
+ }
+
+ int offset() {
+ return offset;
+ }
+ }
+
+ String name;
+ ArrayList<Field> fields;
+ int totalSize;
+ boolean modifiable = true;
+
+ static StringBuilder indent(StringBuilder sb, String indent, int level) {
+ for (int i = 0; i < level; ++i) {
+ sb.append(indent);
+ }
+ return sb;
+ }
+
+ public RecordType(String name) {
+ this.name = name;
+ fields = new ArrayList<Field>();
+ addField("next free slot", Type.INT, "-1", false);
+ }
+
+ public void addToMap(Map<String, RecordType> map) {
+ modifiable = false;
+ calcOffsetsAndSize();
+ map.put(name, this);
+ }
+
+ public void addField(String name, Type type, String initial) {
+ addField(name, type, initial, true);
+ }
+
+ private void addField(String name, Type type, String initial, boolean accessible) {
+ if (! modifiable) {
+ throw new IllegalStateException("cannot modify type anmore");
+ }
+ fields.add(new Field(name, type, initial, -1, accessible));
+ }
+
+ private void calcOffsetsAndSize() {
+ Collections.sort(fields, new Comparator<Field>() {
+ public int compare(Field left, Field right) {
+ return size(right.type) - size(left.type);
+ }
+ });
+ // sort fields by size and align the items
+ totalSize = 0;
+ int alignment = 0;
+ for (int i = 0; i < fields.size(); ++i) {
+ final Field field = fields.get(i);
+ assert field.offset == -1;
+ field.offset = totalSize;
+ final int size = size(field.type);
+ totalSize += size;
+ if (size > alignment) alignment = size;
+ }
+ if (totalSize % alignment != 0) {
+ totalSize = ((totalSize / alignment) + 1) * alignment;
+ }
+ }
+
+ int size() {
+ return fields.size();
+ }
+
+ static int size(Type t) {
+ switch(t) {
+ case BYTE: return 1;
+ case SHORT: return 2;
+ case INT: return 4;
+ case GLOBAL: return 8;
+ default: throw new IllegalArgumentException();
+ }
+ }
+
+ static String javaType(Type t) {
+ switch(t) {
+ case BYTE: return "byte";
+ case SHORT: return "short";
+ case INT: return "int";
+ case GLOBAL: return "long";
+ default: throw new IllegalArgumentException();
+ }
+ }
+
+ static String bbGetter(Type t) {
+ switch(t) {
+ case BYTE: return "get";
+ case SHORT: return "getShort";
+ case INT: return "getInt";
+ case GLOBAL: return "getLong";
+ default: throw new IllegalArgumentException();
+ }
+ }
+
+ static String bbSetter(Type t) {
+ switch(t) {
+ case BYTE: return "put";
+ case SHORT: return "putShort";
+ case INT: return "putInt";
+ case GLOBAL: return "putLong";
+ default: throw new IllegalArgumentException();
+ }
+ }
+
+ static String deadMemInitializer(Type t) {
+ switch(t) {
+ case BYTE: return "(byte)0xde";
+ case SHORT: return "(short)0xdead";
+ case INT: return "0xdeadbeef";
+ case GLOBAL: return "0xdeadbeefdeadbeefl";
+ default: throw new IllegalArgumentException();
+ }
+ }
+
+ static String appender(Type t) {
+ switch(t) {
+ case BYTE: return "RecordManagerTypes.Byte.append";
+ case SHORT: return "RecordManagerTypes.Short.append";
+ case INT: return "RecordManagerTypes.Int.append";
+ case GLOBAL: return "RecordManagerTypes.Global.append";
+ default: throw new IllegalArgumentException();
+ }
+ }
+
+ static String padRight(String s, int n) {
+ return String.format("%1$-" + n + "s", s);
+ }
+
+ static String padLeft(String s, int n) {
+ return String.format("%1$" + n + "s", s);
+ }
+
+ StringBuilder appendConstants(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public static int ITEM_SIZE = ")
+ .append(totalSize)
+ .append(";\n");
+ for (int i = 0; i < fields.size(); ++i) {
+ final Field field = fields.get(i);
+ sb = indent(sb, indent, level);
+ sb.append("public static int ")
+ .append(field.offsetName())
+ .append(" = ")
+ .append(field.offset).append("; // size: ")
+ .append(size(field.type)).append("\n");
+ }
+ return sb;
+ }
+
+ StringBuilder appendBufferPrinter(StringBuilder sb, String indent, int level) {
+ int maxNameWidth = 0;
+ for (int i = 0; i < fields.size(); ++i) {
+ int width = fields.get(i).name.length();
+ if (width > maxNameWidth) {
+ maxNameWidth = width;
+ }
+ }
+ for (int i = 0; i < fields.size(); ++i) {
+ final Field field = fields.get(i);
+ sb = indent(sb, indent, level);
+ sb.append("sb.append(\"")
+ .append(padRight(field.name, maxNameWidth))
+ .append(" | \");\n");
+ sb = indent(sb, indent, level);
+ sb.append("for (int i = 0; i < NO_SLOTS; ++i) {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append(javaType(field.type))
+ .append(" value = bb.")
+ .append(bbGetter(field.type))
+ .append("(i * ITEM_SIZE + ")
+ .append(field.offsetName())
+ .append(");\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("sb = ")
+ .append(appender(field.type))
+ .append("(sb, value);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("sb.append(\" | \");\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ sb = indent(sb, indent, level);
+ sb.append("sb.append(\"\\n\");\n");
+ }
+ return sb;
+ }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
new file mode 100644
index 0000000..58f8972
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.asterix.transaction.management.service.locking.AllocInfo;
+import edu.uci.ics.asterix.transaction.management.service.locking.RecordManagerTypes;
+
+public class @E@ArenaManager {
+
+ public static final boolean TRACK_ALLOC = @DEBUG@;
+
+ private final int noArenas;
+ private final long txnShrinkTimer;
+ private ArrayList<@E@RecordManager> arenas;
+ private volatile int nextArena;
+ private ThreadLocal<LocalManager> local;
+
+ public @E@ArenaManager(long txnShrinkTimer) {
+ this.txnShrinkTimer = txnShrinkTimer;
+ noArenas = Runtime.getRuntime().availableProcessors() * 2;
+ arenas = new ArrayList<@E@RecordManager>(noArenas);
+ nextArena = 0;
+ local = new ThreadLocal<LocalManager>() {
+ @Override
+ protected LocalManager initialValue() {
+ return getNext();
+ }
+ };
+ }
+
+ public long allocate() {
+ final LocalManager localManager = local.get();
+ long result = localManager.arenaId;
+ result = result << 48;
+ final int localId = localManager.mgr.allocate();
+ result |= localId;
+ if (TRACK_ALLOC) {
+ final long allocId = (++localManager.mgr.allocCounter % 0x7fff);
+ result |= (allocId << 32);
+ setAllocId(result, (short) allocId);
+ assert RecordManagerTypes.Global.allocId(result) == allocId;
+ }
+ assert RecordManagerTypes.Global.arenaId(result) == localManager.arenaId;
+ assert RecordManagerTypes.Global.localId(result) == localId;
+ return result;
+ }
+
+ public void deallocate(long slotNum) {
+ if (TRACK_ALLOC) {
+ checkSlot(slotNum);
+ }
+ final int arenaId = RecordManagerTypes.Global.arenaId(slotNum);
+ get(arenaId).deallocate(RecordManagerTypes.Global.localId(slotNum));
+ }
+
+ public synchronized LocalManager getNext() {
+ if (nextArena >= arenas.size()) {
+ arenas.add(new @E@RecordManager(txnShrinkTimer));
+ }
+ @E@RecordManager mgr = arenas.get(nextArena);
+ LocalManager res = new LocalManager();
+ res.mgr = mgr;
+ res.arenaId = nextArena;
+ nextArena = (nextArena + 1) % noArenas;
+ return res;
+ }
+
+ public @E@RecordManager get(int i) {
+ return arenas.get(i);
+ }
+
+ public @E@RecordManager local() {
+ return local.get().mgr;
+ }
+
+ @METHODS@
+
+ private void checkSlot(long slotNum) {
+ final int refAllocId = RecordManagerTypes.Global.allocId(slotNum);
+ final short curAllocId = getAllocId(slotNum);
+ if (refAllocId != curAllocId) {
+ String msg = "reference to slot " + slotNum
+ + " of arena " + RecordManagerTypes.Global.arenaId(slotNum)
+ + " refers to version " + Integer.toHexString(refAllocId)
+ + " current version is " + Integer.toHexString(curAllocId);
+ AllocInfo a = getAllocInfo(slotNum);
+ if (a != null) {
+ msg += "\n" + a.toString();
+ }
+ throw new IllegalStateException(msg);
+ }
+ }
+
+ public AllocInfo getAllocInfo(long slotNum) {
+ final int arenaId = RecordManagerTypes.Global.arenaId(slotNum);
+ return get(arenaId).getAllocInfo(RecordManagerTypes.Global.localId(slotNum));
+ }
+
+ static class LocalManager {
+ int arenaId;
+ @E@RecordManager mgr;
+ }
+
+ public StringBuilder append(StringBuilder sb) {
+ for (int i = 0; i < arenas.size(); ++i) {
+ sb.append("++++ arena ").append(i).append(" ++++\n");
+ arenas.get(i).append(sb);
+ }
+ return sb;
+ }
+
+ public String toString() {
+ return append(new StringBuilder()).toString();
+ }
+
+ public Stats addTo(Stats s) {
+ final int size = arenas.size();
+ s.arenas += size;
+ for (int i = 0; i < size; ++i) {
+ arenas.get(i).addTo(s);
+ }
+ return s;
+ }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
new file mode 100644
index 0000000..01049d3
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -0,0 +1,304 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.asterix.transaction.management.service.locking.AllocInfo;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+
+public class @E@RecordManager {
+
+ public static final boolean TRACK_ALLOC = @DEBUG@;
+
+ static final int NO_SLOTS = 10;
+
+ @CONSTS@
+
+ private final long txnShrinkTimer;
+ private long shrinkTimer;
+ private ArrayList<Buffer> buffers;
+ private int current;
+ private int occupiedSlots;
+ private boolean isShrinkTimerOn;
+
+ int allocCounter;
+
+ public @E@RecordManager(long txnShrinkTimer) {
+ this.txnShrinkTimer = txnShrinkTimer;
+ buffers = new ArrayList<Buffer>();
+ buffers.add(new Buffer());
+ current = 0;
+
+ allocCounter = 0;
+ }
+
+ synchronized int allocate() {
+ if (buffers.get(current).isFull()) {
+ int size = buffers.size();
+ boolean needNewBuffer = true;
+ for (int i = 0; i < size; i++) {
+ Buffer buffer = buffers.get(i);
+ if (! buffer.isInitialized()) {
+ buffer.initialize();
+ current = i;
+ needNewBuffer = false;
+ break;
+ }
+ }
+
+ if (needNewBuffer) {
+ buffers.add(new Buffer());
+ current = buffers.size() - 1;
+ }
+ }
+ ++occupiedSlots;
+ return buffers.get(current).allocate() + current * NO_SLOTS;
+ }
+
+ synchronized void deallocate(int slotNum) {
+ buffers.get(slotNum / NO_SLOTS).deallocate(slotNum % NO_SLOTS);
+ --occupiedSlots;
+
+ if (needShrink()) {
+ shrink();
+ }
+ }
+
+ /**
+ * Shrink policy:
+ * Shrink when the resource under-utilization lasts for a certain amount of time.
+ * TODO Need to figure out which of the policies is better
+ * case1.
+ * buffers status : O x x x x x O (O is initialized, x is deinitialized)
+ * In the above status, 'CURRENT' needShrink() returns 'TRUE'
+ * even if there is nothing to shrink or deallocate.
+ * It doesn't distinguish the deinitialized children from initialized children
+ * by calculating totalNumOfSlots = buffers.size() * ChildEntityLockInfoArrayManager.NUM_OF_SLOTS.
+ * In other words, it doesn't subtract the deinitialized children's slots.
+ * case2.
+ * buffers status : O O x x x x x
+ * However, in the above case, if we subtract the deinitialized children's slots,
+ * needShrink() will return false even if we shrink the buffers at this case.
+ *
+ * @return
+ */
+ private boolean needShrink() {
+ int size = buffers.size();
+ int usedSlots = occupiedSlots;
+ if (usedSlots == 0) {
+ usedSlots = 1;
+ }
+
+ if (size > 1 && size * NO_SLOTS / usedSlots >= 3) {
+ if (isShrinkTimerOn) {
+ if (System.currentTimeMillis() - shrinkTimer >= txnShrinkTimer) {
+ isShrinkTimerOn = false;
+ return true;
+ }
+ } else {
+ //turn on timer
+ isShrinkTimerOn = true;
+ shrinkTimer = System.currentTimeMillis();
+ }
+ } else {
+ //turn off timer
+ isShrinkTimerOn = false;
+ }
+
+ return false;
+ }
+
+ /**
+ * Shrink() may
+ * deinitialize(:deallocates ByteBuffer of child) Children(s) or
+ * shrink buffers according to the deinitialized children's contiguity status.
+ * It doesn't deinitialze or shrink more than half of children at a time.
+ */
+ private void shrink() {
+ int i;
+ int removeCount = 0;
+ int size = buffers.size();
+ int maxDecreaseCount = size / 2;
+ Buffer buffer;
+
+ //The first buffer never be deinitialized.
+ for (i = 1; i < size; i++) {
+ if (buffers.get(i).isEmpty()) {
+ buffers.get(i).deinitialize();
+ }
+ }
+
+ //remove the empty buffers from the end
+ for (i = size - 1; i >= 1; i--) {
+ buffer = buffers.get(i);
+ if (! buffer.isInitialized()) {
+ buffers.remove(i);
+ if (++removeCount == maxDecreaseCount) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+ //reset allocChild to the first buffer
+ current = 0;
+
+ isShrinkTimerOn = false;
+ }
+
+ @METHODS@
+
+ public AllocInfo getAllocInfo(int slotNum) {
+ final Buffer buf = buffers.get(slotNum / NO_SLOTS);
+ if (buf.allocList == null) {
+ return null;
+ } else {
+ return buf.allocList.get(slotNum % NO_SLOTS);
+ }
+ }
+
+ StringBuilder append(StringBuilder sb) {
+ sb.append("+++ current: ")
+ .append(current)
+ .append(" no occupied slots: ")
+ .append(occupiedSlots)
+ .append(" +++\n");
+ for (int i = 0; i < buffers.size(); ++i) {
+ buffers.get(i).append(sb);
+ sb.append("\n");
+ }
+ return sb;
+ }
+
+ public String toString() {
+ return append(new StringBuilder()).toString();
+ }
+
+ public Stats addTo(Stats s) {
+ final int size = buffers.size();
+ s.buffers += size;
+ s.slots += size * NO_SLOTS;
+ s.size += size * NO_SLOTS * ITEM_SIZE;
+ for (int i = 0; i < size; ++i) {
+ buffers.get(i).addTo(s);
+ }
+ return s;
+ }
+
+ static class Buffer {
+ private ByteBuffer bb;
+ private int freeSlotNum;
+ private int occupiedSlots = -1; //-1 represents 'deinitialized' state.
+
+ ArrayList<AllocInfo> allocList;
+
+ Buffer() {
+ initialize();
+ }
+
+ void initialize() {
+ bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
+ freeSlotNum = 0;
+ occupiedSlots = 0;
+
+ for (int i = 0; i < NO_SLOTS - 1; i++) {
+ setNextFreeSlot(i, i + 1);
+ }
+ setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+
+ if (TRACK_ALLOC) {
+ allocList = new ArrayList<AllocInfo>(NO_SLOTS);
+ for (int i = 0; i < NO_SLOTS; ++i) {
+ allocList.add(new AllocInfo());
+ }
+ }
+ }
+
+ public void deinitialize() {
+ bb = null;
+ occupiedSlots = -1;
+ }
+
+ public boolean isInitialized() {
+ return occupiedSlots >= 0;
+ }
+
+ public boolean isFull() {
+ return occupiedSlots == NO_SLOTS;
+ }
+
+ public boolean isEmpty() {
+ return occupiedSlots == 0;
+ }
+
+ public int allocate() {
+ int slotNum = freeSlotNum;
+ freeSlotNum = getNextFreeSlot(slotNum);
+ @INIT_SLOT@
+ occupiedSlots++;
+ if (TRACK_ALLOC) allocList.get(slotNum).alloc();
+ return slotNum;
+ }
+
+ public void deallocate(int slotNum) {
+ @INIT_SLOT@
+ setNextFreeSlot(slotNum, freeSlotNum);
+ freeSlotNum = slotNum;
+ occupiedSlots--;
+ if (TRACK_ALLOC) allocList.get(slotNum).free();
+ }
+
+ public int getNextFreeSlot(int slotNum) {
+ return bb.getInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFF);
+ }
+
+ public void setNextFreeSlot(int slotNum, int nextFreeSlot) {
+ bb.putInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFF, nextFreeSlot);
+ }
+
+ StringBuilder append(StringBuilder sb) {
+ sb.append("++ free slot: ")
+ .append(freeSlotNum)
+ .append(" no occupied slots: ")
+ .append(occupiedSlots)
+ .append(" ++\n");
+ @PRINT_BUFFER@
+ return sb;
+ }
+
+ public String toString() {
+ return append(new StringBuilder()).toString();
+ }
+
+ public void addTo(Stats s) {
+ if (isInitialized()) {
+ s.items += occupiedSlots;
+ }
+ }
+
+ private void checkSlot(int slotNum) {
+ if (! TRACK_ALLOC) {
+ return;
+ }
+ final int itemOffset = (slotNum % NO_SLOTS) * ITEM_SIZE;
+ // @CHECK_SLOT@
+ }
+ }
+
+}
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index 3123008..13a01a6 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -33,6 +33,49 @@
<fork>true</fork>
</configuration>
</plugin>
+ <plugin>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>record-manager-generator-maven-plugin</artifactId>
+ <version>0.8.1-SNAPSHOT</version>
+ <configuration>
+ <debug>false</debug>
+ <arenaManagerTemplate>ArenaManager.java</arenaManagerTemplate>
+ <recordManagerTemplate>RecordManager.java</recordManagerTemplate>
+ <recordTypes>
+ <param>Job</param>
+ <param>Request</param>
+ <param>Resource</param>
+ </recordTypes>
+ <outputDir>${project.build.directory}/generated-sources/java/edu/uci/ics/asterix/transaction/management/service/locking</outputDir>
+ </configuration>
+ <executions>
+ <execution>
+ <id>generate-record-manager</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>generate-record-manager</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources/java/</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/AllocInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/AllocInfo.java
new file mode 100644
index 0000000..dfba774
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/AllocInfo.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class AllocInfo {
+ String alloc;
+ String free;
+
+ void alloc() {
+ alloc = getStackTrace();
+ }
+
+ void free() {
+ free = getStackTrace();
+ }
+
+ private String getStackTrace() {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ new Exception().printStackTrace(pw);
+ pw.close();
+ String res = sw.toString();
+ // remove first 3 lines
+ int nlPos = 0;
+ for (int i = 0; i < 3; ++i) {
+ nlPos = res.indexOf('\n', nlPos) + 1;
+ }
+ return res.substring(nlPos);
+ }
+
+ public String toString() {
+ return "allocation stack:\n" + alloc + "\nfree stack\n" + free;
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
new file mode 100644
index 0000000..80da1ab
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -0,0 +1,845 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILockManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+
+/**
+ * An implementation of the ILockManager interface.
+ *
+ * @author tillw
+ */
+
+public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
+
+ public static final boolean IS_DEBUG_MODE = false;//true
+
+ private TransactionSubsystem txnSubsystem;
+
+ private ResourceGroupTable table;
+ private ResourceArenaManager resArenaMgr;
+ private RequestArenaManager reqArenaMgr;
+ private JobArenaManager jobArenaMgr;
+ private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
+ private ThreadLocal<DatasetLockCache> dsLockCache;
+
+ enum LockAction {
+ GET,
+ UPD, // special version of GET that updates the max lock mode
+ WAIT,
+ CONV // convert (upgrade) a lock (e.g. from S to X)
+ }
+
+ static LockAction[][] ACTION_MATRIX = {
+ // new NL IS IX S X
+ { LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL
+ { LockAction.GET, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS
+ { LockAction.GET, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX
+ { LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S
+ { LockAction.GET, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X
+ };
+
+ public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
+ this.txnSubsystem = txnSubsystem;
+
+ this.table = new ResourceGroupTable();
+
+ final int lockManagerShrinkTimer = txnSubsystem.getTransactionProperties()
+ .getLockManagerShrinkTimer();
+
+ resArenaMgr = new ResourceArenaManager(lockManagerShrinkTimer);
+ reqArenaMgr = new RequestArenaManager(lockManagerShrinkTimer);
+ jobArenaMgr = new JobArenaManager(lockManagerShrinkTimer);
+ jobIdSlotMap = new ConcurrentHashMap<>();
+ dsLockCache = new ThreadLocal<DatasetLockCache>() {
+ protected DatasetLockCache initialValue() {
+ return new DatasetLockCache();
+ }
+ };
+ }
+
+ public AsterixTransactionProperties getTransactionProperties() {
+ return this.txnSubsystem.getTransactionProperties();
+ }
+
+ @Override
+ public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+ throws ACIDException {
+
+ log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+ final int dsId = datasetId.getId();
+ final int jobId = txnContext.getJobId().getId();
+
+ if (entityHashValue != -1) {
+ // get the intention lock on the dataset, if we want to lock an individual item
+ final byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+ if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+ lock(datasetId, -1, dsLockMode, txnContext);
+ dsLockCache.get().put(jobId, dsId, dsLockMode);
+ }
+ }
+
+ long jobSlot = findOrAllocJobSlot(jobId);
+
+ ResourceGroup group = table.get(datasetId, entityHashValue);
+ group.getLatch();
+
+ try {
+ validateJob(txnContext);
+
+ // 1) Find the resource in the hash table
+ long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+ // 2) create a request entry
+ long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
+ // 3) check lock compatibility
+ boolean locked = false;
+
+ while (! locked) {
+ int curLockMode = resArenaMgr.getMaxMode(resSlot);
+ LockAction act = ACTION_MATRIX[curLockMode][lockMode];
+ if (act == LockAction.WAIT) {
+ act = updateActionForSameJob(resSlot, jobSlot, lockMode);
+ }
+ switch (act) {
+ case UPD:
+ resArenaMgr.setMaxMode(resSlot, lockMode);
+ // no break
+ case GET:
+ addHolder(reqSlot, resSlot, jobSlot);
+ locked = true;
+ break;
+ case WAIT:
+ if (! introducesDeadlock(resSlot, jobSlot)) {
+ addWaiter(reqSlot, resSlot, jobSlot);
+ } else {
+ requestAbort(txnContext);
+ }
+ group.await(txnContext);
+ removeWaiter(reqSlot, resSlot, jobSlot);
+ break;
+ case CONV:
+ // TODO can we have more than on upgrader? Or do we need to
+ // abort if we get a second upgrader?
+ addUpgrader(reqSlot, resSlot, jobSlot);
+ group.await(txnContext);
+ removeUpgrader(reqSlot, resSlot, jobSlot);
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ } finally {
+ group.releaseLatch();
+ }
+ }
+
+ /**
+ * determine if adding a job to the waiters of a resource will introduce a
+ * cycle in the wait-graph where the job waits on itself
+ * @param resSlot the slot that contains the information about the resource
+ * @param jobSlot the slot that contains the information about the job
+ * @return true if a cycle would be introduced, false otherwise
+ */
+ private boolean introducesDeadlock(long resSlot, long jobSlot) {
+ long reqSlot = resArenaMgr.getLastHolder(resSlot);
+ while (reqSlot >= 0) {
+ long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
+ if (holderJobSlot == jobSlot) {
+ return true;
+ }
+ long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
+ while (waiter >= 0) {
+ long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
+ if (introducesDeadlock(watingOnResSlot, jobSlot)) {
+ return true;
+ }
+ waiter = reqArenaMgr.getNextJobRequest(waiter);
+ }
+ reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+ }
+ return false;
+ }
+
+ @Override
+ public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+ throws ACIDException {
+ log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+ lock(datasetId, entityHashValue, lockMode, txnContext);
+ unlock(datasetId, entityHashValue, txnContext);
+ }
+
+ @Override
+ public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+ throws ACIDException {
+ log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+ final int dsId = datasetId.getId();
+ final int jobId = txnContext.getJobId().getId();
+
+ if (entityHashValue != -1) {
+ // get the intention lock on the dataset, if we want to lock an individual item
+ byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+ if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+ if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
+ return false;
+ }
+ dsLockCache.get().put(jobId, dsId, dsLockMode);
+ }
+ }
+
+ long jobSlot = findOrAllocJobSlot(jobId);
+
+ boolean locked = false;
+
+ ResourceGroup group = table.get(datasetId, entityHashValue);
+ group.getLatch();
+
+ try {
+ validateJob(txnContext);
+
+ // 1) Find the resource in the hash table
+ long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+ // 2) create a request entry
+ long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
+ // 3) check lock compatibility
+
+ int curLockMode = resArenaMgr.getMaxMode(resSlot);
+ LockAction act = ACTION_MATRIX[curLockMode][lockMode];
+ if (act == LockAction.WAIT) {
+ act = updateActionForSameJob(resSlot, jobSlot, lockMode);
+ }
+ switch (act) {
+ case UPD:
+ resArenaMgr.setMaxMode(resSlot, lockMode);
+ // no break
+ case GET:
+ addHolder(reqSlot, resSlot, jobSlot);
+ locked = true;
+ break;
+ case WAIT:
+ case CONV:
+ locked = false;
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ } finally {
+ group.releaseLatch();
+ }
+
+ // if we did acquire the dataset lock, but not the entity lock, we keep
+ // it anyway and clean it up at the end of the job
+
+ return locked;
+ }
+
+ @Override
+ public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+ ITransactionContext txnContext) throws ACIDException {
+ log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+ if (tryLock(datasetId, entityHashValue, lockMode, txnContext)) {
+ unlock(datasetId, entityHashValue, txnContext);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
+ log("unlock", datasetId.getId(), entityHashValue, LockMode.NL, txnContext);
+
+ ResourceGroup group = table.get(datasetId, entityHashValue);
+ group.getLatch();
+
+ try {
+
+ int dsId = datasetId.getId();
+ long resource = findResourceInGroup(group, dsId, entityHashValue);
+
+ if (resource < 0) {
+ throw new IllegalStateException("resource (" + dsId + ", " + entityHashValue + ") not found");
+ }
+
+ int jobId = txnContext.getJobId().getId();
+ long jobSlot = findOrAllocJobSlot(jobId);
+
+ long holder = removeLastHolder(resource, jobSlot);
+
+ // deallocate request
+ reqArenaMgr.deallocate(holder);
+ // deallocate resource or fix max lock mode
+ if (resourceNotUsed(resource)) {
+ long prev = group.firstResourceIndex.get();
+ if (prev == resource) {
+ group.firstResourceIndex.set(resArenaMgr.getNext(resource));
+ } else {
+ while (resArenaMgr.getNext(prev) != resource) {
+ prev = resArenaMgr.getNext(prev);
+ }
+ resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
+ }
+ resArenaMgr.deallocate(resource);
+ } else {
+ final int oldMaxMode = resArenaMgr.getMaxMode(resource);
+ final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
+ resArenaMgr.setMaxMode(resource, newMaxMode);
+ if (oldMaxMode != newMaxMode) {
+ // the locking mode didn't change, current waiters won't be
+ // able to acquire the lock, so we do not need to signal them
+ group.wakeUp();
+ }
+ }
+ } finally {
+ group.releaseLatch();
+ }
+
+ // dataset intention locks are cleaned up at the end of the job
+ }
+
+ @Override
+ public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
+ log("releaseLocks", -1, -1, LockMode.NL, txnContext);
+
+ int jobId = txnContext.getJobId().getId();
+ Long jobSlot = jobIdSlotMap.get(jobId);
+ if (jobSlot == null) {
+ // we don't know the job, so there are no locks for it - we're done
+ return;
+ }
+ long holder = jobArenaMgr.getLastHolder(jobSlot);
+ while (holder != -1) {
+ long resource = reqArenaMgr.getResourceId(holder);
+ int dsId = resArenaMgr.getDatasetId(resource);
+ int pkHashVal = resArenaMgr.getPkHashVal(resource);
+ unlock(new DatasetId(dsId), pkHashVal, txnContext);
+ holder = jobArenaMgr.getLastHolder(jobSlot);
+ }
+ jobArenaMgr.deallocate(jobSlot);
+
+ //System.err.println(table.append(new StringBuilder(), true).toString());
+ //System.out.println("jobArenaMgr " + jobArenaMgr.addTo(new Stats()).toString());
+ //System.out.println("resArenaMgr " + resArenaMgr.addTo(new Stats()).toString());
+ //System.out.println("reqArenaMgr " + reqArenaMgr.addTo(new Stats()).toString());
+ }
+
+ private long findOrAllocJobSlot(int jobId) {
+ Long jobSlot = jobIdSlotMap.get(jobId);
+ if (jobSlot == null) {
+ jobSlot = new Long(jobArenaMgr.allocate());
+ jobArenaMgr.setJobId(jobSlot, jobId);
+ Long oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
+ if (oldSlot != null) {
+ // if another thread allocated a slot for this jobId between
+ // get(..) and putIfAbsent(..), we'll use that slot and
+ // deallocate the one we allocated
+ jobArenaMgr.deallocate(jobSlot);
+ jobSlot = oldSlot;
+ }
+ }
+ assert(jobSlot >= 0);
+ return jobSlot;
+ }
+
+ private long findOrAllocResourceSlot(ResourceGroup group, int dsId, int entityHashValue) {
+ long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+
+ if (resSlot == -1) {
+ // we don't know about this resource, let's alloc a slot
+ resSlot = resArenaMgr.allocate();
+ resArenaMgr.setDatasetId(resSlot, dsId);
+ resArenaMgr.setPkHashVal(resSlot, entityHashValue);
+ resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
+ group.firstResourceIndex.set(resSlot);
+ }
+ return resSlot;
+ }
+
+ private long allocRequestSlot(long resSlot, long jobSlot, byte lockMode) {
+ long reqSlot = reqArenaMgr.allocate();
+ reqArenaMgr.setResourceId(reqSlot, resSlot);
+ reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
+ reqArenaMgr.setJobSlot(reqSlot, jobSlot);
+ return reqSlot;
+ }
+
+ /**
+ * when we've got a lock conflict for a different job, we always have to
+ * wait, if it is for the same job we either have to
+ * a) (wait and) convert the lock once conversion becomes viable or
+ * b) acquire the lock if we want to lock the same resource with the same
+ * lock mode for the same job.
+ * @param resource the resource slot that's being locked
+ * @param job the job slot of the job locking the resource
+ * @param lockMode the lock mode that the resource should be locked with
+ * @return
+ */
+ private LockAction updateActionForSameJob(long resource, long job, byte lockMode) {
+ // TODO we can reduce the number of things we have to look at by
+ // carefully distinguishing the different lock modes
+ long holder = resArenaMgr.getLastHolder(resource);
+ LockAction res = LockAction.WAIT;
+ while (holder != -1) {
+ if (job == reqArenaMgr.getJobSlot(holder)) {
+ if (reqArenaMgr.getLockMode(holder) == lockMode) {
+ return LockAction.GET;
+ } else {
+ res = LockAction.CONV;
+ }
+ }
+ holder = reqArenaMgr.getNextRequest(holder);
+ }
+ return res;
+ }
+
+ private long findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
+ long resSlot = group.firstResourceIndex.get();
+ while (resSlot != -1) {
+ // either we already have a lock on this resource or we have a
+ // hash collision
+ if (resArenaMgr.getDatasetId(resSlot) == dsId &&
+ resArenaMgr.getPkHashVal(resSlot) == entityHashValue) {
+ return resSlot;
+ } else {
+ resSlot = resArenaMgr.getNext(resSlot);
+ }
+ }
+ return -1;
+ }
+
+ private void addHolder(long request, long resource, long job) {
+ long lastHolder = resArenaMgr.getLastHolder(resource);
+ reqArenaMgr.setNextRequest(request, lastHolder);
+ resArenaMgr.setLastHolder(resource, request);
+
+ synchronized (jobArenaMgr) {
+ long lastJobHolder = jobArenaMgr.getLastHolder(job);
+ insertIntoJobQueue(request, lastJobHolder);
+ jobArenaMgr.setLastHolder(job, request);
+ }
+ }
+
+ private long removeLastHolder(long resource, long jobSlot) {
+ long holder = resArenaMgr.getLastHolder(resource);
+ if (holder < 0) {
+ throw new IllegalStateException("no holder for resource " + resource);
+ }
+
+ // remove from the list of holders for a resource
+ if (reqArenaMgr.getJobSlot(holder) == jobSlot) {
+ // if the head of the queue matches, we need to update the resource
+ long next = reqArenaMgr.getNextRequest(holder);
+ resArenaMgr.setLastHolder(resource, next);
+ } else {
+ holder = removeRequestFromQueueForJob(holder, jobSlot);
+ }
+
+ synchronized (jobArenaMgr) {
+ // remove from the list of requests for a job
+ long newHead = removeRequestFromJob(jobSlot, holder);
+ jobArenaMgr.setLastHolder(jobSlot, newHead);
+ }
+ return holder;
+ }
+
+ private long removeRequestFromJob(long jobSlot, long holder) {
+ long prevForJob = reqArenaMgr.getPrevJobRequest(holder);
+ long nextForJob = reqArenaMgr.getNextJobRequest(holder);
+ if (nextForJob != -1) {
+ reqArenaMgr.setPrevJobRequest(nextForJob, prevForJob);
+ }
+ if (prevForJob == -1) {
+ return nextForJob;
+ } else {
+ reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
+ return -1;
+ }
+ }
+
+ private void addWaiter(long request, long resource, long job) {
+ long waiter = resArenaMgr.getFirstWaiter(resource);
+ reqArenaMgr.setNextRequest(request, -1);
+ if (waiter == -1) {
+ resArenaMgr.setFirstWaiter(resource, request);
+ } else {
+ appendToRequestQueue(waiter, request);
+ }
+
+ synchronized (jobArenaMgr) {
+ waiter = jobArenaMgr.getLastWaiter(job);
+ insertIntoJobQueue(request, waiter);
+ jobArenaMgr.setLastWaiter(job, request);
+ }
+ }
+
+ private void removeWaiter(long request, long resource, long job) {
+ long waiter = resArenaMgr.getFirstWaiter(resource);
+ if (waiter == request) {
+ long next = reqArenaMgr.getNextRequest(waiter);
+ resArenaMgr.setFirstWaiter(resource, next);
+ } else {
+ waiter = removeRequestFromQueueForSlot(waiter, request);
+ }
+ synchronized (jobArenaMgr) {
+ // remove from the list of requests for a job
+ long newHead = removeRequestFromJob(job, waiter);
+ jobArenaMgr.setLastWaiter(job, newHead);
+ }
+ }
+
+ private void addUpgrader(long request, long resource, long job) {
+ long upgrader = resArenaMgr.getFirstUpgrader(resource);
+ reqArenaMgr.setNextRequest(request, -1);
+ if (upgrader == -1) {
+ resArenaMgr.setFirstUpgrader(resource, request);
+ } else {
+ appendToRequestQueue(upgrader, request);
+ }
+ synchronized (jobArenaMgr) {
+ upgrader = jobArenaMgr.getLastUpgrader(job);
+ insertIntoJobQueue(request, upgrader);
+ jobArenaMgr.setLastUpgrader(job, request);
+ }
+ }
+
+ private void removeUpgrader(long request, long resource, long job) {
+ long upgrader = resArenaMgr.getFirstUpgrader(resource);
+ if (upgrader == request) {
+ long next = reqArenaMgr.getNextRequest(upgrader);
+ resArenaMgr.setFirstUpgrader(resource, next);
+ } else {
+ upgrader = removeRequestFromQueueForSlot(upgrader, request);
+ }
+ synchronized (jobArenaMgr) {
+ // remove from the list of requests for a job
+ long newHead = removeRequestFromJob(job, upgrader);
+ jobArenaMgr.setLastUpgrader(job, newHead);
+ }
+ }
+
+ private void insertIntoJobQueue(long newRequest, long oldRequest) {
+ reqArenaMgr.setNextJobRequest(newRequest, oldRequest);
+ reqArenaMgr.setPrevJobRequest(newRequest, -1);
+ if (oldRequest >= 0) {
+ reqArenaMgr.setPrevJobRequest(oldRequest, newRequest);
+ }
+ }
+
+ private void appendToRequestQueue(long head, long appendee) {
+ long next = reqArenaMgr.getNextRequest(head);
+ while(next != -1) {
+ head = next;
+ next = reqArenaMgr.getNextRequest(head);
+ }
+ reqArenaMgr.setNextRequest(head, appendee);
+ }
+
+ /**
+ *
+ * @param head
+ * @param reqSlot
+ * @return
+ */
+ private long removeRequestFromQueueForSlot(long head, long reqSlot) {
+ long cur = head;
+ long prev = cur;
+ while (prev != -1) {
+ cur = reqArenaMgr.getNextRequest(prev);
+ if (cur == -1) {
+ throw new IllegalStateException("request " + reqSlot+ " not in queue");
+ }
+ if (cur == reqSlot) {
+ break;
+ }
+ prev = cur;
+ }
+ long next = reqArenaMgr.getNextRequest(cur);
+ reqArenaMgr.setNextRequest(prev, next);
+ return cur;
+ }
+
+ /**
+ * remove the first request for a given job from a request queue
+ * @param head the head of the request queue
+ * @param jobSlot the job slot
+ * @return the slot of the first request that matched the given job
+ */
+ private long removeRequestFromQueueForJob(long head, long jobSlot) {
+ long holder = head;
+ long prev = holder;
+ while (prev != -1) {
+ holder = reqArenaMgr.getNextRequest(prev);
+ if (holder == -1) {
+ throw new IllegalStateException("no entry for job " + jobSlot + " in queue");
+ }
+ if (jobSlot == reqArenaMgr.getJobSlot(holder)) {
+ break;
+ }
+ prev = holder;
+ }
+ long next = reqArenaMgr.getNextRequest(holder);
+ reqArenaMgr.setNextRequest(prev, next);
+ return holder;
+ }
+
+ private int determineNewMaxMode(long resource, int oldMaxMode) {
+ int newMaxMode = LockMode.NL;
+ long holder = resArenaMgr.getLastHolder(resource);
+ while (holder != -1) {
+ int curLockMode = reqArenaMgr.getLockMode(holder);
+ if (curLockMode == oldMaxMode) {
+ // we have another lock of the same mode - we're done
+ return oldMaxMode;
+ }
+ switch (ACTION_MATRIX[newMaxMode][curLockMode]) {
+ case UPD:
+ newMaxMode = curLockMode;
+ break;
+ case GET:
+ break;
+ case WAIT:
+ throw new IllegalStateException("incompatible locks in holder queue");
+ }
+ holder = reqArenaMgr.getNextRequest(holder);
+ }
+ return newMaxMode;
+ }
+
+ private boolean resourceNotUsed(long resource) {
+ return resArenaMgr.getLastHolder(resource) == -1
+ && resArenaMgr.getFirstUpgrader(resource) == -1
+ && resArenaMgr.getFirstWaiter(resource) == -1;
+ }
+
+ private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ op : ").append(string);
+ if (id != -1) {
+ sb.append(" , dataset : ").append(id);
+ }
+ if (entityHashValue != -1) {
+ sb.append(" , entity : ").append(entityHashValue);
+ }
+ if (lockMode != LockMode.NL) {
+ sb.append(" , mode : ").append(LockMode.toString(lockMode));
+ }
+ if (txnContext != null) {
+ sb.append(" , jobId : ").append(txnContext.getJobId());
+ }
+ sb.append(" }");
+ //System.err.println("XXX" + sb.toString());
+ }
+
+ private void validateJob(ITransactionContext txnContext) throws ACIDException {
+ if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
+ throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
+ } else if (txnContext.isTimeout()) {
+ requestAbort(txnContext);
+ }
+ }
+
+ private void requestAbort(ITransactionContext txnContext) throws ACIDException {
+ txnContext.setTimeout(true);
+ throw new ACIDException("Transaction " + txnContext.getJobId()
+ + " should abort (requested by the Lock Manager)");
+ }
+
+ public StringBuilder append(StringBuilder sb) {
+ sb.append(">>dump_begin\t>>----- [resTable] -----\n");
+ table.append(sb);
+ sb.append(">>dump_end\t>>----- [resTable] -----\n");
+
+ sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
+ resArenaMgr.append(sb);
+ sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
+
+ sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
+ reqArenaMgr.append(sb);
+ sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
+
+ sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
+ for(Integer i : jobIdSlotMap.keySet()) {
+ sb.append(i).append(" : ");
+ RecordManagerTypes.Global.append(sb, jobIdSlotMap.get(i));
+ sb.append("\n");
+ }
+ sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
+
+ sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
+ jobArenaMgr.append(sb);
+ sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
+
+ return sb;
+ }
+
+ public String toString() {
+ return append(new StringBuilder()).toString();
+ }
+
+ @Override
+ public String prettyPrint() throws ACIDException {
+ StringBuilder s = new StringBuilder("\n########### LockManager Status #############\n");
+ return append(s).toString() + "\n";
+ }
+
+ @Override
+ public void start() {
+ //no op
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) {
+ if (dumpState) {
+ try {
+ os.write(toString().getBytes());
+ os.flush();
+ } catch (IOException e) {
+ //ignore
+ }
+ }
+ }
+
+ private static class DatasetLockCache {
+ private long jobId = -1;
+ private HashMap<Integer,Byte> lockCache = new HashMap<Integer,Byte>();
+
+ public boolean contains(final int jobId, final int dsId, byte dsLockMode) {
+ if (this.jobId == jobId) {
+ final Byte cachedLockMode = this.lockCache.get(dsId);
+ if (cachedLockMode != null && cachedLockMode == dsLockMode) {
+ return true;
+ }
+ } else {
+ this.jobId = -1;
+ this.lockCache.clear();
+ }
+ return false;
+ }
+
+ public void put(final int jobId, final int dsId, byte dsLockMode) {
+ this.jobId = jobId;
+ this.lockCache.put(dsId, dsLockMode);
+ }
+
+ public String toString() {
+ return "[ " + jobId + " : " + lockCache.toString() + "]";
+ }
+ }
+
+ private static class ResourceGroupTable {
+ public static final int TABLE_SIZE = 1024; // TODO increase?
+
+ private ResourceGroup[] table;
+
+ public ResourceGroupTable() {
+ table = new ResourceGroup[TABLE_SIZE];
+ for (int i = 0; i < TABLE_SIZE; ++i) {
+ table[i] = new ResourceGroup();
+ }
+ }
+
+ ResourceGroup get(DatasetId dId, int entityHashValue) {
+ // TODO ensure good properties of hash function
+ int h = Math.abs(dId.getId() ^ entityHashValue);
+ return table[h % TABLE_SIZE];
+ }
+
+ public StringBuilder append(StringBuilder sb) {
+ return append(sb, false);
+ }
+
+ public StringBuilder append(StringBuilder sb, boolean detail) {
+ for (int i = 0; i < table.length; ++i) {
+ sb.append(i).append(" : ");
+ if (detail) {
+ sb.append(table[i]);
+ } else {
+ sb.append(table[i].firstResourceIndex);
+ }
+ sb.append('\n');
+ }
+ return sb;
+ }
+ }
+
+ private static class ResourceGroup {
+ private ReentrantReadWriteLock latch;
+ private Condition condition;
+ AtomicLong firstResourceIndex;
+
+ ResourceGroup() {
+ latch = new ReentrantReadWriteLock();
+ condition = latch.writeLock().newCondition();
+ firstResourceIndex = new AtomicLong(-1);
+ }
+
+ void getLatch() {
+ log("latch " + toString());
+ latch.writeLock().lock();
+ }
+
+ void releaseLatch() {
+ log("release " + toString());
+ latch.writeLock().unlock();
+ }
+
+ boolean hasWaiters() {
+ return latch.hasQueuedThreads();
+ }
+
+ void await(ITransactionContext txnContext) throws ACIDException {
+ log("wait for " + toString());
+ try {
+ condition.await();
+ } catch (InterruptedException e) {
+ throw new ACIDException(txnContext, "interrupted", e);
+ }
+ }
+
+ void wakeUp() {
+ log("notify " + toString());
+ condition.signalAll();
+ }
+
+ void log(String s) {
+ //System.out.println("XXXX " + s);
+ }
+
+ public String toString() {
+ return "{ id : " + hashCode()
+ + ", first : " + firstResourceIndex.toString()
+ + ", waiters : " + (hasWaiters() ? "true" : "false") + " }";
+ }
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index e61cb55..ea2d4de 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -24,6 +24,7 @@
import org.apache.commons.io.FileUtils;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -62,7 +64,7 @@
ArrayList<LockRequest> requestList;
ArrayList<ArrayList<Integer>> expectedResultList;
int resultListIndex;
- LockManager lockMgr;
+ ILockManager lockMgr;
String requestFileName;
long defaultWaitTime;
@@ -72,7 +74,7 @@
this.workerReadyQueue = new WorkerReadyQueue();
this.requestList = new ArrayList<LockRequest>();
this.expectedResultList = new ArrayList<ArrayList<Integer>>();
- this.lockMgr = (LockManager) txnProvider.getLockManager();
+ this.lockMgr = txnProvider.getLockManager();
this.requestFileName = new String(requestFileName);
this.resultListIndex = 0;
this.defaultWaitTime = 10;
@@ -151,6 +153,8 @@
if (isSuccess) {
log("\n*** Test Passed ***");
}
+ ((LogManager) txnProvider.getLogManager()).stop(false, null);
+ AsterixThreadExecutor.INSTANCE.shutdown();
}
public boolean handleRequest(LockRequest request) throws ACIDException {
@@ -511,6 +515,18 @@
public void log(String s) {
System.out.println(s);
}
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ t : \"").append(threadName).append("\", r : ");
+ if (lockRequest == null) {
+ sb.append("null");
+ } else {
+ sb.append("\"").append(lockRequest.toString()).append("\"");
+ }
+ sb.append(" }");
+ return sb.toString();
+ }
}
class WorkerReadyQueue {
@@ -618,7 +634,7 @@
} catch (InterruptedException e) {
e.printStackTrace();
}
- log(Thread.currentThread().getName() + "Waiting for worker to finish its task...");
+ log(Thread.currentThread().getName() + " Waiting for worker to finish its task...");
queueSize = workerReadyQueue.size();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
index e6f2798..11c48a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
@@ -14,9 +14,14 @@
*/
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
+import org.apache.commons.io.FileUtils;
+
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -26,6 +31,7 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -45,9 +51,16 @@
private static int jobId = 0;
private static Random rand;
- public static void main(String args[]) throws ACIDException, AsterixException {
+ public static void main(String args[]) throws ACIDException, AsterixException, IOException {
int i;
- TransactionSubsystem txnProvider = new TransactionSubsystem("LockManagerRandomUnitTest", null,
+ //prepare configuration file
+ File cwd = new File(System.getProperty("user.dir"));
+ File asterixdbDir = cwd.getParentFile();
+ File srcFile = new File(asterixdbDir.getAbsoluteFile(), "asterix-app/src/main/resources/asterix-build-configuration.xml");
+ File destFile = new File(cwd, "target/classes/asterix-configuration.xml");
+ FileUtils.copyFile(srcFile, destFile);
+
+ TransactionSubsystem txnProvider = new TransactionSubsystem("nc1", null,
new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
rand = new Random(System.currentTimeMillis());
for (i = 0; i < MAX_NUM_OF_ENTITY_LOCK_JOB; i++) {
@@ -64,6 +77,8 @@
System.out.println("Creating " + i + "th EntityLockUpgradeJob..");
generateEntityLockUpgradeThread(txnProvider);
}
+ ((LogManager) txnProvider.getLogManager()).stop(false, null);
+ AsterixThreadExecutor.INSTANCE.shutdown();
}
private static void generateEntityLockThread(TransactionSubsystem txnProvider) {
@@ -555,6 +570,11 @@
this.entityHashValue = waitTime;
}
+ @Override
+ public String toString() {
+ return prettyPrint();
+ }
+
public String prettyPrint() {
StringBuilder s = new StringBuilder();
//s.append(threadName.charAt(7)).append("\t").append("\t");
@@ -595,23 +615,7 @@
}
s.append("\tJ").append(txnContext.getJobId().getId()).append("\tD").append(datasetIdObj.getId()).append("\tE")
.append(entityHashValue).append("\t");
- switch (lockMode) {
- case LockMode.S:
- s.append("S");
- break;
- case LockMode.X:
- s.append("X");
- break;
- case LockMode.IS:
- s.append("IS");
- break;
- case LockMode.IX:
- s.append("IX");
- break;
- default:
- throw new UnsupportedOperationException("Unsupported lock mode");
- }
- s.append("\n");
+ s.append(LockMode.toString(lockMode)).append("\n");
return s.toString();
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordManagerTypes.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordManagerTypes.java
new file mode 100644
index 0000000..b467cc3
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordManagerTypes.java
@@ -0,0 +1,51 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+public class RecordManagerTypes {
+
+ public static class Byte {
+ public static StringBuilder append(StringBuilder sb, byte b) {
+ return sb.append(String.format("%1$18x", b));
+ }
+ }
+
+ public static class Short {
+ public static StringBuilder append(StringBuilder sb, short s) {
+ return sb.append(String.format("%1$18x", s));
+ }
+ }
+
+ public static class Int {
+ public static StringBuilder append(StringBuilder sb, int i) {
+ return sb.append(String.format("%1$18x", i));
+ }
+ }
+
+ public static class Global {
+
+ public static int arenaId(long l) {
+ return (int)((l >>> 48) & 0xffff);
+ }
+
+ public static int allocId(long l) {
+ return (int)((l >>> 32) & 0xffff);
+ }
+
+ public static int localId(long l) {
+ return (int) (l & 0xffffffffL);
+ }
+
+ public static StringBuilder append(StringBuilder sb, long l) {
+ sb.append(String.format("%1$4x", RecordManagerTypes.Global.arenaId(l)));
+ sb.append(':');
+ sb.append(String.format("%1$4x", RecordManagerTypes.Global.allocId(l)));
+ sb.append(':');
+ sb.append(String.format("%1$8x", RecordManagerTypes.Global.localId(l)));
+ return sb;
+ }
+
+ public static String toString(long l) {
+ return append(new StringBuilder(), l).toString();
+ }
+
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Stats.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Stats.java
new file mode 100644
index 0000000..ff2c92a
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Stats.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+public class Stats {
+ int arenas = 0;
+ int buffers = 0;
+ int slots = 0;
+ int items = 0;
+ int size = 0;
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ arenas : ").append(arenas);
+ sb.append(", buffers : ").append(buffers);
+ sb.append(", slots : ").append(slots);
+ sb.append(", items : ").append(items);
+ sb.append(", size : ").append(size).append(" }");
+ return sb.toString();
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 933afcd..4f13b9f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -41,7 +41,6 @@
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
import edu.uci.ics.asterix.common.transactions.MutableLong;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -82,7 +81,7 @@
emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages);
flushQ = new LinkedBlockingQueue<LogPage>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
- emptyQ.offer(new LogPage((LockManager) txnSubsystem.getLockManager(), logPageSize, flushLSN));
+ emptyQ.offer(new LogPage(txnSubsystem, logPageSize, flushLSN));
}
appendLSN = initializeLogAnchor(nextLogFileId);
flushLSN.set(appendLSN);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index a3f42a7..75a74c9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -22,16 +22,20 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILogPage;
import edu.uci.ics.asterix.common.transactions.ILogRecord;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.common.transactions.MutableLong;
import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
public class LogPage implements ILogPage {
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogPage.class.getName());
- private final LockManager lockMgr;
+ private final TransactionSubsystem txnSubsystem;
private final LogPageReader logPageReader;
private final int logPageSize;
private final MutableLong flushLSN;
@@ -46,8 +50,8 @@
private FileChannel fileChannel;
private boolean stop;
- public LogPage(LockManager lockMgr, int logPageSize, MutableLong flushLSN) {
- this.lockMgr = lockMgr;
+ public LogPage(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) {
+ this.txnSubsystem = txnSubsystem;
this.logPageSize = logPageSize;
this.flushLSN = flushLSN;
appendBuffer = ByteBuffer.allocate(logPageSize);
@@ -187,7 +191,27 @@
private void batchUnlock(int beginOffset, int endOffset) throws ACIDException {
if (endOffset > beginOffset) {
logPageReader.initializeScan(beginOffset, endOffset);
- lockMgr.batchUnlock(this, logPageReader);
+
+ DatasetId dsId = new DatasetId(-1);
+ JobId jId = new JobId(-1);
+ ITransactionContext txnCtx = null;
+
+ LogRecord logRecord = logPageReader.next();
+ while (logRecord != null) {
+ if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
+ dsId.setId(logRecord.getDatasetId());
+ jId.setId(logRecord.getJobId());
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+ txnSubsystem.getLockManager().unlock(dsId, logRecord.getPKHashValue(), txnCtx);
+ txnCtx.notifyOptracker(false);
+ } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
+ jId.setId(logRecord.getJobId());
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+ txnCtx.notifyOptracker(true);
+ notifyJobTerminator();
+ }
+ logRecord = logPageReader.next();
+ }
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 78bca42..1a73fe0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -30,16 +30,23 @@
}
public static class LockManagerConstants {
- public static final String LOCK_CONF_DIR = "lock_conf";
- public static final String LOCK_CONF_FILE = "lock.conf";
- public static final int[] LOCK_CONFLICT_MATRIX = new int[] { 2, 3 };
- public static final int[] LOCK_CONVERT_MATRIX = new int[] { 2, 0 };
-
public static class LockMode {
- public static final byte S = 0;
- public static final byte X = 1;
- public static final byte IS = 2;
- public static final byte IX = 3;
+ public static final byte NL = 0;
+ public static final byte IS = 1;
+ public static final byte IX = 2;
+ public static final byte S = 3;
+ public static final byte X = 4;
+
+ public static String toString(byte mode) {
+ switch (mode) {
+ case NL: return "NL";
+ case IS: return "IS";
+ case IX: return "IX";
+ case S: return "S";
+ case X: return "X";
+ default: throw new IllegalArgumentException("no such lock mode");
+ }
+ }
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index aceeb82..4cb5fac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -22,7 +22,7 @@
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
+import edu.uci.ics.asterix.transaction.management.service.locking.ConcurrentLockManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.recovery.CheckpointThread;
import edu.uci.ics.asterix.transaction.management.service.recovery.RecoveryManager;
@@ -46,7 +46,7 @@
this.id = id;
this.txnProperties = txnProperties;
this.transactionManager = new TransactionManager(this);
- this.lockManager = new LockManager(this);
+ this.lockManager = new ConcurrentLockManager(this);
this.logManager = new LogManager(this);
this.recoveryManager = new RecoveryManager(this);
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;