merge from zheilbron/hyracks_msr
diff --git a/pregelix-dataflow/pom.xml b/pregelix-dataflow/pom.xml
index 2828451..1df75ae 100644
--- a/pregelix-dataflow/pom.xml
+++ b/pregelix-dataflow/pom.xml
@@ -7,8 +7,7 @@
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the
License for the specific language governing permissions and ! limitations
under the License. ! -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow</artifactId>
<packaging>jar</packaging>
@@ -17,7 +16,7 @@
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</parent>
@@ -84,75 +83,75 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-dataflow-std-base</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-btree</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-cc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-ipc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 24a0a9e..e25a46a 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -79,7 +79,7 @@
bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000,
threadFactory);
- int numPagesInMemComponents = numPages / 4;
+ int numPagesInMemComponents = numPages / 8;
vBufferCache = new MultitenantVirtualBufferCache(new VirtualBufferCache(new HeapBufferAllocator(), pageSize,
numPagesInMemComponents));
ioManager = (IOManager) appCtx.getRootContext().getIOManager();
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 1d7c979..75f8ed8 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.JobStateUtils;
import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
import edu.uci.ics.pregelix.dataflow.context.StateKey;
@@ -91,22 +92,6 @@
}
}
- public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
- try {
- FileSystem dfs = FileSystem.get(conf);
- String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
- Path path = new Path(pathStr);
- if (!dfs.exists(path)) {
- FSDataOutputStream output = dfs.create(path, true);
- output.writeBoolean(true);
- output.flush();
- output.close();
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
public static void writeGlobalAggregateValue(Configuration conf, String jobId, Writable agg)
throws HyracksDataException {
try {
@@ -136,19 +121,12 @@
}
}
+ public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+ JobStateUtils.writeForceTerminationState(conf, jobId);
+ }
+
public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
- try {
- FileSystem dfs = FileSystem.get(conf);
- String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
- Path path = new Path(pathStr);
- if (dfs.exists(path)) {
- return true;
- } else {
- return false;
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ return JobStateUtils.readForceTerminationState(conf, jobId);
}
public static Writable readGlobalAggregateValue(Configuration conf, String jobId) throws HyracksDataException {