Merge branch 'master' into salsubaiee/master_lsm
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 51a9ce3..d68ad2c 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -538,4 +538,15 @@
public static boolean useLSM(Configuration conf) {
return conf.getBoolean(PregelixJob.UPDATE_INTENSIVE, false);
}
+
+ /**
+ * Get the spilling dir name for global aggregates.
+ *
+ * @param conf the job configuration
+ * @param superStep the superstep whose aggregate state is spilled
+ * @return the spilling dir name
+ */
+ public static String getGlobalAggregateSpillingDirName(Configuration conf, long superStep) {
+ return "/tmp/pregelix/agg/" + conf.get(PregelixJob.JOB_ID) + "/" + superStep;
+ }
}
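
Note: the helper above simply nests the job id and the superstep under a fixed /tmp/pregelix/agg prefix on HDFS. A minimal illustration, assuming a hypothetical job id "job_1":

    // Illustration only; "job_1" is a made-up PregelixJob.JOB_ID value.
    String dir = BspUtils.getGlobalAggregateSpillingDirName(conf, 3);
    // -> "/tmp/pregelix/agg/job_1/3"
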
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
index 24105ae..a0f67e3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
@@ -15,6 +15,14 @@
package edu.uci.ics.pregelix.api.util;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -41,4 +49,19 @@
}
}
+ public static void flushTupleToHDFS(ArrayTupleBuilder atb, Configuration conf, long superStep)
+ throws HyracksDataException {
+ try {
+ if (atb.getSize() > 0) {
+ FileSystem dfs = FileSystem.get(conf);
+ String fileName = BspUtils.getGlobalAggregateSpillingDirName(conf, superStep) + "/" + UUID.randomUUID();
+ FSDataOutputStream dos = dfs.create(new Path(fileName), true);
+ dos.write(atb.getByteArray(), 0, atb.getSize());
+ dos.flush();
+ dos.close();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
}
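
The new flushTupleToHDFS helper is meant for the case where a partial global-aggregate tuple no longer fits into a single frame; the compute function factories later in this diff call it exactly that way. A condensed sketch of the call pattern, assuming a filled ArrayTupleBuilder tb, a FrameTupleAppender appender over buffer frame, and the job Configuration conf:

    // Sketch only; mirrors ComputeUpdateFunctionFactory/StartComputeUpdateFunctionFactory below.
    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
        // the partial aggregate overflows the frame: spill it to HDFS instead
        FrameTupleUtils.flushTupleToHDFS(tb, conf, Vertex.getSuperstep());
        appender.reset(frame, true);
    }
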
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index a3f24f4..3d1699f 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -90,21 +90,55 @@
<version>1.3</version>
<executions>
<execution>
+ <id>pregelix</id>
<configuration>
+ <platforms>
+ <platform>unix</platform>
+ </platforms>
<programs>
<program>
<mainClass>org.apache.hadoop.util.RunJar</mainClass>
- <name>pregelix-obselete</name>
+ <name>pregelix</name>
</program>
</programs>
<repositoryLayout>flat</repositoryLayout>
<repositoryName>lib</repositoryName>
+ <configurationDirectory>etc:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$1"</configurationDirectory>
</configuration>
<phase>package</phase>
<goals>
<goal>assemble</goal>
</goals>
</execution>
+ <execution>
+ <id>cc_nc_drivers</id>
+ <configuration>
+ <platforms>
+ <platform>unix</platform>
+ </platforms>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.cc.CCDriver</mainClass>
+ <name>pregelixcc</name>
+ </program>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.nc.NCDriver</mainClass>
+ <name>pregelixnc</name>
+ <commandLineArguments>
+ <commandLineArgument>-app-nc-main-class</commandLineArgument>
+ <commandLineArgument>edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint</commandLineArgument>
+ </commandLineArguments>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ <configurationDirectory>etc:"$HADOOP_HOME"/conf:/etc/hadoop/conf</configurationDirectory>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
</executions>
</plugin>
<plugin>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 3e6e9a5..7bd2cf8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -99,7 +99,7 @@
IntWritable lastSnapshotSuperstep = new IntWritable(0);
boolean failed = false;
int retryCount = 0;
- int maxRetryCount = 3;
+ int maxRetryCount = 1;
do {
try {
@@ -142,12 +142,11 @@
//restart from snapshot
failed = true;
retryCount++;
- ioe.printStackTrace();
+ throw new HyracksException(ioe);
}
} while (failed && retryCount < maxRetryCount);
LOG.info("job finished");
} catch (Exception e) {
- e.printStackTrace();
throw new HyracksException(e);
}
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index d37c6fd..36723a6 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -108,11 +108,13 @@
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.runtime.touchpoint.RecoveryRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexPartitionComputerFactory;
@@ -374,9 +376,10 @@
@Override
public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
try {
+
PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
- FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath));
+ FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath + "/" + lastSuccessfulIteration));
tmpJob.setOutputKeyClass(NullWritable.class);
tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
@@ -409,7 +412,7 @@
try {
PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
- FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath));
+ FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath + "/" + lastCheckpointedIteration));
JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
@@ -637,7 +640,7 @@
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
ClusterConfig.setLocationConstraint(spec, materializeRead);
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastSuccessfulIteration;
PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
@@ -657,7 +660,7 @@
@SuppressWarnings({ "unchecked", "rawtypes" })
protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
throws HyracksException {
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastCheckpointedIteration;
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
try {
@@ -696,6 +699,12 @@
recordDescriptor);
ClusterConfig.setLocationConstraint(spec, materialize);
+ /** construct runtime hook */
+ RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration + 1, new ConfigurationFactory(
+ pregelixJob.getConfiguration())));
+ ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink);
@@ -706,7 +715,8 @@
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
materialize, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, emptySink, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
spec.setFrameSize(frameSize);
return new JobSpecification[] { spec };
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 58384b2..41887c0 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -529,7 +529,7 @@
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
/** generate secondary index checkpoint */
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;
FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
tmpJob.setOutputValueClass(MsgList.class);
@@ -569,7 +569,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
JobSpecification spec = new JobSpecification();
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
try {
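
Together with the JobGen changes above, every checkpoint now lands in a per-iteration subdirectory, so a restart can load exactly the last successfully checkpointed superstep. A layout sketch (directory roots copied from the code; <jobId> and <n> are placeholders):

    // /tmp/ckpoint/<jobId>/message/<n>     message/state checkpoint of superstep n   (JobGen)
    // /tmp/ckpoint/<jobId>/secondary/<n>   secondary-index checkpoint of superstep n (JobGenInnerJoin)
    // <vertexCheckpointPath>/<n>           vertex checkpoint of superstep n          (JobGen)
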
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index e8a6b5c..e1795de 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -61,6 +61,8 @@
ccConfig.defaultMaxJobAttempts = 0;
ccConfig.jobHistorySize = 1;
ccConfig.profileDumpPeriod = -1;
+ //ccConfig.heartbeatPeriod = 5000;
+ //ccConfig.maxHeartbeatLapsePeriods = 1;
// cluster controller
cc = new ClusterControllerService(ccConfig);
@@ -74,7 +76,7 @@
ncConfig1.dataIPAddress = "127.0.0.1";
ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
- ncConfig1.ioDevices="dev1,dev2";
+ ncConfig1.ioDevices = "dev1,dev2";
ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -87,7 +89,7 @@
ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
- ncConfig2.ioDevices="dev3,dev4";
+ ncConfig2.ioDevices = "dev3,dev4";
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
@@ -96,6 +98,14 @@
ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
}
+ public static void shutdownNC1() throws Exception {
+ nc1.stop();
+ }
+
+ public static void shutdownNC2() throws Exception {
+ nc2.stop();
+ }
+
public static void deinit() throws Exception {
nc2.stop();
nc1.stop();
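
The new shutdown helpers exist so tests can inject a node-controller failure mid-job; the commented-out thread in FailureRecoveryTest below uses them the same way. A minimal sketch (the 10-second delay is an arbitrary choice):

    // Failure-injection sketch only; assumes the test has already started the cluster.
    Thread killer = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(10000);                          // let a few supersteps complete
                PregelixHyracksIntegrationUtil.shutdownNC1(); // then take one NC down
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    });
    killer.start();
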
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelix b/pregelix/pregelix-core/src/main/resources/scripts/pregelix
deleted file mode 100644
index 7232ccc..0000000
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelix
+++ /dev/null
@@ -1,111 +0,0 @@
-#!/bin/sh
-#
-#------------------------------------------------------------------------
-# Copyright 2009-2013 by The Regents of the University of California
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# you may obtain a copy of the License from
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ------------------------------------------------------------------------
-#
-
-
-# resolve links - $0 may be a softlink
-PRG="$0"
-
-while [ -h "$PRG" ]; do
- ls=`ls -ld "$PRG"`
- link=`expr "$ls" : '.*-> \(.*\)$'`
- if expr "$link" : '/.*' > /dev/null; then
- PRG="$link"
- else
- PRG=`dirname "$PRG"`/"$link"
- fi
-done
-
-PRGDIR=`dirname "$PRG"`
-BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
-
-
-
-# OS specific support. $var _must_ be set to either true or false.
-cygwin=false;
-darwin=false;
-case "`uname`" in
- CYGWIN*) cygwin=true ;;
- Darwin*) darwin=true
- if [ -z "$JAVA_VERSION" ] ; then
- JAVA_VERSION="CurrentJDK"
- else
- echo "Using Java version: $JAVA_VERSION"
- fi
- if [ -z "$JAVA_HOME" ] ; then
- JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
- fi
- ;;
-esac
-
-if [ -z "$JAVA_HOME" ] ; then
- if [ -r /etc/gentoo-release ] ; then
- JAVA_HOME=`java-config --jre-home`
- fi
-fi
-
-# For Cygwin, ensure paths are in UNIX format before anything is touched
-if $cygwin ; then
- [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
- [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
-fi
-
-# If a specific java binary isn't specified search for the standard 'java' binary
-if [ -z "$JAVACMD" ] ; then
- if [ -n "$JAVA_HOME" ] ; then
- if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
- # IBM's JDK on AIX uses strange locations for the executables
- JAVACMD="$JAVA_HOME/jre/sh/java"
- else
- JAVACMD="$JAVA_HOME/bin/java"
- fi
- else
- JAVACMD=`which java`
- fi
-fi
-
-if [ ! -x "$JAVACMD" ] ; then
- echo "Error: JAVA_HOME is not defined correctly." 1>&2
- echo " We cannot execute $JAVACMD" 1>&2
- exit 1
-fi
-
-if [ -z "$REPO" ]
-then
- REPO="$BASEDIR"/lib
-fi
-
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
-
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin; then
- [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
- [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
- [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
- [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
- [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
-fi
-
-exec "$JAVACMD" $JAVA_OPTS \
- -classpath "$CLASSPATH" \
- -Dapp.name="pregelix" \
- -Dapp.pid="$$" \
- -Dapp.repo="$REPO" \
- -Dapp.home="$BASEDIR" \
- -Dbasedir="$BASEDIR" \
- org.apache.hadoop.util.RunJar \
- "$@"
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelix.bat b/pregelix/pregelix-core/src/main/resources/scripts/pregelix.bat
deleted file mode 100644
index fe53029..0000000
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelix.bat
+++ /dev/null
@@ -1,124 +0,0 @@
-@rem /*
-@rem Copyright 2009-2013 by The Regents of the University of California
-@rem Licensed under the Apache License, Version 2.0 (the "License");
-@rem you may not use this file except in compliance with the License.
-@rem you may obtain a copy of the License from
-@rem
-@rem http://www.apache.org/licenses/LICENSE-2.0
-@rem
-@rem Unless required by applicable law or agreed to in writing, software
-@rem distributed under the License is distributed on an "AS IS" BASIS,
-@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-@rem See the License for the specific language governing permissions and
-@rem limitations under the License.
-@rem */
-@REM ----------------------------------------------------------------------------
-@REM Copyright 2001-2006 The Apache Software Foundation.
-@REM
-@REM Licensed under the Apache License, Version 2.0 (the "License");
-@REM you may not use this file except in compliance with the License.
-@REM You may obtain a copy of the License at
-@REM
-@REM http://www.apache.org/licenses/LICENSE-2.0
-@REM
-@REM Unless required by applicable law or agreed to in writing, software
-@REM distributed under the License is distributed on an "AS IS" BASIS,
-@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-@REM See the License for the specific language governing permissions and
-@REM limitations under the License.
-@REM ----------------------------------------------------------------------------
-@REM
-@REM Copyright (c) 2001-2006 The Apache Software Foundation. All rights
-@REM reserved.
-
-@echo off
-
-set ERROR_CODE=0
-
-:init
-@REM Decide how to startup depending on the version of windows
-
-@REM -- Win98ME
-if NOT "%OS%"=="Windows_NT" goto Win9xArg
-
-@REM set local scope for the variables with windows NT shell
-if "%OS%"=="Windows_NT" @setlocal
-
-@REM -- 4NT shell
-if "%eval[2+2]" == "4" goto 4NTArgs
-
-@REM -- Regular WinNT shell
-set CMD_LINE_ARGS=%*
-goto WinNTGetScriptDir
-
-@REM The 4NT Shell from jp software
-:4NTArgs
-set CMD_LINE_ARGS=%$
-goto WinNTGetScriptDir
-
-:Win9xArg
-@REM Slurp the command line arguments. This loop allows for an unlimited number
-@REM of arguments (up to the command line limit, anyway).
-set CMD_LINE_ARGS=
-:Win9xApp
-if %1a==a goto Win9xGetScriptDir
-set CMD_LINE_ARGS=%CMD_LINE_ARGS% %1
-shift
-goto Win9xApp
-
-:Win9xGetScriptDir
-set SAVEDIR=%CD%
-%0\
-cd %0\..\..
-set BASEDIR=%CD%
-cd %SAVEDIR%
-set SAVE_DIR=
-goto repoSetup
-
-:WinNTGetScriptDir
-set BASEDIR=%~dp0\..
-
-:repoSetup
-
-
-if "%JAVACMD%"=="" set JAVACMD=java
-
-if "%REPO%"=="" set REPO=%BASEDIR%\lib
-
-cp $BASEDIR"\..\a-hadoop-patch.jar "$REPO"\
-
-set CLASSPATH="%BASEDIR%"\etc;"%REPO%"\a-hadoop-patch.jar;"%REPO%"\pregelix-api-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-api-0.2.2-SNAPSHOT.jar;"%REPO%"\json-20090211.jar;"%REPO%"\httpclient-4.1-alpha2.jar;"%REPO%"\httpcore-4.1-beta1.jar;"%REPO%"\commons-logging-1.1.1.jar;"%REPO%"\commons-codec-1.3.jar;"%REPO%"\args4j-2.0.12.jar;"%REPO%"\hyracks-ipc-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-lang3-3.1.jar;"%REPO%"\hyracks-data-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hadoop-core-0.20.2.jar;"%REPO%"\commons-cli-1.2.jar;"%REPO%"\xmlenc-0.52.jar;"%REPO%"\commons-httpclient-3.0.1.jar;"%REPO%"\commons-net-1.4.1.jar;"%REPO%"\oro-2.0.8.jar;"%REPO%"\jetty-6.1.14.jar;"%REPO%"\jetty-util-6.1.14.jar;"%REPO%"\servlet-api-2.5-6.1.14.jar;"%REPO%"\jasper-runtime-5.5.12.jar;"%REPO%"\jasper-compiler-5.5.12.jar;"%REPO%"\jsp-api-2.1-6.1.14.jar;"%REPO%"\jsp-2.1-6.1.14.jar;"%REPO%"\core-3.1.1.jar;"%REPO%"\ant-1.6.5.jar;"%REPO%"\commons-el-1.0.jar;"%REPO%"\jets3t-0.7.1.jar;"%REPO%"\kfs-0.3.jar;"%REPO%"\hsqldb-1.8.0.10.jar;"%REPO%"\pregelix-dataflow-std-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-std-base-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-hadoop-0.2.2-SNAPSHOT.jar;"%REPO%"\dcache-client-0.0.1.jar;"%REPO%"\jetty-client-8.0.0.M0.jar;"%REPO%"\jetty-http-8.0.0.RC0.jar;"%REPO%"\jetty-io-8.0.0.RC0.jar;"%REPO%"\jetty-util-8.0.0.RC0.jar;"%REPO%"\hyracks-storage-am-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-am-btree-0.2.2-SNAPSHOT.jar;"%REPO%"\btreehelper-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-cc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-common-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-io-1.3.1.jar;"%REPO%"\jetty-server-8.0.0.RC0.jar;"%REPO%"\servlet-api-3.0.20100224.jar;"%REPO%"\jetty-continuation-8.0.0.RC0.jar;"%REPO%"\jetty-webapp-8.0.0.RC0.jar;"%REPO%"\jetty-xml-8.0.0.RC0.jar;"%REPO%"\jetty-servlet-8.0.0.RC0.jar;"%REPO%"\jetty-security-8.0.0.RC0.jar;"%REPO%"\wicket-core-1.5.2.jar;"%REPO%"\wicket-util-1.5.2.jar;"%REPO%"\slf4j-api-1.6.1.jar;"%REPO%"\wicket-request-1.5.2.jar;"%REPO%"\slf4j-jcl-1.6.3.jar;"%REPO%"\hyracks-control-nc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-net-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-hadoop-compat-0.2.2-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-runtime-0.0.1-SNAPSHOT.jar;"%REPO%"\hadoop-test-0.20.2.jar;"%REPO%"\ftplet-api-1.0.0.jar;"%REPO%"\mina-core-2.0.0-M5.jar;"%REPO%"\ftpserver-core-1.0.0.jar;"%REPO%"\ftpserver-deprecated-1.0.0-M2.jar;"%REPO%"\javax.servlet-api-3.0.1.jar;"%REPO%"\pregelix-core-0.0.1-SNAPSHOT.jar
-goto endInit
-
-@REM Reaching here means variables are defined and arguments have been captured
-:endInit
-
-%JAVACMD% %JAVA_OPTS% -classpath %CLASSPATH_PREFIX%;%CLASSPATH% -Dapp.name="pregelix" -Dapp.repo="%REPO%" -Dapp.home="%BASEDIR%" -Dbasedir="%BASEDIR%" org.apache.hadoop.util.RunJar %CMD_LINE_ARGS%
-if ERRORLEVEL 1 goto error
-goto end
-
-:error
-if "%OS%"=="Windows_NT" @endlocal
-set ERROR_CODE=%ERRORLEVEL%
-
-:end
-@REM set local scope for the variables with windows NT shell
-if "%OS%"=="Windows_NT" goto endNT
-
-@REM For old DOS remove the set variables from ENV - we assume they were not set
-@REM before we started - at least we don't leave any baggage around
-set CMD_LINE_ARGS=
-goto postExec
-
-:endNT
-@REM If error code is set to 1 then the endlocal was done already in :error.
-if %ERROR_CODE% EQU 0 @endlocal
-
-
-:postExec
-
-if "%FORCE_EXIT_ON_ERROR%" == "on" (
- if %ERROR_CODE% NEQ 0 exit %ERROR_CODE%
-)
-
-exit /B %ERROR_CODE%
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc b/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc
deleted file mode 100755
index c1ee3f2..0000000
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc
+++ /dev/null
@@ -1,114 +0,0 @@
-#!/bin/sh
-#
-#------------------------------------------------------------------------
-# Copyright 2009-2013 by The Regents of the University of California
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# you may obtain a copy of the License from
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ------------------------------------------------------------------------
-#
-
-# resolve links - $0 may be a softlink
-PRG="$0"
-
-while [ -h "$PRG" ]; do
- ls=`ls -ld "$PRG"`
- link=`expr "$ls" : '.*-> \(.*\)$'`
- if expr "$link" : '/.*' > /dev/null; then
- PRG="$link"
- else
- PRG=`dirname "$PRG"`/"$link"
- fi
-done
-
-PRGDIR=`dirname "$PRG"`
-BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
-
-
-
-# OS specific support. $var _must_ be set to either true or false.
-cygwin=false;
-darwin=false;
-case "`uname`" in
- CYGWIN*) cygwin=true ;;
- Darwin*) darwin=true
- if [ -z "$JAVA_VERSION" ] ; then
- JAVA_VERSION="CurrentJDK"
- else
- echo "Using Java version: $JAVA_VERSION"
- fi
- if [ -z "$JAVA_HOME" ] ; then
- JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
- fi
- ;;
-esac
-
-if [ -z "$JAVA_HOME" ] ; then
- if [ -r /etc/gentoo-release ] ; then
- JAVA_HOME=`java-config --jre-home`
- fi
-fi
-
-# For Cygwin, ensure paths are in UNIX format before anything is touched
-if $cygwin ; then
- [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
- [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
-fi
-
-# If a specific java binary isn't specified search for the standard 'java' binary
-if [ -z "$JAVACMD" ] ; then
- if [ -n "$JAVA_HOME" ] ; then
- if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
- # IBM's JDK on AIX uses strange locations for the executables
- JAVACMD="$JAVA_HOME/jre/sh/java"
- else
- JAVACMD="$JAVA_HOME/bin/java"
- fi
- else
- JAVACMD=`which java`
- fi
-fi
-
-if [ ! -x "$JAVACMD" ] ; then
- echo "Error: JAVA_HOME is not defined correctly." 1>&2
- echo " We cannot execute $JAVACMD" 1>&2
- exit 1
-fi
-
-if [ -z "$REPO" ]
-then
- REPO="$BASEDIR"/lib
-fi
-
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
-
-for f in ${REPO}/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin; then
- [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
- [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
- [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
- [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
- [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
-fi
-
-exec "$JAVACMD" $JAVA_OPTS \
- -classpath "$CLASSPATH" \
- -Dapp.name="pregelixcc" \
- -Dapp.pid="$$" \
- -Dapp.repo="$REPO" \
- -Dapp.home="$BASEDIR" \
- -Dbasedir="$BASEDIR" \
- edu.uci.ics.hyracks.control.cc.CCDriver \
- "$@"
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc b/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc
deleted file mode 100755
index c01b4b4..0000000
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc
+++ /dev/null
@@ -1,115 +0,0 @@
-#!/bin/sh
-#
-#------------------------------------------------------------------------
-# Copyright 2009-2013 by The Regents of the University of California
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# you may obtain a copy of the License from
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ------------------------------------------------------------------------
-#
-
-# resolve links - $0 may be a softlink
-PRG="$0"
-
-while [ -h "$PRG" ]; do
- ls=`ls -ld "$PRG"`
- link=`expr "$ls" : '.*-> \(.*\)$'`
- if expr "$link" : '/.*' > /dev/null; then
- PRG="$link"
- else
- PRG=`dirname "$PRG"`/"$link"
- fi
-done
-
-PRGDIR=`dirname "$PRG"`
-BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
-
-
-
-# OS specific support. $var _must_ be set to either true or false.
-cygwin=false;
-darwin=false;
-case "`uname`" in
- CYGWIN*) cygwin=true ;;
- Darwin*) darwin=true
- if [ -z "$JAVA_VERSION" ] ; then
- JAVA_VERSION="CurrentJDK"
- else
- echo "Using Java version: $JAVA_VERSION"
- fi
- if [ -z "$JAVA_HOME" ] ; then
- JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
- fi
- ;;
-esac
-
-if [ -z "$JAVA_HOME" ] ; then
- if [ -r /etc/gentoo-release ] ; then
- JAVA_HOME=`java-config --jre-home`
- fi
-fi
-
-# For Cygwin, ensure paths are in UNIX format before anything is touched
-if $cygwin ; then
- [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
- [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
-fi
-
-# If a specific java binary isn't specified search for the standard 'java' binary
-if [ -z "$JAVACMD" ] ; then
- if [ -n "$JAVA_HOME" ] ; then
- if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
- # IBM's JDK on AIX uses strange locations for the executables
- JAVACMD="$JAVA_HOME/jre/sh/java"
- else
- JAVACMD="$JAVA_HOME/bin/java"
- fi
- else
- JAVACMD=`which java`
- fi
-fi
-
-if [ ! -x "$JAVACMD" ] ; then
- echo "Error: JAVA_HOME is not defined correctly." 1>&2
- echo " We cannot execute $JAVACMD" 1>&2
- exit 1
-fi
-
-if [ -z "$REPO" ]
-then
- REPO="$BASEDIR"/lib
-fi
-
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
-
-for f in ${REPO}/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin; then
- [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
- [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
- [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`2
- [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
- [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
-fi
-
-exec "$JAVACMD" $JAVA_OPTS \
- -classpath "$CLASSPATH" \
- -Dapp.name="pregelixnc" \
- -Dapp.pid="$$" \
- -Dapp.repo="$REPO" \
- -Dapp.home="$BASEDIR" \
- -Dbasedir="$BASEDIR" \
- edu.uci.ics.hyracks.control.nc.NCDriver \
- -app-nc-main-class edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint "$@"
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
index 3fed609..c0be9dd 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
@@ -17,9 +17,13 @@
import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -32,6 +36,7 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
@@ -93,10 +98,28 @@
}
+ @SuppressWarnings("unchecked")
@Override
public void close() throws HyracksDataException {
- Writable finalAggregateValue = aggregator.finishFinal();
- IterationUtils.writeGlobalAggregateValue(conf, jobId, finalAggregateValue);
+ try {
+ // iterate over hdfs spilled aggregates
+ FileSystem dfs = FileSystem.get(conf);
+ String spillingDir = BspUtils.getGlobalAggregateSpillingDirName(conf, Vertex.getSuperstep());
+ FileStatus[] files = dfs.listStatus(new Path(spillingDir));
+ if (files != null) {
+ // goes into this branch only when there are spilled files
+ for (int i = 0; i < files.length; i++) {
+ FileStatus file = files[i];
+ FSDataInputStream dis = dfs.open(file.getPath());
+ partialAggregateValue.readFields(dis);
+ aggregator.step(partialAggregateValue);
+ // close the spilled partial-aggregate file once it has been read back
+ dis.close();
+ }
+ }
+ Writable finalAggregateValue = aggregator.finishFinal();
+ IterationUtils.writeGlobalAggregateValue(conf, jobId, finalAggregateValue);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
}
};
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index b74a5de..9a21680 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -168,7 +168,8 @@
FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
@Override
public boolean accept(Path dir) {
- return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
+ return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0
+ && dir.getName().indexOf(".crc") < 0;
}
});
return results;
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index adcb702..bfe89ab 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -147,7 +147,7 @@
return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
}
- public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges) {
+ public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, int currentIteration) {
Boolean toMove = jobIdToMove.get(jobId);
if (toMove == null || toMove == true) {
if (jobIdToSuperStep.get(jobId) == null) {
@@ -161,7 +161,11 @@
fileRef.delete();
}
- Vertex.setSuperstep(++superStep);
+ if (currentIteration > 0) {
+ Vertex.setSuperstep(currentIteration);
+ } else {
+ Vertex.setSuperstep(++superStep);
+ }
Vertex.setNumVertices(numVertices);
Vertex.setNumEdges(numEdges);
jobIdToSuperStep.put(jobId, superStep);
@@ -171,8 +175,8 @@
System.gc();
}
- public synchronized void endSuperStep(String giraphJobId) {
- jobIdToMove.put(giraphJobId, true);
+ public synchronized void endSuperStep(String pregelixJobId) {
+ jobIdToMove.put(pregelixJobId, true);
LOGGER.info("end iteration " + Vertex.getSuperstep());
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 603a464..1cf81ac 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -75,11 +75,11 @@
context.endSuperStep(giraphJobId);
}
- public static void setProperties(String giraphJobId, IHyracksTaskContext ctx, Configuration conf) {
+ public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, int currentIteration) {
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
- context.setVertexProperties(giraphJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
- conf.getLong(PregelixJob.NUM_EDGES, -1));
+ context.setVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+ conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
}
public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
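
The extra currentIteration argument lets the same code path serve both normal execution and recovery; both call sites appear in this diff. Condensed (lastCheckpointedIteration stands for the value baked into the recovery hook):

    // PreSuperStepRuntimeHookFactory: a non-positive value means "advance the superstep as usual"
    IterationUtils.setProperties(jobId, ctx, conf, -1);
    // RecoveryRuntimeHookFactory: jump straight to the superstep after the loaded checkpoint
    IterationUtils.setProperties(jobId, ctx, conf, lastCheckpointedIteration + 1);
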
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index 1066e3b..9994c0e 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -94,7 +94,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.4.1</version>
+ <version>2.5</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
new file mode 100644
index 0000000..dac087f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryTest {
+ private static String INPUTPATH = "data/webmap";
+ private static String OUTPUTPATH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+
+ try {
+ PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+ job.setVertexClass(PageRankVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPATH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+ testCluster.setUp();
+ Driver driver = new Driver(PageRankVertex.class);
+ // Thread thread = new Thread(new Runnable() {
+ //
+ // @Override
+ // public void run() {
+ // try {
+ // synchronized (this) {
+ // this.wait(10000);
+ // PregelixHyracksIntegrationUtil.shutdownNC1();
+ // }
+ // } catch (Exception e) {
+ // throw new IllegalStateException(e);
+ // }
+ // }
+ //
+ // });
+ //thread.start();
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPATH));
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ testCluster.tearDown();
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
similarity index 96%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
index 5a2636a..a2d32c0 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example;
import junit.framework.Assert;
@@ -29,6 +29,7 @@
import edu.uci.ics.pregelix.example.FailureVertex;
import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
/**
* This test case tests the error message propagation.
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
similarity index 91%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
index d2995f1..5a485ba 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example;
import java.io.File;
import java.util.ArrayList;
@@ -24,13 +24,12 @@
import org.junit.Test;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
-import edu.uci.ics.pregelix.example.PageRankVertex;
import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
import edu.uci.ics.pregelix.example.util.TestUtils;
/**
@@ -56,7 +55,7 @@
job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job1, INPUTPATH);
job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- job1.setCheckpointHook(ConservativeCheckpointHook.class);
+ //job1.setCheckpointHook(ConservativeCheckpointHook.class);
PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
job2.setVertexClass(PageRankVertex.class);
@@ -66,7 +65,7 @@
job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- job2.setCheckpointHook(ConservativeCheckpointHook.class);
+ //job2.setCheckpointHook(ConservativeCheckpointHook.class);
jobs.add(job1);
jobs.add(job2);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java
new file mode 100644
index 0000000..474d0a6
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.aggregator.OverflowAggregator;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class OverflowAggregatorTest {
+
+ private static String INPUTPATH = "data/webmap";
+ private static String OUTPUTPATH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+
+ try {
+ PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+ job.setVertexClass(PageRankVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPATH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setGlobalAggregatorClass(OverflowAggregator.class);
+
+ testCluster.setUp();
+ Driver driver = new Driver(PageRankVertex.class);
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPATH));
+ Text text = (Text) IterationUtils.readGlobalAggregateValue(job.getConfiguration(),
+ BspUtils.getJobId(job.getConfiguration()));
+ Assert.assertEquals(text.getLength(), 20 * 32767);
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ testCluster.tearDown();
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java
new file mode 100644
index 0000000..34b8b51
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.aggregator;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Test the case where the global aggregate's state is bloated
+ *
+ * @author yingyib
+ */
+public class OverflowAggregator extends
+ GlobalAggregator<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable, Text, Text> {
+
+ private int textLength = 0;
+ private int inc = 32767;
+
+ @Override
+ public void init() {
+ textLength = 0;
+ }
+
+ @Override
+ public void step(Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> v)
+ throws HyracksDataException {
+ textLength += inc;
+ }
+
+ @Override
+ public void step(Text partialResult) {
+ textLength += partialResult.getLength();
+ }
+
+ @Override
+ public Text finishPartial() {
+ byte[] partialResult = new byte[textLength];
+ for (int i = 0; i < partialResult.length; i++) {
+ partialResult[i] = 'a';
+ }
+ Text text = new Text();
+ text.set(partialResult);
+ return text;
+ }
+
+ @Override
+ public Text finishFinal() {
+ byte[] result = new byte[textLength];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = 'a';
+ }
+ Text text = new Text();
+ text.set(result);
+ return text;
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
similarity index 98%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
index d0cf654..40ea690 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example.util;
import java.io.BufferedReader;
import java.io.DataOutputStream;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index ab564fa..f3a0bb4 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -237,8 +237,12 @@
Writable agg = aggregator.finishPartial();
agg.write(tbGlobalAggregate.getDataOutput());
tbGlobalAggregate.addFieldEndOffset();
- appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
- tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+ if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+ tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
+ // the aggregate state exceeds the page size, so write it to HDFS
+ FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+ appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+ }
FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
} catch (IOException e) {
throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index b4e1dd8..ca8ec01 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -179,7 +179,7 @@
vertex.setOutputWriters(writers);
vertex.setOutputAppenders(appenders);
vertex.setOutputTupleBuilders(tbs);
-
+
if (vertex.isHalted()) {
vertex.activate();
}
@@ -230,8 +230,12 @@
Writable agg = aggregator.finishPartial();
agg.write(tbGlobalAggregate.getDataOutput());
tbGlobalAggregate.addFieldEndOffset();
- appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
- tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+ if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+ tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
+ // the aggregate state exceeds the page size, so write it to HDFS
+ FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+ appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+ }
FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
} catch (IOException e) {
throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
index cd2012a..3dcdad2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -40,7 +40,7 @@
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
Configuration conf = confFactory.createConfiguration(ctx);
- IterationUtils.setProperties(jobId, ctx, conf);
+ IterationUtils.setProperties(jobId, ctx, conf, -1);
}
};
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
new file mode 100644
index 0000000..35e7cd8
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+/**
+ * Recover the Pregelix job state on an NC
+ *
+ * @author yingyib
+ */
+public class RecoveryRuntimeHookFactory implements IRuntimeHookFactory {
+ private static final long serialVersionUID = 1L;
+ private final int currentSuperStep;
+ private String jobId;
+ private IConfigurationFactory confFactory;
+
+ public RecoveryRuntimeHookFactory(String jobId, int currentSuperStep, IConfigurationFactory confFactory) {
+ this.currentSuperStep = currentSuperStep;
+ this.jobId = jobId;
+ this.confFactory = confFactory;
+ }
+
+ @Override
+ public IRuntimeHook createRuntimeHook() {
+ return new IRuntimeHook() {
+
+ @Override
+ public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
+ IterationUtils.endSuperStep(jobId, ctx);
+ Configuration conf = confFactory.createConfiguration(ctx);
+ IterationUtils.setProperties(jobId, ctx, conf, currentSuperStep);
+ }
+
+ };
+ }
+
+}
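
For reference, this hook is wired into the recovery pipeline by JobGen.generateStateCheckpointLoading earlier in this diff; condensed, the wiring looks like this (spec, materialize and emptySink are locals of that method):

    RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
            new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration + 1,
                    new ConfigurationFactory(pregelixJob.getConfiguration())));
    ClusterConfig.setLocationConstraint(spec, postSuperStep);
    spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
    spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
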