Merge branch 'master' into westmann/locks
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 236df53..ccbf68f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.util.EnumSet;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
import edu.uci.ics.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
@@ -99,6 +100,7 @@
nc2.stop();
nc1.stop();
cc.stop();
+ AsterixThreadExecutor.INSTANCE.shutdown();
}
public static void runJob(JobSpecification spec) throws Exception {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
index edd4b2a..fb87f5e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
@@ -36,4 +36,8 @@
public Future<Object> submit(Callable command) {
return (Future<Object>) executorService.submit(command);
}
+
+    public void shutdown() { // shuts down the wrapped executorService; AsterixHyracksIntegrationUtil calls this after cc.stop()
+        executorService.shutdown(); // NOTE(review): presumably java.util.concurrent.ExecutorService#shutdown - the field's type is not visible in this hunk, confirm
+    }
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
index a752afa..410f207 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
@@ -61,12 +61,13 @@
/**
* @param datasetId
* @param entityHashValue
+ * @param lockMode
* @param txnContext
* @throws ACIDException
* TODO
* @return
*/
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
+ public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException;
/**
diff --git a/asterix-maven-plugins/pom.xml b/asterix-maven-plugins/pom.xml
index 56bc697..b26a6d6 100644
--- a/asterix-maven-plugins/pom.xml
+++ b/asterix-maven-plugins/pom.xml
@@ -35,5 +35,6 @@
<modules>
<module>lexer-generator-maven-plugin</module>
+ <module>record-manager-generator-maven-plugin</module>
</modules>
</project>
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml b/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml
new file mode 100644
index 0000000..a4470d6
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/pom.xml
@@ -0,0 +1,72 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>record-manager-generator-maven-plugin</artifactId>
+ <parent>
+ <artifactId>asterix-maven-plugins</artifactId>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <version>0.8.1-SNAPSHOT</version>
+ </parent>
+
+ <packaging>maven-plugin</packaging>
+ <name>record-manager-generator-maven-plugin</name>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-plugin-api</artifactId>
+ <version>2.0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-artifact</artifactId>
+ <version>2.0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-project</artifactId>
+ <version>2.0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <version>20090211</version>
+ <type>jar</type>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
new file mode 100644
index 0000000..14d8a7e
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import edu.uci.ics.asterix.recordmanagergenerator.RecordType.Field;
+
+public class Generator {
+
+ public enum TemplateType {
+ RECORD_MANAGER,
+ ARENA_MANAGER,
+ SUPPORT
+ }
+
+    /**
+     * Expands the template read from {@code is} into {@code sb}, dispatching on {@code tmplType}.
+     * The stream is only read, never closed; closing it stays the caller's responsibility.
+     * @throws IllegalArgumentException if {@code tmplType} is not one of the handled template types
+     * @throws IllegalStateException if reading the template fails
+     */
+    public static void generateSource(TemplateType tmplType, String packageName, RecordType rec,
+            InputStream is, StringBuilder sb, boolean debug) {
+        final BufferedReader in = new BufferedReader(new InputStreamReader(is));
+        try {
+            switch (tmplType) {
+                case RECORD_MANAGER:
+                    generateMemoryManagerSource(packageName, rec, in, sb, debug);
+                    break;
+                case ARENA_MANAGER:
+                    generateArenaManagerSource(packageName, rec, in, sb, debug);
+                    break;
+                case SUPPORT:
+                    generateSupportFileSource(packageName, in, sb, debug);
+                    break;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        } catch (IOException ioe) {
+            // Do not swallow: printing the trace and returning would leave a truncated template in sb.
+            throw new IllegalStateException("failed to read template for " + tmplType, ioe);
+        }
+    }
+
+ private static void generateMemoryManagerSource(
+ String packageName,
+ RecordType resource,
+ BufferedReader in,
+ StringBuilder sb,
+ boolean debug) throws IOException {
+ String line = null;
+ String indent = " ";
+
+ while((line = in.readLine()) != null) {
+ if (line.contains("@PACKAGE@")) {
+ line = line.replace("@PACKAGE@", packageName);
+ }
+ if (line.contains("@E@")) {
+ line = line.replace("@E@", resource.name);
+ }
+ if (line.contains("@DEBUG@")) {
+ line = line.replace("@DEBUG@", Boolean.toString(debug));
+ }
+ if (line.contains("@CONSTS@")) {
+ resource.appendConstants(sb, indent, 1);
+ sb.append('\n');
+ } else if (line.contains("@METHODS@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ if (field.accessible) {
+ field.appendMemoryManagerGetMethod(sb, indent, 1);
+ sb.append('\n');
+ field.appendMemoryManagerSetMethod(sb, indent, 1);
+ sb.append('\n');
+ }
+ }
+ } else if (line.contains("@INIT_SLOT@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ field.appendInitializers(sb, indent, 3);
+ }
+ } else if (line.contains("@CHECK_SLOT@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ field.appendChecks(sb, indent, 3);
+ }
+ } else if (line.contains("@PRINT_BUFFER@")) {
+ resource.appendBufferPrinter(sb, indent, 3);
+ sb.append('\n');
+ } else {
+ sb.append(line).append('\n');
+ }
+ }
+ }
+
+ private static void generateArenaManagerSource(
+ String packageName,
+ RecordType resource,
+ BufferedReader in,
+ StringBuilder sb,
+ boolean debug) throws IOException {
+ String line = null;
+ String indent = " ";
+
+ while((line = in.readLine()) != null) {
+ if (line.contains("@PACKAGE@")) {
+ line = line.replace("@PACKAGE@", packageName);
+ }
+ if (line.contains("@E@")) {
+ line = line.replace("@E@", resource.name);
+ }
+ if (line.contains("@DEBUG@")) {
+ line = line.replace("@DEBUG@", Boolean.toString(debug));
+ }
+ if (line.contains("@METHODS@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ if (field.accessible) {
+ field.appendArenaManagerGetMethod(sb, indent, 1);
+ sb.append('\n');
+ field.appendArenaManagerSetMethod(sb, indent, 1);
+ sb.append('\n');
+ }
+ }
+ } else {
+ sb.append(line).append('\n');
+ }
+ }
+ }
+
+    /**
+     * Copies a support template line by line into {@code sb}; the only placeholder
+     * substituted is {@code @PACKAGE@}. The {@code debug} flag is not consulted here.
+     */
+    private static void generateSupportFileSource(String packageName, BufferedReader in,
+            StringBuilder sb, boolean debug) throws IOException {
+        for (String text = in.readLine(); text != null; text = in.readLine()) {
+            final boolean hasPackageTag = text.contains("@PACKAGE@");
+            final String expanded = hasPackageTag ? text.replace("@PACKAGE@", packageName) : text;
+            sb.append(expanded);
+            sb.append('\n');
+        }
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
new file mode 100644
index 0000000..79221c1
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.InputStream;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.maven.plugin.MojoFailureException;
+import org.apache.maven.project.MavenProject;
+import org.json.JSONException;
+
+/**
+ * @goal generate-record-manager
+ * @phase generate-sources
+ * @requiresDependencyResolution compile
+ */
+public class RecordManagerGeneratorMojo extends AbstractMojo {
+
+ /**
+ * parameter injected from pom.xml
+ *
+ * @parameter
+ */
+ private boolean debug;
+ /**
+ * parameter injected from pom.xml
+ *
+ * @parameter
+ * @required
+ */
+ private String packageName;
+ /**
+ * parameter injected from pom.xml
+ *
+ * @parameter
+ * @required
+ */
+ private File[] inputFiles;
+ /**
+ * @parameter default-value="${project}"
+ * @required
+ * @readonly
+ */
+ MavenProject project;
+
+
+ String recordManagerTemplate = "RecordManager.java";
+ String arenaManagerTemplate = "ArenaManager.java";
+ String[] supportTemplates = { "Stats.java", "AllocInfo.java", "TypeUtil.java" };
+
+ private Map<String, RecordType> typeMap;
+
+ public RecordManagerGeneratorMojo() {
+ }
+
+    /** Parses every input file into {@code typeMap}; failures are reported as MojoExecutionException. */
+    private void readRecordTypes() throws MojoExecutionException {
+        if (debug) {
+            getLog().info("generating debug code");
+        }
+        typeMap = new HashMap<String, RecordType>();
+        for (int i = 0; i < inputFiles.length; ++i) {
+            getLog().info("reading " + inputFiles[i].toString());
+            try (Reader read = new FileReader(inputFiles[i])) { // closed even when parsing fails
+                RecordType type = RecordType.read(read);
+                // always add allocId to enable tracking of allocations
+                type.addField("alloc id", RecordType.Type.SHORT, null);
+                type.addToMap(typeMap);
+            } catch (FileNotFoundException fnfe) {
+                throw new MojoExecutionException("could not find type description file " + inputFiles[i], fnfe);
+            } catch (java.io.IOException ioe) { // raised by the implicit close(); fully qualified because this file does not import it
+                throw new MojoExecutionException("could not read type description file " + inputFiles[i], ioe);
+            } catch (JSONException jse) {
+                throw new MojoExecutionException("could not parse type description file " + inputFiles[i], jse);
+            }
+        }
+    }
+
+    /** Generates the record and arena manager sources for every input type, followed by the support classes. */
+    public void execute() throws MojoExecutionException, MojoFailureException {
+        String outputPath = project.getBuild().getDirectory() + File.separator
+                + "generated-sources" + File.separator
+                + "java" + File.separator
+                + packageName.replace('.', File.separatorChar);
+        File dir = new File(outputPath);
+        // mkdirs() signals failure only through its return value; unchecked, the problem would surface later as a write error
+        if (!dir.mkdirs() && !dir.isDirectory()) {
+            throw new MojoExecutionException("could not create output directory " + dir);
+        }
+        readRecordTypes();
+        for (String recordType : typeMap.keySet()) {
+            generateSource(Generator.TemplateType.RECORD_MANAGER, recordManagerTemplate, recordType, outputPath);
+            generateSource(Generator.TemplateType.ARENA_MANAGER, arenaManagerTemplate, recordType, outputPath);
+        }
+        // support templates are expanded once, with an empty record type prefix in the file name
+        for (int i = 0; i < supportTemplates.length; ++i) {
+            generateSource(Generator.TemplateType.SUPPORT, supportTemplates[i], "", outputPath);
+        }
+    }
+
+    /**
+     * Expands one classpath template for one record type and writes the result to
+     * {@code outputPath/<recordType><template>}.
+     */
+    private void generateSource(Generator.TemplateType mgrType, String template, String recordType, String outputPath) throws MojoFailureException {
+        InputStream is = getClass().getClassLoader().getResourceAsStream(template);
+        if (is == null) {
+            throw new MojoFailureException("template '" + template + "' not found in classpath");
+        }
+        StringBuilder sb = new StringBuilder();
+        File outputFile = new File(outputPath + File.separator + recordType + template);
+        // try-with-resources: both streams are released even when expansion or writing throws
+        try (InputStream in = is) {
+            getLog().info("generating " + outputFile.toString());
+            Generator.generateSource(mgrType, packageName, typeMap.get(recordType), in, sb, debug);
+            try (FileWriter outWriter = new FileWriter(outputFile)) { // opened only after expansion returned without throwing
+                outWriter.write(sb.toString());
+            }
+        } catch (Exception ex) {
+            getLog().error(ex);
+            throw new MojoFailureException("failed to generate " + outputFile.toString());
+        }
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
new file mode 100644
index 0000000..a0f6c61
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
@@ -0,0 +1,388 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.recordmanagergenerator;
+
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+
+public class RecordType {
+
+ enum Type {
+ BYTE (1, "byte", "get", "put", "(byte)0xde", "TypeUtil.Byte.append"),
+ SHORT (2, "short", "getShort", "putShort", "(short)0xdead", "TypeUtil.Short.append"),
+ INT (4, "int", "getInt", "putInt", "0xdeadbeef", "TypeUtil.Int.append"),
+ GLOBAL(8, "long", "getLong", "putLong", "0xdeadbeefdeadbeefl", "TypeUtil.Global.append");
+
+ Type(int size, String javaType, String bbGetter, String bbSetter, String deadMemInitializer, String appender) {
+ this.size = size;
+ this.javaType = javaType;
+ this.bbGetter = bbGetter;
+ this.bbSetter = bbSetter;
+ this.deadMemInitializer = deadMemInitializer;
+ this.appender = appender;
+ }
+
+ int size;
+ String javaType;
+ String bbGetter;
+ String bbSetter;
+ String deadMemInitializer;
+ String appender;
+ }
+
+ static class Field {
+
+ String name;
+ Type type;
+ String initial;
+ int offset;
+ boolean accessible = true;
+
+ Field(String name, Type type, String initial, int offset, boolean accessible) {
+ this.name = name;
+ this.type = type;
+ this.initial = initial;
+ this.offset = offset;
+ this.accessible = accessible;
+ }
+
+ public static Field fromJSON(JSONObject obj) throws JSONException {
+ String name = obj.getString("name");
+ Type type = parseType(obj.getString("type"));
+ String initial = obj.optString("initial", null);
+ return new Field(name, type, initial, -1, true);
+ }
+
+ private static Type parseType(String string) {
+ string = string.toUpperCase();
+ if (string.equals("GLOBAL")) {
+ return Type.GLOBAL;
+ } else if (string.equals("INT")) {
+ return Type.INT;
+ } else if (string.equals("SHORT")) {
+ return Type.SHORT;
+ } else if (string.equals("BYTE")) {
+ return Type.BYTE;
+ }
+ throw new IllegalArgumentException("Unknown type \"" + string + "\"");
+ }
+
+        /** Camel-cases the blank-separated field name behind {@code prefix}, e.g. "get" + "alloc id" gives "getAllocId". */
+        String methodName(String prefix) {
+            String words[] = name.split(" ");
+            assert(words.length > 0);
+            StringBuilder sb = new StringBuilder(prefix);
+            for (String word : words) {
+                if (word.isEmpty()) continue; // leading/repeated blanks give empty words; substring(0, 1) would throw on them
+                sb.append(word.substring(0, 1).toUpperCase()).append(word.substring(1));
+            }
+            return sb.toString();
+        }
+
+ StringBuilder appendMemoryManagerGetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public ")
+ .append(type.javaType)
+ .append(' ')
+ .append(methodName("get"))
+ .append("(int slotNum) {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final Buffer buf = buffers.get(slotNum / NO_SLOTS);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("buf.checkSlot(slotNum % NO_SLOTS);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final ByteBuffer b = buf.bb;\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("return b.")
+ .append(type.bbGetter)
+ .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+ .append(offsetName())
+ .append(");\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendMemoryManagerSetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public void ")
+ .append(methodName("set"))
+ .append("(int slotNum, ")
+ .append(type.javaType)
+ .append(" value) {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("b.")
+ .append(type.bbSetter)
+ .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+ .append(offsetName())
+ .append(", value);\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendArenaManagerGetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public ")
+ .append(type.javaType)
+ .append(' ')
+ .append(methodName("get"))
+ .append("(long slotNum) {\n");
+ if (initial != null) {
+ sb = indent(sb, indent, level + 1);
+ sb.append("if (TRACK_ALLOC_ID) checkAllocId(slotNum);\n");
+ }
+ sb = indent(sb, indent, level + 1);
+ sb.append("final int arenaId = TypeUtil.Global.arenaId(slotNum);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final int localId = TypeUtil.Global.localId(slotNum);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("return get(arenaId).")
+ .append(methodName("get"))
+ .append("(localId);\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendArenaManagerSetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public void ")
+ .append(methodName("set"))
+ .append("(long slotNum, ")
+ .append(type.javaType)
+ .append(" value) {\n");
+ if (initial != null) {
+ sb = indent(sb, indent, level + 1);
+ sb.append("if (TRACK_ALLOC_ID) checkAllocId(slotNum);\n");
+ }
+ sb = indent(sb, indent, level + 1);
+ sb.append("final int arenaId = TypeUtil.Global.arenaId(slotNum);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final int localId = TypeUtil.Global.localId(slotNum);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("get(arenaId).")
+ .append(methodName("set"))
+ .append("(localId, value);\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendInitializers(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("bb.")
+ .append(type.bbSetter)
+ .append("(slotNum * ITEM_SIZE + ")
+ .append(offsetName())
+ .append(", ");
+ if (initial != null) {
+ sb.append(initial);
+ } else {
+ sb.append(type.deadMemInitializer);
+ }
+ sb.append(");\n");
+ return sb;
+ }
+
+ StringBuilder appendChecks(StringBuilder sb, String indent, int level) {
+ if (initial == null) {
+ return sb;
+ }
+ sb = indent(sb, indent, level);
+ sb.append("if (bb.")
+ .append(type.bbGetter)
+ .append("(itemOffset + ")
+ .append(offsetName())
+ .append(") == ")
+ .append(type.deadMemInitializer)
+ .append(") {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("String msg = \"invalid value in field ")
+ .append(offsetName())
+ .append(" of slot \" + slotNum;\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("throw new IllegalStateException(msg);\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ String offsetName() {
+ String words[] = name.split(" ");
+ assert(words.length > 0);
+ StringBuilder sb = new StringBuilder(words[0].toUpperCase());
+ for (int j = 1; j < words.length; ++j) {
+ sb.append("_").append(words[j].toUpperCase());
+ }
+ sb.append("_OFF");
+ return sb.toString();
+ }
+
+ int offset() {
+ return offset;
+ }
+ }
+
+ String name;
+ ArrayList<Field> fields;
+ int totalSize;
+ boolean modifiable = true;
+
+ static StringBuilder indent(StringBuilder sb, String indent, int level) {
+ for (int i = 0; i < level; ++i) {
+ sb.append(indent);
+ }
+ return sb;
+ }
+
+ public RecordType(String name) {
+ this.name = name;
+ fields = new ArrayList<Field>();
+ addField("next free slot", Type.INT, "-1", false);
+ }
+
+ public static RecordType read(Reader reader) throws JSONException {
+ JSONTokener tok = new JSONTokener(reader);
+ JSONObject obj = new JSONObject(tok);
+ return fromJSON(obj);
+ }
+
+ public static RecordType fromJSON(JSONObject obj) throws JSONException {
+ RecordType result = new RecordType(obj.getString("name"));
+ JSONArray fields = obj.getJSONArray("fields");
+ for (int i = 0; i < fields.length(); ++i) {
+ JSONObject field = fields.getJSONObject(i);
+ result.fields.add(Field.fromJSON(field));
+ }
+ return result;
+ }
+
+ public void addToMap(Map<String, RecordType> map) {
+ modifiable = false;
+ calcOffsetsAndSize();
+ map.put(name, this);
+ }
+
+ public void addField(String name, Type type, String initial) {
+ addField(name, type, initial, true);
+ }
+
+    private void addField(String name, Type type, String initial, boolean accessible) {
+        if (! modifiable) { // addToMap clears this flag right before the offsets are computed
+            throw new IllegalStateException("cannot modify type anymore");
+        }
+        fields.add(new Field(name, type, initial, -1, accessible)); // offset -1 = not yet assigned (see calcOffsetsAndSize)
+    }
+
+    // Orders the fields by descending size, assigns consecutive offsets and rounds totalSize up to a multiple of the largest field size.
+    private void calcOffsetsAndSize() {
+        Collections.sort(fields, new Comparator<Field>() {
+            public int compare(Field left, Field right) {
+                return right.type.size - left.type.size; // larger fields sort first
+            }
+        });
+        totalSize = 0;
+        int alignment = 0; // ends up as the largest field size
+        for (int i = 0; i < fields.size(); ++i) {
+            final Field field = fields.get(i);
+            assert field.offset == -1; // offsets must not have been assigned before
+            field.offset = totalSize; // fields are packed back to back
+            final int size = field.type.size;
+            totalSize += size;
+            if (size > alignment) alignment = size;
+        }
+        if (totalSize % alignment != 0) { // round the item size up to the next multiple of the largest field size
+            totalSize = ((totalSize / alignment) + 1) * alignment;
+        }
+    }
+
+ int size() {
+ return fields.size();
+ }
+
+ static String padRight(String s, int n) {
+ return String.format("%1$-" + n + "s", s);
+ }
+
+ static String padLeft(String s, int n) {
+ return String.format("%1$" + n + "s", s);
+ }
+
+ StringBuilder appendConstants(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public static int ITEM_SIZE = ")
+ .append(totalSize)
+ .append(";\n");
+ for (int i = 0; i < fields.size(); ++i) {
+ final Field field = fields.get(i);
+ sb = indent(sb, indent, level);
+ sb.append("public static int ")
+ .append(field.offsetName())
+ .append(" = ")
+ .append(field.offset).append("; // size: ")
+ .append(field.type.size).append("\n");
+ }
+ return sb;
+ }
+
+ StringBuilder appendBufferPrinter(StringBuilder sb, String indent, int level) {
+ int maxNameWidth = 0;
+ for (int i = 0; i < fields.size(); ++i) {
+ int width = fields.get(i).name.length();
+ if (width > maxNameWidth) {
+ maxNameWidth = width;
+ }
+ }
+ for (int i = 0; i < fields.size(); ++i) {
+ final Field field = fields.get(i);
+ sb = indent(sb, indent, level);
+ sb.append("sb.append(\"")
+ .append(padRight(field.name, maxNameWidth))
+ .append(" | \");\n");
+ sb = indent(sb, indent, level);
+ sb.append("for (int i = 0; i < NO_SLOTS; ++i) {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append(field.type.javaType)
+ .append(" value = bb.")
+ .append(field.type.bbGetter)
+ .append("(i * ITEM_SIZE + ")
+ .append(field.offsetName())
+ .append(");\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("sb = ")
+ .append(field.type.appender)
+ .append("(sb, value);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("sb.append(\" | \");\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ sb = indent(sb, indent, level);
+ sb.append("sb.append(\"\\n\");\n");
+ }
+ return sb;
+ }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/AllocInfo.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/AllocInfo.java
new file mode 100644
index 0000000..ef8415f
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/AllocInfo.java
@@ -0,0 +1,35 @@
+package @PACKAGE@;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class AllocInfo {
+ String alloc;
+ String free;
+
+ void alloc() {
+ alloc = getStackTrace();
+ }
+
+ void free() {
+ free = getStackTrace();
+ }
+
+    private String getStackTrace() { // renders the current call stack as text, minus its first three lines
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+        new Exception().printStackTrace(pw); // the exception is never thrown, it only captures the stack
+        pw.close();
+        String res = sw.toString();
+        // remove first 3 lines - presumably the exception header plus the frames of this method and of alloc()/free()
+        int nlPos = 0;
+        for (int i = 0; i < 3; ++i) {
+            nlPos = res.indexOf('\n', nlPos) + 1; // position just behind the next line break
+        }
+        return res.substring(nlPos);
+    }
+
+ public String toString() {
+ return "allocation stack:\n" + alloc + "\nfree stack\n" + free;
+ }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
new file mode 100644
index 0000000..4abb3af
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package @PACKAGE@;
+
+public class @E@ArenaManager {
+
+ public static final boolean TRACK_ALLOC_ID = @DEBUG@;
+
+ private final int noArenas;
+ private final @E@RecordManager[] arenas;
+ private ThreadLocal<LocalManager> local;
+
+ static class LocalManager {
+ int arenaId;
+ @E@RecordManager mgr;
+ }
+
+ public @E@ArenaManager(final int noArenas, final long txnShrinkTimer) {
+ this.noArenas = noArenas;
+ arenas = new @E@RecordManager[noArenas];
+ for (int i = 0; i < noArenas; ++i) {
+ arenas[i] = new @E@RecordManager(txnShrinkTimer);
+ }
+ local = new ThreadLocal<LocalManager>() {
+ private int nextArena = 0;
+
+ @Override
+ protected synchronized LocalManager initialValue() {
+ @E@RecordManager mgr = arenas[nextArena];
+ LocalManager res = new LocalManager();
+ res.mgr = mgr;
+ res.arenaId = nextArena;
+ nextArena = (nextArena + 1) % noArenas;
+ return res;
+ }
+ };
+ }
+
+    /** Allocates a slot in the calling thread's arena and returns the global id that TypeUtil.Global.build packs from arena id, alloc id and local slot. */
+    public long allocate() {
+        final LocalManager localManager = local.get();
+        final @E@RecordManager recMgr = localManager.mgr;
+        // Mask the sign bit before the modulo: once the int counter overflows, '%' alone would yield a negative alloc id.
+        // NOTE(review): the increment is not synchronized although threads can share an arena - confirm this is acceptable for debug tracking.
+        final int allocId = TRACK_ALLOC_ID ? ((++recMgr.allocCounter & Integer.MAX_VALUE) % 0x7fff) : 0;
+        final int localId = recMgr.allocate();
+        long result = TypeUtil.Global.build(localManager.arenaId, allocId, localId);
+        if (TRACK_ALLOC_ID) setAllocId(result, (short) allocId);
+        assert TypeUtil.Global.allocId(result) == allocId;
+        assert TypeUtil.Global.arenaId(result) == localManager.arenaId;
+        assert TypeUtil.Global.localId(result) == localId;
+        return result;
+    }
+
+ public void deallocate(long slotNum) {
+ if (TRACK_ALLOC_ID) checkAllocId(slotNum);
+ final int arenaId = TypeUtil.Global.arenaId(slotNum);
+ get(arenaId).deallocate(TypeUtil.Global.localId(slotNum));
+ }
+
+ public @E@RecordManager get(int i) {
+ return arenas[i];
+ }
+
+ public @E@RecordManager local() {
+ return local.get().mgr;
+ }
+
+ @METHODS@
+
+ private void checkAllocId(long slotNum) {
+ final int refAllocId = TypeUtil.Global.allocId(slotNum);
+ final short curAllocId = getAllocId(slotNum);
+ if (refAllocId != curAllocId) {
+ String msg = "reference to slot " + slotNum
+ + " of arena " + TypeUtil.Global.arenaId(slotNum)
+ + " refers to version " + Integer.toHexString(refAllocId)
+ + " current version is " + Integer.toHexString(curAllocId);
+ AllocInfo a = getAllocInfo(slotNum);
+ if (a != null) {
+ msg += "\n" + a.toString();
+ }
+ throw new IllegalStateException(msg);
+ }
+ }
+
+ public AllocInfo getAllocInfo(long slotNum) {
+ final int arenaId = TypeUtil.Global.arenaId(slotNum);
+ return get(arenaId).getAllocInfo(TypeUtil.Global.localId(slotNum));
+ }
+
+ public StringBuilder append(StringBuilder sb) {
+ for (int i = 0; i < noArenas; ++i) {
+ sb.append("++++ arena ").append(i).append(" ++++\n");
+ arenas[i].append(sb);
+ }
+ return sb;
+ }
+
+ public String toString() {
+ return append(new StringBuilder()).toString();
+ }
+
+ public Stats addTo(Stats s) {
+ s.arenas += noArenas;
+ for (int i = 0; i < noArenas; ++i) {
+ arenas[i].addTo(s);
+ }
+ return s;
+ }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
new file mode 100644
index 0000000..9afc673
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package @PACKAGE@;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+public class @E@RecordManager {
+
+ // Debugging switches; @DEBUG@ is a template placeholder that is filled in
+ // when the concrete class is generated.
+ public static final boolean CHECK_SLOTS = @DEBUG@;
+ public static final boolean TRACK_ALLOC_LOC = @DEBUG@;
+
+ // number of slots held by every Buffer
+ static final int NO_SLOTS = 1000;
+
+ // template placeholder for generated constants (ITEM_SIZE and
+ // NEXT_FREE_SLOT_OFF are used below but not declared in this template)
+ @CONSTS@
+
+ // time in ms (compared against System.currentTimeMillis() differences)
+ // that under-utilization has to last before needShrink() reports true
+ private final long txnShrinkTimer;
+ // timestamp (ms) at which the shrink timer was switched on
+ private long shrinkTimer;
+ // all buffers of this manager; a global slot number is
+ // bufferIndex * NO_SLOTS + slotInBuffer
+ private ArrayList<Buffer> buffers;
+ // index of the buffer that serves the next allocation
+ private int current;
+ // number of slots currently allocated over all buffers
+ private int occupiedSlots;
+ // true while under-utilization is being timed
+ private boolean isShrinkTimerOn;
+
+ // set to 0 by the constructor; not used by the code visible in this template
+ int allocCounter;
+
+    /**
+     * Creates a record manager that starts out with a single, initialized
+     * buffer.
+     *
+     * @param txnShrinkTimer time in ms that under-utilization has to last
+     *        before the buffers are shrunk
+     */
+    public @E@RecordManager(long txnShrinkTimer) {
+        this.txnShrinkTimer = txnShrinkTimer;
+        this.allocCounter = 0;
+        this.current = 0;
+        this.buffers = new ArrayList<Buffer>();
+        this.buffers.add(new Buffer());
+    }
+
+    /**
+     * Kind of buffer that {@code allocate()} picked after the current buffer
+     * turned out to be full.
+     */
+    enum SlotSource {
+        /** an initialized buffer that still has a free slot */
+        NON_FULL,
+        /** a deinitialized buffer that has to be initialized again */
+        UNINITIALIZED,
+        /** no usable buffer exists, a new one has to be added */
+        NEW
+    }
+
+    /**
+     * Allocates one slot and returns its slot number within this manager
+     * ({@code bufferIndex * NO_SLOTS + slotInBuffer}).
+     * <p>
+     * If the current buffer is full, all buffers are inspected once,
+     * starting behind the current one and wrapping around. An initialized
+     * buffer with free space is preferred; failing that, the first
+     * deinitialized buffer is initialized again; failing that, a new buffer
+     * is appended.
+     */
+    synchronized int allocate() {
+        if (buffers.get(current).isFull()) {
+            final int noBuffers = buffers.size();
+            final int first = current + 1;
+            boolean foundNonFull = false;
+            boolean foundUninitialized = false;
+            for (int offset = 0; offset < noBuffers && !foundNonFull; ++offset) {
+                final int idx = (first + offset) % noBuffers;
+                final Buffer candidate = buffers.get(idx);
+                if (candidate.isInitialized()) {
+                    if (!candidate.isFull()) {
+                        // a buffer with space wins immediately
+                        foundNonFull = true;
+                        current = idx;
+                    }
+                } else if (!foundUninitialized) {
+                    // remember only the first deinitialized buffer
+                    foundUninitialized = true;
+                    current = idx;
+                }
+            }
+
+            if (!foundNonFull) {
+                if (foundUninitialized) {
+                    buffers.get(current).initialize();
+                } else {
+                    buffers.add(new Buffer());
+                    current = buffers.size() - 1;
+                }
+            }
+        }
+        ++occupiedSlots;
+        final int slotInBuffer = buffers.get(current).allocate();
+        return slotInBuffer + current * NO_SLOTS;
+    }
+
+    /**
+     * Frees a slot and shrinks the buffer list afterwards if
+     * {@code needShrink()} asks for it.
+     *
+     * @param slotNum slot number as returned by {@code allocate()}
+     */
+    synchronized void deallocate(int slotNum) {
+        final Buffer owner = buffers.get(slotNum / NO_SLOTS);
+        owner.deallocate(slotNum % NO_SLOTS);
+        occupiedSlots--;
+
+        if (!needShrink()) {
+            return;
+        }
+        shrink();
+    }
+
+    /**
+     * Decides whether the buffers should be shrunk now.
+     * <p>
+     * Shrink policy: shrink when the resources have been under-utilized
+     * (more than one buffer and at least three times as many slots as
+     * occupied slots) for at least {@code txnShrinkTimer} milliseconds.
+     * The first under-utilized call only starts the timer; any call that
+     * finds the resources sufficiently utilized stops it.
+     * <p>
+     * TODO Need to figure out which of the following policies is better.
+     * <ul>
+     * <li>Case 1, buffers: O x x x x x O (O is initialized, x is
+     * deinitialized). The current implementation reports true here although
+     * there is nothing to shrink or deinitialize, because the total is
+     * computed as {@code buffers.size() * NO_SLOTS}, i.e. the slots of
+     * deinitialized buffers are not subtracted.</li>
+     * <li>Case 2, buffers: O O x x x x x. If the slots of deinitialized
+     * buffers were subtracted, this method would report false here although
+     * the buffer list could be shrunk.</li>
+     * </ul>
+     *
+     * @return true if the under-utilization has lasted long enough
+     */
+    private boolean needShrink() {
+        final int noBuffers = buffers.size();
+        // avoid a division by zero when nothing is allocated
+        final int usedSlots = (occupiedSlots == 0) ? 1 : occupiedSlots;
+        final boolean underUtilized = noBuffers > 1 && noBuffers * NO_SLOTS / usedSlots >= 3;
+
+        if (!underUtilized) {
+            // turn off timer
+            isShrinkTimerOn = false;
+            return false;
+        }
+        if (!isShrinkTimerOn) {
+            // turn on timer
+            isShrinkTimerOn = true;
+            shrinkTimer = System.currentTimeMillis();
+            return false;
+        }
+        if (System.currentTimeMillis() - shrinkTimer >= txnShrinkTimer) {
+            isShrinkTimerOn = false;
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Releases unused memory in two steps:
+     * <ol>
+     * <li>every empty buffer except the first one is deinitialized (its
+     * ByteBuffer is dropped);</li>
+     * <li>deinitialized buffers are removed from the end of the buffer
+     * list, stopping at the first initialized buffer and after at most half
+     * of the buffers have been removed.</li>
+     * </ol>
+     * Afterwards allocation restarts at the first buffer and the shrink
+     * timer is switched off.
+     */
+    private void shrink() {
+        final int noBuffers = buffers.size();
+        final int maxRemovals = noBuffers / 2;
+
+        // the first buffer is never deinitialized
+        for (int idx = 1; idx < noBuffers; idx++) {
+            final Buffer candidate = buffers.get(idx);
+            if (candidate.isEmpty()) {
+                candidate.deinitialize();
+            }
+        }
+
+        // remove the deinitialized buffers from the end
+        int removed = 0;
+        int last = noBuffers - 1;
+        while (last >= 1 && !buffers.get(last).isInitialized()) {
+            buffers.remove(last);
+            --last;
+            if (++removed == maxRemovals) {
+                break;
+            }
+        }
+
+        // restart allocation at the first buffer
+        current = 0;
+
+        isShrinkTimerOn = false;
+    }
+
+ @METHODS@
+
+    /**
+     * Returns the allocation info recorded for a slot.
+     *
+     * @param slotNum slot number as returned by {@code allocate()}
+     * @return the info object of the slot, or null if the owning buffer has
+     *         no allocation list
+     */
+    public AllocInfo getAllocInfo(int slotNum) {
+        final ArrayList<AllocInfo> infos = buffers.get(slotNum / NO_SLOTS).allocList;
+        return (infos == null) ? null : infos.get(slotNum % NO_SLOTS);
+    }
+
+    /**
+     * Appends a header line (current buffer index and number of occupied
+     * slots) followed by the dump of every buffer.
+     *
+     * @param sb the builder to append to
+     * @return {@code sb}
+     */
+    StringBuilder append(StringBuilder sb) {
+        sb.append("+++ current: ");
+        sb.append(current);
+        sb.append(" no occupied slots: ");
+        sb.append(occupiedSlots);
+        sb.append(" +++\n");
+        int bufferNo = 0;
+        while (bufferNo < buffers.size()) {
+            buffers.get(bufferNo).append(sb);
+            sb.append("\n");
+            ++bufferNo;
+        }
+        return sb;
+    }
+
+    /** Renders the dump produced by {@code append(StringBuilder)} as a string. */
+    @Override
+    public String toString() {
+        final StringBuilder dump = new StringBuilder();
+        return append(dump).toString();
+    }
+
+    /**
+     * Adds the figures of this manager to {@code s}: the number of buffers,
+     * the number of slots and bytes those buffers account for (all buffers
+     * in the list are counted), and whatever each buffer adds itself.
+     *
+     * @param s the statistics object to update
+     * @return {@code s}
+     */
+    public Stats addTo(Stats s) {
+        final int noBuffers = buffers.size();
+        final int noSlots = noBuffers * NO_SLOTS;
+        s.buffers += noBuffers;
+        s.slots += noSlots;
+        s.size += noSlots * ITEM_SIZE;
+        int bufferNo = 0;
+        while (bufferNo < noBuffers) {
+            buffers.get(bufferNo).addTo(s);
+            ++bufferNo;
+        }
+        return s;
+    }
+
+ /**
+ * A block of NO_SLOTS fixed-size records backed by one ByteBuffer.
+ * Free slots form a singly linked list that is threaded through the
+ * records themselves (an int at offset NEXT_FREE_SLOT_OFF of each record);
+ * freeSlotNum is the head of that list and -1 terminates it.
+ * The @...@ markers are template placeholders whose expansion is not
+ * visible here.
+ */
+ static class Buffer {
+ private ByteBuffer bb = null; // null represents 'deinitialized' state.
+ // head of the free list, -1 if there is no free slot
+ private int freeSlotNum;
+ // number of slots of this buffer that are currently allocated
+ private int occupiedSlots;
+
+ // one AllocInfo per slot; only populated when TRACK_ALLOC_LOC is set
+ ArrayList<AllocInfo> allocList;
+
+ /** Creates a buffer that is initialized right away. */
+ Buffer() {
+ initialize();
+ }
+
+ /**
+ * Allocates the backing ByteBuffer and links all slots into the free
+ * list in ascending order (slot 0 is the head).
+ */
+ void initialize() {
+ bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
+ freeSlotNum = 0;
+ occupiedSlots = 0;
+
+ for (int i = 0; i < NO_SLOTS - 1; i++) {
+ setNextFreeSlot(i, i + 1);
+ }
+ setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+
+ if (TRACK_ALLOC_LOC) {
+ allocList = new ArrayList<AllocInfo>(NO_SLOTS);
+ for (int i = 0; i < NO_SLOTS; ++i) {
+ allocList.add(new AllocInfo());
+ }
+ }
+ }
+
+ /** Drops the backing ByteBuffer (and the allocation list, if tracked). */
+ public void deinitialize() {
+ if (TRACK_ALLOC_LOC) allocList = null;
+ bb = null;
+ }
+
+ /** @return true if the buffer currently has a backing ByteBuffer */
+ public boolean isInitialized() {
+ return bb != null;
+ }
+
+ /** @return true if the free list is empty */
+ public boolean isFull() {
+ return freeSlotNum == -1;
+ }
+
+ /** @return true if no slot of this buffer is allocated */
+ public boolean isEmpty() {
+ return occupiedSlots == 0;
+ }
+
+ /**
+ * Takes the head of the free list and returns its (buffer-local) number.
+ * There is no check for a full buffer here; the enclosing manager only
+ * calls this on a buffer it found to be non-full.
+ */
+ public int allocate() {
+ int slotNum = freeSlotNum;
+ freeSlotNum = getNextFreeSlot(slotNum);
+ @INIT_SLOT@
+ occupiedSlots++;
+ if (TRACK_ALLOC_LOC) allocList.get(slotNum).alloc();
+ return slotNum;
+ }
+
+ /**
+ * Pushes the given (buffer-local) slot back onto the head of the free
+ * list.
+ */
+ public void deallocate(int slotNum) {
+ @INIT_SLOT@
+ setNextFreeSlot(slotNum, freeSlotNum);
+ freeSlotNum = slotNum;
+ occupiedSlots--;
+ if (TRACK_ALLOC_LOC) allocList.get(slotNum).free();
+ }
+
+ /** Reads the free-list link stored in the given slot. */
+ public int getNextFreeSlot(int slotNum) {
+ return bb.getInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFF);
+ }
+
+ /** Writes the free-list link stored in the given slot. */
+ public void setNextFreeSlot(int slotNum, int nextFreeSlot) {
+ bb.putInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFF, nextFreeSlot);
+ }
+
+ /**
+ * Appends a header line (free-list head and number of occupied slots)
+ * followed by whatever the @PRINT_BUFFER@ placeholder expands to.
+ */
+ StringBuilder append(StringBuilder sb) {
+ sb.append("++ free slot: ")
+ .append(freeSlotNum)
+ .append(" no occupied slots: ")
+ .append(occupiedSlots)
+ .append(" ++\n");
+ @PRINT_BUFFER@
+ return sb;
+ }
+
+ public String toString() {
+ return append(new StringBuilder()).toString();
+ }
+
+ /** Adds the occupied slots of an initialized buffer to s.items. */
+ public void addTo(Stats s) {
+ if (isInitialized()) {
+ s.items += occupiedSlots;
+ }
+ }
+
+ /**
+ * Hook for slot validation; does nothing unless CHECK_SLOTS is set.
+ * In this template the check itself is only a commented-out placeholder,
+ * so as written the method merely computes the item offset.
+ */
+ private void checkSlot(int slotNum) {
+ if (! CHECK_SLOTS) {
+ return;
+ }
+ final int itemOffset = (slotNum % NO_SLOTS) * ITEM_SIZE;
+ // @CHECK_SLOT@
+ }
+ }
+
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java
new file mode 100644
index 0000000..c136101
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java
@@ -0,0 +1,19 @@
+package @PACKAGE@;
+
+/**
+ * Plain accumulator for record manager statistics. The arena and record
+ * managers add their figures via their {@code addTo(Stats)} methods.
+ */
+public class Stats {
+    /** number of arenas */
+    int arenas = 0;
+    /** number of buffers */
+    int buffers = 0;
+    /** number of slots provided by the buffers */
+    int slots = 0;
+    /** number of occupied slots */
+    int items = 0;
+    /** number of bytes accounted for by the buffers */
+    int size = 0;
+
+    /** Renders all counters in the form {@code { arenas : 0, ..., size : 0 }}. */
+    @Override
+    public String toString() {
+        return "{ arenas : " + arenas
+                + ", buffers : " + buffers
+                + ", slots : " + slots
+                + ", items : " + items
+                + ", size : " + size + " }";
+    }
+}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/TypeUtil.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/TypeUtil.java
new file mode 100644
index 0000000..9571156
--- /dev/null
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/TypeUtil.java
@@ -0,0 +1,59 @@
+package @PACKAGE@;
+
+/**
+ * Helpers for the primitive types used by the generated record managers:
+ * hexadecimal formatting of field values and packing/unpacking of global
+ * slot identifiers.
+ */
+public class TypeUtil {
+
+    /** Formatting of {@code byte} fields. */
+    public static class Byte {
+        /** Appends {@code b} in hexadecimal, right-aligned in 18 columns. */
+        public static StringBuilder append(StringBuilder sb, byte b) {
+            return sb.append(String.format("%1$18x", b));
+        }
+    }
+
+    /** Formatting of {@code short} fields. */
+    public static class Short {
+        /** Appends {@code s} in hexadecimal, right-aligned in 18 columns. */
+        public static StringBuilder append(StringBuilder sb, short s) {
+            return sb.append(String.format("%1$18x", s));
+        }
+    }
+
+    /** Formatting of {@code int} fields. */
+    public static class Int {
+        /** Appends {@code i} in hexadecimal, right-aligned in 18 columns. */
+        public static StringBuilder append(StringBuilder sb, int i) {
+            return sb.append(String.format("%1$18x", i));
+        }
+    }
+
+    /**
+     * Global slot identifiers. A global id is a {@code long} laid out as
+     * <pre>
+     *   bits 63..48  arena id
+     *   bits 47..32  allocation id
+     *   bits 31..0   arena-local slot id
+     * </pre>
+     */
+    public static class Global {
+
+        /**
+         * Packs the three components into one global id.
+         *
+         * @param arenaId arena id; only the low 16 bits are kept
+         * @param allocId allocation id; only the low 16 bits are kept
+         * @param localId arena-local slot id; all 32 bits are kept
+         * @return the packed global id
+         */
+        public static long build(int arenaId, int allocId, int localId) {
+            long result = arenaId;
+            result = result << 48;
+            // widen BEFORE shifting: an int shifted by 32 is shifted by 0,
+            // which would put the alloc id into the local id bits
+            result |= (allocId & 0xffffL) << 32;
+            // mask to prevent sign extension of a negative local id from
+            // overwriting the arena and alloc id
+            result |= localId & 0xffffffffL;
+            return result;
+        }
+
+        /** @return the arena id (bits 63..48) of {@code l} as 0..0xffff */
+        public static int arenaId(long l) {
+            return (int) ((l >>> 48) & 0xffff);
+        }
+
+        /** @return the allocation id (bits 47..32) of {@code l} as 0..0xffff */
+        public static int allocId(long l) {
+            return (int) ((l >>> 32) & 0xffff);
+        }
+
+        /** @return the arena-local slot id (bits 31..0) of {@code l} */
+        public static int localId(long l) {
+            return (int) (l & 0xffffffffL);
+        }
+
+        /**
+         * Appends {@code l} as {@code arena:alloc:local} in hexadecimal
+         * (field widths 4, 4 and 8).
+         */
+        public static StringBuilder append(StringBuilder sb, long l) {
+            sb.append(String.format("%1$4x", TypeUtil.Global.arenaId(l)));
+            sb.append(':');
+            sb.append(String.format("%1$4x", TypeUtil.Global.allocId(l)));
+            sb.append(':');
+            sb.append(String.format("%1$8x", TypeUtil.Global.localId(l)));
+            return sb;
+        }
+
+        /** @return the representation produced by {@link #append} */
+        public static String toString(long l) {
+            return append(new StringBuilder(), l).toString();
+        }
+
+    }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 46159ba..a1afd89 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -128,8 +128,8 @@
}
@Override
- public void unlock(MetadataTransactionContext ctx) throws RemoteException, ACIDException {
- metadataNode.unlock(ctx.getJobId());
+ public void unlock(MetadataTransactionContext ctx, byte lockMode) throws RemoteException, ACIDException {
+ metadataNode.unlock(ctx.getJobId(), lockMode);
}
@Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index b344f69..9a35fb0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -140,9 +140,9 @@
}
@Override
- public void unlock(JobId jobId) throws ACIDException, RemoteException {
+ public void unlock(JobId jobId, byte lockMode) throws ACIDException, RemoteException {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
- transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, txnCtx);
+ transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
}
@Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 392c8a1..5fbd06d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -105,7 +105,7 @@
* @throws ACIDException
* @throws RemoteException
*/
- public void unlock(MetadataTransactionContext ctx) throws ACIDException, RemoteException;
+ public void unlock(MetadataTransactionContext ctx, byte lockMode) throws ACIDException, RemoteException;
/**
* Inserts a new dataverse into the metadata.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index f27268f..fce3f25 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -84,7 +84,7 @@
* @throws ACIDException
* @throws RemoteException
*/
- public void unlock(JobId jobId) throws ACIDException, RemoteException;
+ public void unlock(JobId jobId, byte lockMode) throws ACIDException, RemoteException;
/**
* Inserts a new dataverse into the metadata, acquiring local locks on
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index 3123008..ef06acc 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -33,6 +33,47 @@
<fork>true</fork>
</configuration>
</plugin>
+ <!-- Runs the generate-record-manager goal in the generate-sources phase,
+ with the three JSON files below as input. NOTE(review): packageName is
+ presumably the package of the generated classes and debug the value
+ substituted for the templates' debug switches - confirm in the plugin. -->
+ <plugin>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>record-manager-generator-maven-plugin</artifactId>
+ <version>0.8.1-SNAPSHOT</version>
+ <configuration>
+ <debug>false</debug>
+ <inputFiles>
+ <param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Job.json</param>
+ <param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Resource.json</param>
+ <param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json</param>
+ </inputFiles>
+ <packageName>edu.uci.ics.asterix.transaction.management.service.locking</packageName>
+ </configuration>
+ <executions>
+ <execution>
+ <id>generate-record-manager</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>generate-record-manager</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Registers target/generated-sources/java as an additional source root
+ in the generate-sources phase. NOTE(review): presumably this is where the
+ record manager generator writes its output - confirm in the plugin. -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources/java/</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
index dcc4d40..e16e106 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -64,7 +64,7 @@
public void complete(ITupleReference tuple) throws HyracksDataException {
int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
try {
- lockManager.unlock(datasetId, pkHash, txnCtx);
+ lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index efe1daa..9957edf 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -58,7 +58,7 @@
public void cancel(ITupleReference tuple) throws HyracksDataException {
int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
try {
- lockManager.unlock(datasetId, pkHash, txnCtx);
+ lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
new file mode 100644
index 0000000..96665ba
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -0,0 +1,998 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILockManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+
+/**
+ * An implementation of the ILockManager interface.
+ *
+ * @author tillw
+ */
+public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
+
+ private static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName());
+
+ public static final boolean IS_DEBUG_MODE = false;//true
+
+ private TransactionSubsystem txnSubsystem;
+ // resource groups, looked up by (dataset id, entity hash value)
+ private ResourceGroupTable table;
+ // arena managers for the resource, request and job records
+ private ResourceArenaManager resArenaMgr;
+ private RequestArenaManager reqArenaMgr;
+ private JobArenaManager jobArenaMgr;
+ // maps a job id to the slot of its job record in jobArenaMgr
+ private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
+ // per-thread cache of dataset intention locks that were already acquired
+ private ThreadLocal<DatasetLockCache> dsLockCache;
+
+    /**
+     * Outcome of comparing a requested lock mode with the current maximal
+     * lock mode of a resource.
+     */
+    enum LockAction {
+        /** invalid combination */
+        ERR(false, false),
+        /** the lock can be granted */
+        GET(false, false),
+        /** special version of GET that updates the max lock mode */
+        UPD(false, true),
+        /** the requester has to wait */
+        WAIT(true, false),
+        /** convert (upgrade) a lock (e.g. from S to X) */
+        CONV(true, true);
+
+        boolean wait;
+        boolean modify;
+
+        LockAction(boolean waits, boolean modifies) {
+            wait = waits;
+            modify = modifies;
+        }
+    }
+
+ // Lock compatibility table, indexed as ACTION_MATRIX[current max mode of
+ // the resource][requested mode] (see determineLockAction). Rows are the
+ // current mode, columns the new (requested) mode. WAIT entries may still
+ // be refined by determineLockAction when the conflict is with the same job.
+ static LockAction[][] ACTION_MATRIX = {
+ // new NL IS IX S X
+ { LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL
+ { LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS
+ { LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX
+ { LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S
+ { LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X
+ };
+
+    /**
+     * Creates a lock manager with an empty resource group table and one
+     * arena manager each for resources, requests and jobs. Every arena
+     * manager gets two arenas per available processor and the shrink timer
+     * configured in the transaction properties.
+     *
+     * @param txnSubsystem the transaction subsystem this lock manager belongs to
+     * @throws ACIDException declared for callers; not thrown by the code in
+     *         this constructor itself
+     */
+    public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
+        this.txnSubsystem = txnSubsystem;
+        this.table = new ResourceGroupTable();
+
+        final int shrinkTimer = txnSubsystem.getTransactionProperties().getLockManagerShrinkTimer();
+        final int arenaCount = 2 * Runtime.getRuntime().availableProcessors();
+
+        this.resArenaMgr = new ResourceArenaManager(arenaCount, shrinkTimer);
+        this.reqArenaMgr = new RequestArenaManager(arenaCount, shrinkTimer);
+        this.jobArenaMgr = new JobArenaManager(arenaCount, shrinkTimer);
+        this.jobIdSlotMap = new ConcurrentHashMap<>();
+        this.dsLockCache = new ThreadLocal<DatasetLockCache>() {
+            @Override
+            protected DatasetLockCache initialValue() {
+                return new DatasetLockCache();
+            }
+        };
+    }
+
+    /** @return the transaction properties of the owning transaction subsystem */
+    public AsterixTransactionProperties getTransactionProperties() {
+        final AsterixTransactionProperties props = txnSubsystem.getTransactionProperties();
+        return props;
+    }
+
+    /**
+     * Acquires a lock and blocks until it is granted.
+     * <p>
+     * For an entity lock ({@code entityHashValue != -1}) the matching
+     * intention lock on the dataset is acquired first, unless the per-thread
+     * cache shows that this job already holds it.
+     * <p>
+     * The request slot allocated for this call is handed back to the request
+     * arena if the call ends (e.g. with an exception while waiting) before
+     * the request is added as a holder; previously such a slot was leaked.
+     *
+     * @param datasetId the dataset to lock
+     * @param entityHashValue hash of the entity's primary key, or -1 for a
+     *        lock on the whole dataset
+     * @param lockMode the requested lock mode
+     * @param txnContext the context of the requesting job
+     * @throws ACIDException propagated from job validation or from waiting
+     *         for the lock
+     */
+    @Override
+    public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException {
+        log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+        final int dsId = datasetId.getId();
+        final int jobId = txnContext.getJobId().getId();
+
+        if (entityHashValue != -1) {
+            // get the intention lock on the dataset, if we want to lock an individual item
+            final byte dsLockMode = LockMode.intentionMode(lockMode);
+            if (!dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                lock(datasetId, -1, dsLockMode, txnContext);
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            }
+        }
+
+        final long jobSlot = findOrAllocJobSlot(jobId);
+
+        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        group.getLatch();
+        // -1 means "no request slot allocated" (same convention as instantLock)
+        long reqSlot = -1;
+        boolean locked = false;
+        try {
+            validateJob(txnContext);
+
+            final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+            reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
+            while (!locked) {
+                final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+                switch (act) {
+                    case UPD:
+                        resArenaMgr.setMaxMode(resSlot, lockMode);
+                        // no break
+                    case GET:
+                        // from here on the request may be linked into the
+                        // holder lists, so the cleanup below must not free it
+                        locked = true;
+                        addHolder(reqSlot, resSlot, jobSlot);
+                        break;
+                    case WAIT:
+                    case CONV:
+                        enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
+                        break;
+                    case ERR:
+                    default:
+                        throw new IllegalStateException();
+                }
+            }
+        } finally {
+            try {
+                if (reqSlot != -1 && !locked) {
+                    // the request never became a holder: give the slot back
+                    reqArenaMgr.deallocate(reqSlot);
+                }
+            } finally {
+                group.releaseLatch();
+            }
+        }
+    }
+
+ /**
+ * Queues a request on a resource and blocks on the resource group until
+ * the thread is woken up. The caller re-evaluates the lock action
+ * afterwards.
+ * Requests whose action has the modify flag set (CONV) go to the upgrader
+ * queue, all others to the waiter queue.
+ *
+ * @param group the resource group whose latch the caller holds
+ * @param reqSlot the slot of the request to queue
+ * @param resSlot the slot of the resource that is waited for
+ * @param jobSlot the slot of the waiting job
+ * @param act the lock action that made waiting necessary
+ * @param txnContext the context of the waiting job
+ * @throws ACIDException propagated from requestAbort or from waiting
+ */
+ private void enqueueWaiter(final ResourceGroup group, final long reqSlot,
+ final long resSlot, final long jobSlot, final LockAction act,
+ ITransactionContext txnContext) throws ACIDException {
+ final Queue queue = act.modify ? upgrader : waiter;
+ if (! introducesDeadlock(resSlot, jobSlot)) {
+ queue.add(reqSlot, resSlot, jobSlot);
+ } else {
+ // NOTE(review): presumably requestAbort always throws; if it
+ // returned, the code below would wait without being queued and then
+ // remove a request that was never added - confirm.
+ requestAbort(txnContext);
+ }
+ try {
+ group.await(txnContext);
+ } finally {
+ // the request leaves the queue no matter how the wait ended
+ queue.remove(reqSlot, resSlot, jobSlot);
+ }
+ }
+
+ /**
+ * determine if adding a job to the waiters of a resource will introduce a
+ * cycle in the wait-graph where the job waits on itself
+ * <p>
+ * The search walks over all holders of the resource. If a holder belongs
+ * to the job itself, a cycle is found. Otherwise the search continues
+ * recursively with every resource that the holder's job is waiting on,
+ * first via its waiter requests and then via its upgrader requests.
+ * @param resSlot the slot that contains the information about the resource
+ * @param jobSlot the slot that contains the information about the job
+ * @return true if a cycle would be introduced, false otherwise
+ */
+ private boolean introducesDeadlock(final long resSlot, final long jobSlot) {
+ // iterate over the holder requests of the resource
+ long reqSlot = resArenaMgr.getLastHolder(resSlot);
+ while (reqSlot >= 0) {
+ long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
+ if (holderJobSlot == jobSlot) {
+ return true;
+ }
+ // true while the waiter list of the holder's job is scanned; it is
+ // cleared when the scan switches over to the job's upgrader list
+ boolean scanWaiters = true;
+ long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
+ while (waiter >= 0) {
+ // the resource that the holder's job is itself waiting on
+ long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
+ if (introducesDeadlock(watingOnResSlot, jobSlot)) {
+ return true;
+ }
+ waiter = reqArenaMgr.getNextJobRequest(waiter);
+ if (waiter < 0 && scanWaiters) {
+ // waiter list exhausted, continue with the upgraders (once)
+ scanWaiters = false;
+ waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
+ }
+ }
+ reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+ }
+ return false;
+ }
+
+ /**
+ * Waits until the requested lock could be granted, but does not keep it:
+ * no holder is added for the resource. The dataset intention lock that is
+ * needed for an entity lock is acquired through lock() and is therefore
+ * kept.
+ *
+ * @param datasetId the dataset of the resource
+ * @param entityHashValue hash of the entity's primary key, or -1 for the
+ * whole dataset
+ * @param lockMode the requested lock mode
+ * @param txnContext the context of the requesting job
+ * @throws ACIDException propagated from job validation or from waiting
+ */
+ @Override
+ public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+ throws ACIDException {
+ log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+ final int dsId = datasetId.getId();
+ final int jobId = txnContext.getJobId().getId();
+
+ if (entityHashValue != -1) {
+ // get the intention lock on the dataset, if we want to lock an individual item
+ final byte dsLockMode = LockMode.intentionMode(lockMode);
+ if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+ lock(datasetId, -1, dsLockMode, txnContext);
+ dsLockCache.get().put(jobId, dsId, dsLockMode);
+ }
+ }
+
+ final ResourceGroup group = table.get(datasetId, entityHashValue);
+ // fast path that is taken without getting the group latch
+ if (group.firstResourceIndex.get() == -1l) {
+ validateJob(txnContext);
+ // if we do not have a resource in the group, we know that the
+ // resource that we are looking for is not locked
+ return;
+ }
+
+ // we only allocate a request slot if we actually have to wait
+ long reqSlot = -1;
+
+ group.getLatch();
+ try {
+ validateJob(txnContext);
+
+ final long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+ if (resSlot < 0) {
+ // if we don't find the resource, there are no locks on it.
+ return;
+ }
+
+ final long jobSlot = findOrAllocJobSlot(jobId);
+
+ // re-evaluate the lock action after every wake-up until the lock
+ // could be granted
+ while (true) {
+ final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+ switch (act) {
+ case UPD:
+ case GET:
+ // the lock could be granted - that is all an instant lock needs
+ return;
+ case WAIT:
+ case CONV:
+ if (reqSlot == -1) {
+ reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
+ }
+ enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
+ break;
+ case ERR:
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ } finally {
+ if (reqSlot != -1) {
+ // deallocate request, if we allocated one earlier
+ reqArenaMgr.deallocate(reqSlot);
+ }
+ group.releaseLatch();
+ }
+ }
+
+    /**
+     * Acquires a lock if that is possible without waiting.
+     * <p>
+     * For an entity lock ({@code entityHashValue != -1}) the matching
+     * intention lock on the dataset is tried first, unless the per-thread
+     * cache shows that this job already holds it. If the dataset lock was
+     * acquired but the entity lock was not, the dataset lock is kept anyway
+     * and cleaned up at the end of the job.
+     * <p>
+     * The request slot is only allocated once it is known that the lock is
+     * granted; previously it was allocated up front and leaked whenever the
+     * lock could not be granted.
+     *
+     * @param datasetId the dataset to lock
+     * @param entityHashValue hash of the entity's primary key, or -1 for a
+     *        lock on the whole dataset
+     * @param lockMode the requested lock mode
+     * @param txnContext the context of the requesting job
+     * @return true if the lock was acquired, false if the caller would have
+     *         had to wait
+     * @throws ACIDException propagated from job validation
+     */
+    @Override
+    public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException {
+        log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+        final int dsId = datasetId.getId();
+        final int jobId = txnContext.getJobId().getId();
+
+        if (entityHashValue != -1) {
+            // get the intention lock on the dataset, if we want to lock an individual item
+            final byte dsLockMode = LockMode.intentionMode(lockMode);
+            if (!dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                if (!tryLock(datasetId, -1, dsLockMode, txnContext)) {
+                    return false;
+                }
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            }
+        }
+
+        final long jobSlot = findOrAllocJobSlot(jobId);
+
+        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        group.getLatch();
+
+        try {
+            validateJob(txnContext);
+
+            final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+
+            final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+            switch (act) {
+                case UPD:
+                    resArenaMgr.setMaxMode(resSlot, lockMode);
+                    // no break
+                case GET:
+                    // the lock is granted, so a request record is needed now
+                    addHolder(allocRequestSlot(resSlot, jobSlot, lockMode), resSlot, jobSlot);
+                    return true;
+                case WAIT:
+                case CONV:
+                    // nothing was allocated for the request, nothing to undo
+                    return false;
+                default:
+                    throw new IllegalStateException();
+            }
+        } finally {
+            group.releaseLatch();
+        }
+    }
+
+ /**
+ * Checks whether the requested lock could be granted right now, without
+ * waiting and without keeping the lock: no holder is added for the
+ * resource. The dataset intention lock that is needed for an entity lock
+ * is acquired through tryLock() and is therefore kept.
+ *
+ * @param datasetId the dataset of the resource
+ * @param entityHashValue hash of the entity's primary key, or -1 for the
+ * whole dataset
+ * @param lockMode the requested lock mode
+ * @param txnContext the context of the requesting job
+ * @return true if the lock could be granted, false if the caller would
+ * have to wait
+ * @throws ACIDException propagated from job validation
+ */
+ @Override
+ public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+ ITransactionContext txnContext) throws ACIDException {
+ log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+ final int dsId = datasetId.getId();
+ final int jobId = txnContext.getJobId().getId();
+
+ if (entityHashValue != -1) {
+ // get the intention lock on the dataset, if we want to lock an individual item
+ final byte dsLockMode = LockMode.intentionMode(lockMode);
+ if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+ if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
+ return false;
+ }
+ dsLockCache.get().put(jobId, dsId, dsLockMode);
+ }
+ }
+
+ final ResourceGroup group = table.get(datasetId, entityHashValue);
+ // fast path that is taken without getting the group latch
+ if (group.firstResourceIndex.get() == -1l) {
+ validateJob(txnContext);
+ // if we do not have a resource in the group, we know that the
+ // resource that we are looking for is not locked
+ return true;
+ }
+
+ group.getLatch();
+ try {
+ validateJob(txnContext);
+
+ final long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+ if (resSlot < 0) {
+ // if we don't find the resource, there are no locks on it.
+ return true;
+ }
+
+ final long jobSlot = findOrAllocJobSlot(jobId);
+
+ // only the decision is needed, no request is allocated or queued
+ LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+ switch (act) {
+ case UPD:
+ case GET:
+ return true;
+ case WAIT:
+ case CONV:
+ return false;
+ case ERR:
+ default:
+ throw new IllegalStateException();
+ }
+ } finally {
+ group.releaseLatch();
+ }
+ }
+
+ /**
+ * Releases one lock that the job holds on a resource.
+ * The holder request is removed and deallocated. If the resource is not
+ * used afterwards, it is unlinked from its resource group and deallocated
+ * as well; otherwise its maximal lock mode is recomputed and the group is
+ * woken up if that mode changed.
+ *
+ * @param datasetId the dataset of the resource
+ * @param entityHashValue hash of the entity's primary key, or -1 for the
+ * whole dataset
+ * @param lockMode the mode of the lock to release
+ * @param txnContext the context of the job that holds the lock
+ * @throws ACIDException declared by the interface
+ * @throws IllegalStateException if the resource is not known
+ */
+ @Override
+ public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext) throws ACIDException {
+ log("unlock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+ ResourceGroup group = table.get(datasetId, entityHashValue);
+ group.getLatch();
+ try {
+
+ int dsId = datasetId.getId();
+ long resource = findResourceInGroup(group, dsId, entityHashValue);
+ if (resource < 0) {
+ throw new IllegalStateException("resource (" + dsId + ", " + entityHashValue + ") not found");
+ }
+
+ int jobId = txnContext.getJobId().getId();
+ long jobSlot = findOrAllocJobSlot(jobId);
+
+ long holder = removeLastHolder(resource, jobSlot, lockMode);
+
+ // deallocate request
+ reqArenaMgr.deallocate(holder);
+ // deallocate resource or fix max lock mode
+ if (resourceNotUsed(resource)) {
+ // unlink the resource from the singly linked list of its group
+ long prev = group.firstResourceIndex.get();
+ if (prev == resource) {
+ group.firstResourceIndex.set(resArenaMgr.getNext(resource));
+ } else {
+ while (resArenaMgr.getNext(prev) != resource) {
+ prev = resArenaMgr.getNext(prev);
+ }
+ resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
+ }
+ resArenaMgr.deallocate(resource);
+ } else {
+ final int oldMaxMode = resArenaMgr.getMaxMode(resource);
+ final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
+ resArenaMgr.setMaxMode(resource, newMaxMode);
+ if (oldMaxMode != newMaxMode) {
+ // the max lock mode changed, so waiters may be able to acquire
+ // the lock now and have to be signalled. (If the mode did not
+ // change, current waiters still cannot acquire the lock and no
+ // signal is needed.)
+ group.wakeUp();
+ }
+ }
+ } finally {
+ group.releaseLatch();
+ }
+
+ // dataset intention locks are cleaned up at the end of the job
+ }
+
+    /**
+     * Releases all locks held by a job and forgets the job.
+     * <p>
+     * The holder requests of the job are unlocked one by one until none is
+     * left. Then the job id is removed from {@code jobIdSlotMap} and the job
+     * slot is deallocated. Removing the mapping keeps the map from growing
+     * with every finished job and keeps a later lookup from returning a
+     * slot that has already been deallocated.
+     * <p>
+     * NOTE(review): unlock() takes a resource group latch while the monitor
+     * of {@code jobArenaMgr} is held here - confirm that no other path
+     * acquires the two in the opposite order.
+     *
+     * @param txnContext the context of the job whose locks are released
+     * @throws ACIDException propagated from unlock
+     */
+    @Override
+    public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
+        log("releaseLocks", -1, -1, LockMode.ANY, txnContext);
+
+        int jobId = txnContext.getJobId().getId();
+        Long jobSlot = jobIdSlotMap.get(jobId);
+        if (jobSlot == null) {
+            // we don't know the job, so there are no locks for it - we're done
+            return;
+        }
+        synchronized (jobArenaMgr) {
+            long holder = jobArenaMgr.getLastHolder(jobSlot);
+            while (holder != -1) {
+                long resource = reqArenaMgr.getResourceId(holder);
+                int dsId = resArenaMgr.getDatasetId(resource);
+                int pkHashVal = resArenaMgr.getPkHashVal(resource);
+                unlock(new DatasetId(dsId), pkHashVal, LockMode.ANY, txnContext);
+                // unlock removed the last holder, fetch the new last one
+                holder = jobArenaMgr.getLastHolder(jobSlot);
+            }
+            // drop the mapping before the slot can be handed out again
+            jobIdSlotMap.remove(jobId);
+            jobArenaMgr.deallocate(jobSlot);
+        }
+    }
+
+ private long findOrAllocJobSlot(int jobId) {
+ Long jobSlot = jobIdSlotMap.get(jobId);
+ if (jobSlot == null) {
+ jobSlot = new Long(jobArenaMgr.allocate());
+ jobArenaMgr.setJobId(jobSlot, jobId);
+ Long oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
+ if (oldSlot != null) {
+ // if another thread allocated a slot for this jobId between
+ // get(..) and putIfAbsent(..), we'll use that slot and
+ // deallocate the one we allocated
+ jobArenaMgr.deallocate(jobSlot);
+ jobSlot = oldSlot;
+ }
+ }
+ assert(jobSlot >= 0);
+ return jobSlot;
+ }
+
+ private long findOrAllocResourceSlot(ResourceGroup group, int dsId, int entityHashValue) {
+ long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+
+ if (resSlot == -1) {
+ // we don't know about this resource, let's alloc a slot
+ resSlot = resArenaMgr.allocate();
+ resArenaMgr.setDatasetId(resSlot, dsId);
+ resArenaMgr.setPkHashVal(resSlot, entityHashValue);
+ resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
+ group.firstResourceIndex.set(resSlot);
+ }
+ return resSlot;
+ }
+
+ private long allocRequestSlot(long resSlot, long jobSlot, byte lockMode) {
+ long reqSlot = reqArenaMgr.allocate();
+ reqArenaMgr.setResourceId(reqSlot, resSlot);
+ reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
+ reqArenaMgr.setJobSlot(reqSlot, jobSlot);
+ return reqSlot;
+ }
+
+    /**
+     * Looks up the action for requesting lockMode on a resource in
+     * ACTION_MATRIX, using the resource's current max mode; a WAIT result
+     * is refined by looking at the locks the same job already holds.
+     */
+    private LockAction determineLockAction(long resSlot, long jobSlot, byte lockMode) {
+        final int maxMode = resArenaMgr.getMaxMode(resSlot);
+        final LockAction action = ACTION_MATRIX[maxMode][lockMode];
+        return action == LockAction.WAIT
+                ? updateActionForSameJob(resSlot, jobSlot, lockMode)
+                : action;
+    }
+
+    /**
+     * when we've got a lock conflict for a different job, we always have to
+     * wait, if it is for the same job we either have to
+     * a) (wait and) convert the lock once conversion becomes viable or
+     * b) acquire the lock if we want to lock the same resource with the same
+     * lock mode for the same job.
+     * @param resource the resource slot that's being locked
+     * @param job the job slot of the job locking the resource
+     * @param lockMode the lock mode that the resource should be locked with
+     * @return GET if the job already holds the resource in the same mode,
+     *         CONV if it only holds it in other modes, WAIT if it holds
+     *         nothing on the resource
+     */
+    private LockAction updateActionForSameJob(long resource, long job, byte lockMode) {
+        // TODO we can reduce the number of things we have to look at by
+        // carefully distinguishing the different lock modes
+        LockAction result = LockAction.WAIT;
+        for (long cur = resArenaMgr.getLastHolder(resource); cur != -1;
+                cur = reqArenaMgr.getNextRequest(cur)) {
+            if (job != reqArenaMgr.getJobSlot(cur)) {
+                continue;
+            }
+            if (reqArenaMgr.getLockMode(cur) == lockMode) {
+                return LockAction.GET;
+            }
+            result = LockAction.CONV;
+        }
+        return result;
+    }
+
+ private long findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
+ long resSlot = group.firstResourceIndex.get();
+ while (resSlot != -1) {
+ // either we already have a lock on this resource or we have a
+ // hash collision
+ if (resArenaMgr.getDatasetId(resSlot) == dsId &&
+ resArenaMgr.getPkHashVal(resSlot) == entityHashValue) {
+ return resSlot;
+ } else {
+ resSlot = resArenaMgr.getNext(resSlot);
+ }
+ }
+ return -1;
+ }
+
+    /**
+     * Registers a request as holder: it becomes the new head of the
+     * resource's holder list and the new head of the job's holder list.
+     */
+    private void addHolder(long request, long resource, long job) {
+        // push onto the holder list of the resource
+        reqArenaMgr.setNextRequest(request, resArenaMgr.getLastHolder(resource));
+        resArenaMgr.setLastHolder(resource, request);
+
+        // push onto the holder list of the job
+        synchronized (jobArenaMgr) {
+            insertIntoJobQueue(request, jobArenaMgr.getLastHolder(job));
+            jobArenaMgr.setLastHolder(job, request);
+        }
+    }
+
+ private long removeLastHolder(long resource, long jobSlot, byte lockMode) {
+ long holder = resArenaMgr.getLastHolder(resource);
+ if (holder < 0) {
+ throw new IllegalStateException("no holder for resource " + resource);
+ }
+
+ // remove from the list of holders for a resource
+ if (requestMatches(holder, jobSlot, lockMode)) {
+ // if the head of the queue matches, we need to update the resource
+ long next = reqArenaMgr.getNextRequest(holder);
+ resArenaMgr.setLastHolder(resource, next);
+ } else {
+ holder = removeRequestFromQueueForJob(holder, jobSlot, lockMode);
+ }
+
+ synchronized (jobArenaMgr) {
+ // remove from the list of requests for a job
+ long newHead = removeRequestFromJob(jobSlot, holder);
+ jobArenaMgr.setLastHolder(jobSlot, newHead);
+ }
+ return holder;
+ }
+
+    /**
+     * Checks whether a request belongs to the given job and has the given
+     * lock mode; LockMode.ANY matches every lock mode.
+     */
+    private boolean requestMatches(long holder, long jobSlot, byte lockMode) {
+        if (jobSlot != reqArenaMgr.getJobSlot(holder)) {
+            return false;
+        }
+        if (lockMode == LockMode.ANY) {
+            return true;
+        }
+        return lockMode == reqArenaMgr.getLockMode(holder);
+    }
+
+ /**
+ * Unlinks a request from the doubly-linked list of requests of a job
+ * (the list that is chained through the prev/next job request fields).
+ * @param jobSlot the job slot (not read by this method)
+ * @param holder the request slot to unlink
+ * @return the successor of the request if the request had no
+ * predecessor, i.e. if it was the head of its list (this is -1 if it
+ * was the only entry); -1 if the request had a predecessor.
+ * NOTE(review): the value -1 is ambiguous - it is returned both when
+ * the list became empty and when a non-head request was removed. A
+ * caller that stores the result as the new head of the list must first
+ * check that the removed request really was the head.
+ */
+ private long removeRequestFromJob(long jobSlot, long holder) {
+ long prevForJob = reqArenaMgr.getPrevJobRequest(holder);
+ long nextForJob = reqArenaMgr.getNextJobRequest(holder);
+ // let the successor (if any) point back to our predecessor
+ if (nextForJob != -1) {
+ reqArenaMgr.setPrevJobRequest(nextForJob, prevForJob);
+ }
+ if (prevForJob == -1) {
+ // no predecessor: the successor is the new first entry
+ return nextForJob;
+ } else {
+ // let the predecessor skip the removed request
+ reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
+ return -1;
+ }
+ }
+
+ /**
+ * Operations on a queue of requests that is kept per resource and per
+ * job; implemented below for waiters and for upgraders.
+ */
+ interface Queue {
+ /** adds the request to the queue of the resource and of the job */
+ void add(long request, long resource, long job);
+ /** removes the request from the queue of the resource and of the job */
+ void remove(long request, long resource, long job);
+ }
+
+    /**
+     * The queue of requests that wait for a lock. Per resource the requests
+     * are appended to the end of the list that starts at the resource's
+     * first waiter, per job they are put at the front of the list that
+     * starts at the job's last waiter.
+     */
+    final Queue waiter = new Queue() {
+        @Override
+        public void add(long request, long resource, long job) {
+            long waiter = resArenaMgr.getFirstWaiter(resource);
+            reqArenaMgr.setNextRequest(request, -1);
+            if (waiter == -1) {
+                resArenaMgr.setFirstWaiter(resource, request);
+            } else {
+                appendToRequestQueue(waiter, request);
+            }
+            synchronized (jobArenaMgr) {
+                waiter = jobArenaMgr.getLastWaiter(job);
+                insertIntoJobQueue(request, waiter);
+                jobArenaMgr.setLastWaiter(job, request);
+            }
+        }
+
+        @Override
+        public void remove(long request, long resource, long job) {
+            long waiter = resArenaMgr.getFirstWaiter(resource);
+            if (waiter == request) {
+                long next = reqArenaMgr.getNextRequest(waiter);
+                resArenaMgr.setFirstWaiter(resource, next);
+            } else {
+                waiter = removeRequestFromQueueForSlot(waiter, request);
+            }
+            synchronized (jobArenaMgr) {
+                // remove from the list of requests for a job
+                final long jobHead = jobArenaMgr.getLastWaiter(job);
+                final long newHead = removeRequestFromJob(job, waiter);
+                // removeRequestFromJob returns -1 for a request that was not
+                // the head of the job's list; only replace the head if we
+                // removed the head, otherwise the job's other waiters would
+                // be dropped from the list
+                if (jobHead == waiter) {
+                    jobArenaMgr.setLastWaiter(job, newHead);
+                }
+            }
+        }
+    };
+
+    /**
+     * The queue of requests that wait for a lock conversion. Per resource
+     * the requests are appended to the end of the list that starts at the
+     * resource's first upgrader, per job they are put at the front of the
+     * list that starts at the job's last upgrader.
+     */
+    final Queue upgrader = new Queue() {
+        @Override
+        public void add(long request, long resource, long job) {
+            long upgrader = resArenaMgr.getFirstUpgrader(resource);
+            reqArenaMgr.setNextRequest(request, -1);
+            if (upgrader == -1) {
+                resArenaMgr.setFirstUpgrader(resource, request);
+            } else {
+                appendToRequestQueue(upgrader, request);
+            }
+            synchronized (jobArenaMgr) {
+                upgrader = jobArenaMgr.getLastUpgrader(job);
+                insertIntoJobQueue(request, upgrader);
+                jobArenaMgr.setLastUpgrader(job, request);
+            }
+        }
+
+        @Override
+        public void remove(long request, long resource, long job) {
+            long upgrader = resArenaMgr.getFirstUpgrader(resource);
+            if (upgrader == request) {
+                long next = reqArenaMgr.getNextRequest(upgrader);
+                resArenaMgr.setFirstUpgrader(resource, next);
+            } else {
+                upgrader = removeRequestFromQueueForSlot(upgrader, request);
+            }
+            synchronized (jobArenaMgr) {
+                // remove from the list of requests for a job
+                final long jobHead = jobArenaMgr.getLastUpgrader(job);
+                final long newHead = removeRequestFromJob(job, upgrader);
+                // removeRequestFromJob returns -1 for a request that was not
+                // the head of the job's list; only replace the head if we
+                // removed the head, otherwise the job's other upgraders
+                // would be dropped from the list
+                if (jobHead == upgrader) {
+                    jobArenaMgr.setLastUpgrader(job, newHead);
+                }
+            }
+        }
+    };
+
+ private void insertIntoJobQueue(long newRequest, long oldRequest) {
+ reqArenaMgr.setNextJobRequest(newRequest, oldRequest);
+ reqArenaMgr.setPrevJobRequest(newRequest, -1);
+ if (oldRequest >= 0) {
+ reqArenaMgr.setPrevJobRequest(oldRequest, newRequest);
+ }
+ }
+
+    /**
+     * Appends a request to the end of the request list that starts at head
+     * (chained through the next request field).
+     */
+    private void appendToRequestQueue(long head, long appendee) {
+        long tail = head;
+        for (long succ = reqArenaMgr.getNextRequest(tail); succ != -1;
+                succ = reqArenaMgr.getNextRequest(tail)) {
+            tail = succ;
+        }
+        reqArenaMgr.setNextRequest(tail, appendee);
+    }
+
+ /**
+ * Unlinks the request reqSlot from the request list that starts at head
+ * (chained through the next request field).
+ * The head itself is never compared to reqSlot, the search starts with
+ * the successor of head; the callers in this class handle the case that
+ * the head is the request to remove before calling this method.
+ * @param head the head of the request queue
+ * @param reqSlot the request slot to remove
+ * @return the slot of the removed request
+ * @throws IllegalStateException if the end of the list is reached
+ * without finding reqSlot
+ */
+ private long removeRequestFromQueueForSlot(long head, long reqSlot) {
+ long cur = head;
+ long prev = cur;
+ while (prev != -1) {
+ cur = reqArenaMgr.getNextRequest(prev);
+ if (cur == -1) {
+ throw new IllegalStateException("request " + reqSlot+ " not in queue");
+ }
+ if (cur == reqSlot) {
+ break;
+ }
+ prev = cur;
+ }
+ // prev is the predecessor of the request, let it skip the request
+ long next = reqArenaMgr.getNextRequest(cur);
+ reqArenaMgr.setNextRequest(prev, next);
+ return cur;
+ }
+
+ /**
+ * remove the first request for a given job and lock mode from a request queue.
+ * If the value of the parameter lockMode is LockMode.ANY the first request
+ * for the job is removed - independent of the LockMode.
+ * The head itself is not checked, the search starts with the successor
+ * of head.
+ * @param head the head of the request queue
+ * @param jobSlot the job slot
+ * @param lockMode the lock mode
+ * @return the slot of the first request that matched the given job
+ * @throws IllegalStateException if the end of the queue is reached
+ * without finding a matching request
+ */
+ private long removeRequestFromQueueForJob(long head, long jobSlot, byte lockMode) {
+ long holder = head;
+ long prev = holder;
+ while (prev != -1) {
+ holder = reqArenaMgr.getNextRequest(prev);
+ if (holder == -1) {
+ throw new IllegalStateException("no entry for job " + jobSlot + " in queue");
+ }
+ if (requestMatches(holder, jobSlot, lockMode)) {
+ break;
+ }
+ prev = holder;
+ }
+ // prev is the predecessor of the match, let it skip the match
+ long next = reqArenaMgr.getNextRequest(holder);
+ reqArenaMgr.setNextRequest(prev, next);
+ return holder;
+ }
+
+ /**
+ * Recomputes the max lock mode of a resource from its current holders.
+ * @param resource the resource slot
+ * @param oldMaxMode the max mode that was recorded for the resource
+ * @return oldMaxMode as soon as a holder with that mode is found,
+ * otherwise the mode that results from starting at LockMode.NL and
+ * taking over a holder's mode whenever ACTION_MATRIX yields UPD for
+ * the mode determined so far and the holder's mode
+ * @throws IllegalStateException if ACTION_MATRIX yields WAIT for a
+ * holder, i.e. the holders are incompatible with each other
+ */
+ private int determineNewMaxMode(long resource, int oldMaxMode) {
+ int newMaxMode = LockMode.NL;
+ long holder = resArenaMgr.getLastHolder(resource);
+ while (holder != -1) {
+ int curLockMode = reqArenaMgr.getLockMode(holder);
+ if (curLockMode == oldMaxMode) {
+ // we have another lock of the same mode - we're done
+ return oldMaxMode;
+ }
+ // any action without a case below leaves newMaxMode unchanged
+ switch (ACTION_MATRIX[newMaxMode][curLockMode]) {
+ case UPD:
+ newMaxMode = curLockMode;
+ break;
+ case GET:
+ break;
+ case WAIT:
+ throw new IllegalStateException("incompatible locks in holder queue");
+ }
+ holder = reqArenaMgr.getNextRequest(holder);
+ }
+ return newMaxMode;
+ }
+
+    /**
+     * A resource is unused if it has neither holders nor upgraders nor
+     * waiters.
+     */
+    private boolean resourceNotUsed(long resource) {
+        if (resArenaMgr.getLastHolder(resource) != -1) {
+            return false;
+        }
+        if (resArenaMgr.getFirstUpgrader(resource) != -1) {
+            return false;
+        }
+        return resArenaMgr.getFirstWaiter(resource) == -1;
+    }
+
+ private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
+ if (! LOGGER.isLoggable(Level.FINEST)) {
+ return;
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ op : ").append(string);
+ if (id != -1) {
+ sb.append(" , dataset : ").append(id);
+ }
+ if (entityHashValue != -1) {
+ sb.append(" , entity : ").append(entityHashValue);
+ }
+ if (lockMode != LockMode.NL) {
+ sb.append(" , mode : ").append(LockMode.toString(lockMode));
+ }
+ if (txnContext != null) {
+ sb.append(" , jobId : ").append(txnContext.getJobId());
+ }
+ sb.append(" }");
+ LOGGER.finest(sb.toString());
+ }
+
+ private void validateJob(ITransactionContext txnContext) throws ACIDException {
+ if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
+ throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
+ } else if (txnContext.isTimeout()) {
+ requestAbort(txnContext);
+ }
+ }
+
+ private void requestAbort(ITransactionContext txnContext) throws ACIDException {
+ txnContext.setTimeout(true);
+ throw new ACIDException("Transaction " + txnContext.getJobId()
+ + " should abort (requested by the Lock Manager)");
+ }
+
+    /**
+     * Appends a dump of the resource table, the arena managers and the job
+     * id to slot map to the given StringBuilder. All latches of the
+     * resource table are held while the dump is created.
+     *
+     * @param sb the StringBuilder to append to
+     * @return sb
+     */
+    public StringBuilder append(StringBuilder sb) {
+        table.getAllLatches();
+        try {
+            sb.append(">>dump_begin\t>>----- [resTable] -----\n");
+            table.append(sb);
+            sb.append(">>dump_end\t>>----- [resTable] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
+            resArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
+            reqArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
+            // iterate over the entries instead of looking up every key: a
+            // lookup returns null if the key is removed concurrently
+            for (java.util.Map.Entry<Integer, Long> entry : jobIdSlotMap.entrySet()) {
+                sb.append(entry.getKey()).append(" : ");
+                TypeUtil.Global.append(sb, entry.getValue());
+                sb.append("\n");
+            }
+            sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
+            jobArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
+        } finally {
+            table.releaseAllLatches();
+        }
+        return sb;
+    }
+
+    /**
+     * Returns the dump created by {@link #append(StringBuilder)}; note that
+     * creating the dump acquires all latches of the resource table.
+     */
+    @Override
+    public String toString() {
+        return append(new StringBuilder()).toString();
+    }
+
+    /**
+     * Returns the state dump of the lock manager framed by a status header
+     * line and a trailing newline.
+     */
+    @Override
+    public String prettyPrint() throws ACIDException {
+        final StringBuilder report = new StringBuilder("\n########### LockManager Status #############\n");
+        append(report);
+        report.append("\n");
+        return report.toString();
+    }
+
+ /**
+ * Nothing to do when the lock manager is started.
+ */
+ @Override
+ public void start() {
+ //no op
+ }
+
+    /**
+     * Stops the lock manager and optionally dumps its state.
+     *
+     * @param dumpState if true, the state dump is written to os
+     * @param os the stream that receives the dump; only used if dumpState
+     *        is true
+     */
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (!dumpState) {
+            return;
+        }
+        try {
+            os.write(toString().getBytes());
+            os.flush();
+        } catch (IOException e) {
+            // dumping the state is best effort and must not fail the
+            // shutdown, but the failure should not go unnoticed either
+            LOGGER.log(Level.WARNING, "failed to dump the state of the lock manager", e);
+        }
+    }
+
+    /**
+     * Cache of the dataset lock modes that were recorded for a single job.
+     * The cache only holds entries of one job at a time; asking for or
+     * recording a lock of a different job discards the entries of the
+     * previous job.
+     */
+    private static class DatasetLockCache {
+        private long jobId = -1;
+        private final HashMap<Integer, Byte> lockCache = new HashMap<Integer, Byte>();
+        // size 1 cache to avoid the boxing/unboxing that comes with the
+        // access to the HashMap
+        private int cDsId = -1;
+        private byte cDsLockMode = -1;
+
+        /**
+         * @return true if the given lock mode was recorded for the given
+         *         dataset and job; a call for a different job than the
+         *         cached one empties the cache and returns false
+         */
+        public boolean contains(final int jobId, final int dsId, byte dsLockMode) {
+            if (this.jobId == jobId) {
+                if (this.cDsId == dsId && this.cDsLockMode == dsLockMode) {
+                    return true;
+                }
+                final Byte cachedLockMode = this.lockCache.get(dsId);
+                if (cachedLockMode != null && cachedLockMode == dsLockMode) {
+                    this.cDsId = dsId;
+                    this.cDsLockMode = dsLockMode;
+                    return true;
+                }
+            } else {
+                this.jobId = -1;
+                this.cDsId = -1;
+                this.cDsLockMode = -1;
+                this.lockCache.clear();
+            }
+            return false;
+        }
+
+        /**
+         * Records that the job holds the dataset in the given lock mode.
+         */
+        public void put(final int jobId, final int dsId, byte dsLockMode) {
+            if (this.jobId != jobId) {
+                // entries of another job must not be attributed to this job
+                this.lockCache.clear();
+            }
+            this.jobId = jobId;
+            this.cDsId = dsId;
+            this.cDsLockMode = dsLockMode;
+            this.lockCache.put(dsId, dsLockMode);
+        }
+
+        @Override
+        public String toString() {
+            return "[ " + jobId + " : " + lockCache.toString() + "]";
+        }
+    }
+
+    /**
+     * Fixed size table of resource groups; a resource is assigned to a
+     * group by its dataset id and entity hash value.
+     */
+    private static class ResourceGroupTable {
+        public static final int TABLE_SIZE = 1024; // TODO increase?
+
+        private ResourceGroup[] table;
+
+        public ResourceGroupTable() {
+            final ResourceGroup[] groups = new ResourceGroup[TABLE_SIZE];
+            for (int slot = 0; slot < groups.length; ++slot) {
+                groups[slot] = new ResourceGroup();
+            }
+            table = groups;
+        }
+
+        /** Returns the group that is responsible for the given resource. */
+        ResourceGroup get(DatasetId dId, int entityHashValue) {
+            // TODO ensure good properties of hash function
+            final int mixed = dId.getId() ^ entityHashValue;
+            // Math.abs(Integer.MIN_VALUE) is negative, that value goes to
+            // the first group
+            final int bucket = (mixed == Integer.MIN_VALUE) ? 0 : Math.abs(mixed) % TABLE_SIZE;
+            return table[bucket];
+        }
+
+        /** Acquires the latches of all groups in table order. */
+        public void getAllLatches() {
+            for (ResourceGroup group : table) {
+                group.getLatch();
+            }
+        }
+
+        /** Releases the latches of all groups in table order. */
+        public void releaseAllLatches() {
+            for (ResourceGroup group : table) {
+                group.releaseLatch();
+            }
+        }
+
+        public StringBuilder append(StringBuilder sb) {
+            return append(sb, false);
+        }
+
+        /**
+         * Appends one line per group: the whole group if detail is set,
+         * only the index of its first resource otherwise.
+         */
+        public StringBuilder append(StringBuilder sb, boolean detail) {
+            int index = 0;
+            for (ResourceGroup group : table) {
+                sb.append(index).append(" : ");
+                final Object shown = detail ? group : group.firstResourceIndex;
+                sb.append(shown);
+                sb.append('\n');
+                ++index;
+            }
+            return sb;
+        }
+    }
+
+    /**
+     * A group of resources that share one latch (the write lock of a
+     * ReentrantReadWriteLock) and one condition on that latch. The
+     * resources of the group form a list that starts at firstResourceIndex.
+     */
+    private static class ResourceGroup {
+        private final ReentrantReadWriteLock latch = new ReentrantReadWriteLock();
+        private final Condition condition = latch.writeLock().newCondition();
+        final AtomicLong firstResourceIndex = new AtomicLong(-1);
+
+        ResourceGroup() {
+        }
+
+        void getLatch() {
+            log("latch");
+            latch.writeLock().lock();
+        }
+
+        void releaseLatch() {
+            log("release");
+            latch.writeLock().unlock();
+        }
+
+        /** Reports whether threads are queued to acquire the latch. */
+        boolean hasWaiters() {
+            return latch.hasQueuedThreads();
+        }
+
+        /**
+         * Waits on the condition of the group.
+         *
+         * @throws ACIDException if the thread is interrupted while waiting
+         */
+        void await(ITransactionContext txnContext) throws ACIDException {
+            log("wait for");
+            try {
+                condition.await();
+            } catch (InterruptedException e) {
+                throw new ACIDException(txnContext, "interrupted", e);
+            }
+        }
+
+        /** Signals all threads that wait on the condition of the group. */
+        void wakeUp() {
+            log("notify");
+            condition.signalAll();
+        }
+
+        void log(String s) {
+            if (!LOGGER.isLoggable(Level.FINEST)) {
+                return;
+            }
+            LOGGER.finest(s + " " + toString());
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder text = new StringBuilder();
+            text.append("{ id : ").append(hashCode());
+            text.append(", first : ").append(firstResourceIndex.toString());
+            text.append(", waiters : ").append(hasWaiters() ? "true" : "false");
+            text.append(" }");
+            return text.toString();
+        }
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Job.json b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Job.json
new file mode 100644
index 0000000..a649b7c
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Job.json
@@ -0,0 +1,24 @@
+{
+ "name" : "Job",
+ "fields" : [
+ {
+ "name" : "last holder",
+ "type" : "GLOBAL",
+ "initial" : "-1"
+ },
+ {
+ "name" : "last waiter",
+ "type" : "GLOBAL",
+ "initial" : "-1"
+ },
+ {
+ "name" : "last upgrader",
+ "type" : "GLOBAL",
+ "initial" : "-1"
+ },
+ {
+ "name" : "job id",
+ "type" : "INT"
+ }
+ ]
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index c7df2f2..98b16c0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -278,7 +278,7 @@
did = entityInfoManager.getDatasetId(entityInfo);
entityHashValue = entityInfoManager.getPKHashVal(entityInfo);
if (did == datasetId.getId() && entityHashValue != -1) {
- this.unlock(datasetId, entityHashValue, txnContext);
+ this.unlock(datasetId, entityHashValue, LockMode.ANY, txnContext);
}
entityInfo = prevEntityInfo;
@@ -638,7 +638,7 @@
}
@Override
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
+ public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext) throws ACIDException {
internalUnlock(datasetId, entityHashValue, txnContext, false);
}
@@ -2211,7 +2211,7 @@
tempDatasetIdObj.setId(logRecord.getDatasetId());
tempJobIdObj.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
- unlock(tempDatasetIdObj, logRecord.getPKHashValue(), txnCtx);
+ unlock(tempDatasetIdObj, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
txnCtx.notifyOptracker(false);
} else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
tempJobIdObj.setId(logRecord.getJobId());
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index e61cb55..1bcfa22 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -24,6 +24,7 @@
import org.apache.commons.io.FileUtils;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -62,7 +64,7 @@
ArrayList<LockRequest> requestList;
ArrayList<ArrayList<Integer>> expectedResultList;
int resultListIndex;
- LockManager lockMgr;
+ ILockManager lockMgr;
String requestFileName;
long defaultWaitTime;
@@ -72,7 +74,7 @@
this.workerReadyQueue = new WorkerReadyQueue();
this.requestList = new ArrayList<LockRequest>();
this.expectedResultList = new ArrayList<ArrayList<Integer>>();
- this.lockMgr = (LockManager) txnProvider.getLockManager();
+ this.lockMgr = txnProvider.getLockManager();
this.requestFileName = new String(requestFileName);
this.resultListIndex = 0;
this.defaultWaitTime = 10;
@@ -151,6 +153,8 @@
if (isSuccess) {
log("\n*** Test Passed ***");
}
+ ((LogManager) txnProvider.getLogManager()).stop(false, null);
+ AsterixThreadExecutor.INSTANCE.shutdown();
}
public boolean handleRequest(LockRequest request) throws ACIDException {
@@ -482,7 +486,7 @@
request.txnContext);
break;
case RequestType.UNLOCK:
- lockMgr.unlock(request.datasetIdObj, request.entityHashValue, request.txnContext);
+ lockMgr.unlock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
break;
case RequestType.RELEASE_LOCKS:
lockMgr.releaseLocks(request.txnContext);
@@ -511,6 +515,18 @@
public void log(String s) {
System.out.println(s);
}
+
+ /**
+ * Renders the thread name and the lock request (or null if there is
+ * none) as a string of the form { t : "name", r : "request" }.
+ */
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ t : \"").append(threadName).append("\", r : ");
+ if (lockRequest == null) {
+ sb.append("null");
+ } else {
+ sb.append("\"").append(lockRequest.toString()).append("\"");
+ }
+ sb.append(" }");
+ return sb.toString();
+ }
}
class WorkerReadyQueue {
@@ -618,7 +634,7 @@
} catch (InterruptedException e) {
e.printStackTrace();
}
- log(Thread.currentThread().getName() + "Waiting for worker to finish its task...");
+ log(Thread.currentThread().getName() + " Waiting for worker to finish its task...");
queueSize = workerReadyQueue.size();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
index e6f2798..8cdc87b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
@@ -14,9 +14,14 @@
*/
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
+import org.apache.commons.io.FileUtils;
+
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -26,6 +31,7 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -45,9 +51,16 @@
private static int jobId = 0;
private static Random rand;
- public static void main(String args[]) throws ACIDException, AsterixException {
+ public static void main(String args[]) throws ACIDException, AsterixException, IOException {
int i;
- TransactionSubsystem txnProvider = new TransactionSubsystem("LockManagerRandomUnitTest", null,
+ //prepare configuration file
+ File cwd = new File(System.getProperty("user.dir"));
+ File asterixdbDir = cwd.getParentFile();
+ File srcFile = new File(asterixdbDir.getAbsoluteFile(), "asterix-app/src/main/resources/asterix-build-configuration.xml");
+ File destFile = new File(cwd, "target/classes/asterix-configuration.xml");
+ FileUtils.copyFile(srcFile, destFile);
+
+ TransactionSubsystem txnProvider = new TransactionSubsystem("nc1", null,
new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
rand = new Random(System.currentTimeMillis());
for (i = 0; i < MAX_NUM_OF_ENTITY_LOCK_JOB; i++) {
@@ -64,6 +77,8 @@
System.out.println("Creating " + i + "th EntityLockUpgradeJob..");
generateEntityLockUpgradeThread(txnProvider);
}
+ ((LogManager) txnProvider.getLogManager()).stop(false, null);
+ AsterixThreadExecutor.INSTANCE.shutdown();
}
private static void generateEntityLockThread(TransactionSubsystem txnProvider) {
@@ -496,7 +511,7 @@
request.txnContext);
break;
case RequestType.UNLOCK:
- lockMgr.unlock(request.datasetIdObj, request.entityHashValue, request.txnContext);
+ lockMgr.unlock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
break;
case RequestType.RELEASE_LOCKS:
lockMgr.releaseLocks(request.txnContext);
@@ -555,6 +570,11 @@
this.entityHashValue = waitTime;
}
+ /**
+ * Returns the same text as prettyPrint().
+ */
+ @Override
+ public String toString() {
+ return prettyPrint();
+ }
+
public String prettyPrint() {
StringBuilder s = new StringBuilder();
//s.append(threadName.charAt(7)).append("\t").append("\t");
@@ -595,23 +615,7 @@
}
s.append("\tJ").append(txnContext.getJobId().getId()).append("\tD").append(datasetIdObj.getId()).append("\tE")
.append(entityHashValue).append("\t");
- switch (lockMode) {
- case LockMode.S:
- s.append("S");
- break;
- case LockMode.X:
- s.append("X");
- break;
- case LockMode.IS:
- s.append("IS");
- break;
- case LockMode.IX:
- s.append("IX");
- break;
- default:
- throw new UnsupportedOperationException("Unsupported lock mode");
- }
- s.append("\n");
+ s.append(LockMode.toString(lockMode)).append("\n");
return s.toString();
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json
new file mode 100644
index 0000000..0c4fa71
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json
@@ -0,0 +1,29 @@
+{
+ "name" : "Request",
+ "fields" : [
+ {
+ "name" : "resource id",
+ "type" : "GLOBAL"
+ },
+ {
+ "name" : "job slot",
+ "type" : "GLOBAL"
+ },
+ {
+ "name" : "prev job request",
+ "type" : "GLOBAL"
+ },
+ {
+ "name" : "next job request",
+ "type" : "GLOBAL"
+ },
+ {
+ "name" : "next request",
+ "type" : "GLOBAL"
+ },
+ {
+ "name" : "lock mode",
+ "type" : "INT"
+ }
+ ]
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Resource.json b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Resource.json
new file mode 100644
index 0000000..d0d553a
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Resource.json
@@ -0,0 +1,37 @@
+{
+ "name" : "Resource",
+ "fields" : [
+ {
+ "name" : "last holder",
+ "type" : "GLOBAL",
+ "initial" : "-1"
+ },
+ {
+ "name" : "first waiter",
+ "type" : "GLOBAL",
+ "initial" : "-1"
+ },
+ {
+ "name" : "first upgrader",
+ "type" : "GLOBAL",
+ "initial" : "-1"
+ },
+ {
+ "name" : "next",
+ "type" : "GLOBAL"
+ },
+ {
+ "name" : "max mode",
+ "type" : "INT",
+ "initial" : "edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode.NL"
+ },
+ {
+ "name" : "dataset id",
+ "type" : "INT"
+ },
+ {
+ "name" : "pk hash val",
+ "type" : "INT"
+ }
+ ]
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 933afcd..4f13b9f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -41,7 +41,6 @@
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
import edu.uci.ics.asterix.common.transactions.MutableLong;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -82,7 +81,7 @@
emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages);
flushQ = new LinkedBlockingQueue<LogPage>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
- emptyQ.offer(new LogPage((LockManager) txnSubsystem.getLockManager(), logPageSize, flushLSN));
+ emptyQ.offer(new LogPage(txnSubsystem, logPageSize, flushLSN));
}
appendLSN = initializeLogAnchor(nextLogFileId);
flushLSN.set(appendLSN);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index a3f42a7..1ed6fba 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -22,16 +22,21 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILogPage;
import edu.uci.ics.asterix.common.transactions.ILogRecord;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.common.transactions.MutableLong;
import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
public class LogPage implements ILogPage {
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogPage.class.getName());
- private final LockManager lockMgr;
+ private final TransactionSubsystem txnSubsystem;
private final LogPageReader logPageReader;
private final int logPageSize;
private final MutableLong flushLSN;
@@ -46,8 +51,8 @@
private FileChannel fileChannel;
private boolean stop;
- public LogPage(LockManager lockMgr, int logPageSize, MutableLong flushLSN) {
- this.lockMgr = lockMgr;
+ public LogPage(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) {
+ this.txnSubsystem = txnSubsystem;
this.logPageSize = logPageSize;
this.flushLSN = flushLSN;
appendBuffer = ByteBuffer.allocate(logPageSize);
@@ -187,7 +192,27 @@
private void batchUnlock(int beginOffset, int endOffset) throws ACIDException {
if (endOffset > beginOffset) {
logPageReader.initializeScan(beginOffset, endOffset);
- lockMgr.batchUnlock(this, logPageReader);
+
+ DatasetId dsId = new DatasetId(-1);
+ JobId jId = new JobId(-1);
+ ITransactionContext txnCtx = null;
+
+ LogRecord logRecord = logPageReader.next();
+ while (logRecord != null) {
+ if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
+ dsId.setId(logRecord.getDatasetId());
+ jId.setId(logRecord.getJobId());
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+ txnSubsystem.getLockManager().unlock(dsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
+ txnCtx.notifyOptracker(false);
+ } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
+ jId.setId(logRecord.getJobId());
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+ txnCtx.notifyOptracker(true);
+ notifyJobTerminator();
+ }
+ logRecord = logPageReader.next();
+ }
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 78bca42..91f2535 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -30,17 +30,34 @@
}
public static class LockManagerConstants {
- public static final String LOCK_CONF_DIR = "lock_conf";
- public static final String LOCK_CONF_FILE = "lock.conf";
- public static final int[] LOCK_CONFLICT_MATRIX = new int[] { 2, 3 };
- public static final int[] LOCK_CONVERT_MATRIX = new int[] { 2, 0 };
-
public static class LockMode {
- public static final byte S = 0;
- public static final byte X = 1;
- public static final byte IS = 2;
- public static final byte IX = 3;
+ public static final byte ANY = -1; // the only negative code; presumably a wildcard meaning whichever mode is held - confirm
+ public static final byte NL = 0; // presumably "no lock" - confirm
+ public static final byte IS = 1; // intention mode for S (see intentionMode); was 2 before this change
+ public static final byte IX = 2; // intention mode for X (see intentionMode); was 3 before this change
+ public static final byte S = 3; // was 0 before this change
+ public static final byte X = 4; // was 1 before this change; NOTE(review): all four old codes are renumbered, so anything that stored or indexed by the old values must be checked
+ // Maps a mode to its intention counterpart: S -> IS, X -> IX; every other mode throws IllegalArgumentException.
+ public static byte intentionMode(byte mode) {
+ switch (mode) {
+ case S: return IS;
+ case X: return IX;
+ default: throw new IllegalArgumentException(
+ "no intention lock mode for " + toString(mode)); // toString(mode) itself throws for a byte that is no known mode, so that case surfaces with toString's message instead of this one
+ }
+ }
+ // Returns the symbolic name of a mode constant; a byte matching no constant throws IllegalArgumentException.
+ public static String toString(byte mode) {
+ switch (mode) {
+ case ANY: return "ANY";
+ case NL: return "NL";
+ case IS: return "IS";
+ case IX: return "IX";
+ case S: return "S";
+ case X: return "X";
+ default: throw new IllegalArgumentException("no such lock mode"); // NOTE(review): message does not include the offending value
+ }
+ }
}
}
-
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index aceeb82..4cb5fac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -22,7 +22,7 @@
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
+import edu.uci.ics.asterix.transaction.management.service.locking.ConcurrentLockManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.recovery.CheckpointThread;
import edu.uci.ics.asterix.transaction.management.service.recovery.RecoveryManager;
@@ -46,7 +46,7 @@
this.id = id;
this.txnProperties = txnProperties;
this.transactionManager = new TransactionManager(this);
- this.lockManager = new LockManager(this);
+ this.lockManager = new ConcurrentLockManager(this);
this.logManager = new LogManager(this);
this.recoveryManager = new RecoveryManager(this);
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;