fix fault-tolerance and error reporting to handle disk failures
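
Each task now wraps the exceptions it reports in a HyracksDataException tagged
with the id of the node controller it ran on (ExceptionUtils.setNodeIds), so
the Pregelix driver can tell disk failures apart from unrecoverable errors,
blacklist the failed node, and restart from the last checkpoint. A minimal
sketch of the retry path, following the pattern this patch adds to
Driver.runJob (runJobBody is a hypothetical stand-in for the actual
job-execution calls):

    try {
        runJobBody(job); // hypothetical: one attempt of the job pipeline
    } catch (Exception e) {
        Set<String> blackListNodes = new HashSet<String>();
        // recoverable() walks the cause chain: an IOException raised from the
        // IOManager indicates a disk failure, and the offending node id
        // (tagged on the NC side) is collected into blackListNodes
        if (ExceptionUtilities.recoverable(e, blackListNodes)) {
            ClusterConfig.addToBlackListNodes(blackListNodes);
            // retry from the last checkpoint; the blacklisted node is
            // excluded the next time the cluster config is loaded
        } else {
            throw e;
        }
    }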
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
index 6390abf..aab59c8 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
@@ -17,6 +17,8 @@
public class HyracksDataException extends HyracksException {
private static final long serialVersionUID = 1L;
+ private String nodeId;
+
public HyracksDataException() {
}
@@ -24,11 +26,19 @@
super(message);
}
+ public HyracksDataException(Throwable cause) {
+ super(cause);
+ }
+
public HyracksDataException(String message, Throwable cause) {
super(message, cause);
}
- public HyracksDataException(Throwable cause) {
- super(cause);
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 2166620..bae0eb5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -46,6 +46,7 @@
import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.utils.ExceptionUtils;
public class JobRun implements IJobStatusConditionVariable {
private final DeploymentId deploymentId;
@@ -347,7 +348,7 @@
taskAttempt.put("end-time", ta.getEndTime());
List<Exception> exceptions = ta.getExceptions();
if (exceptions != null && !exceptions.isEmpty()) {
- List<Exception> filteredExceptions = ExceptionFilterUtils
+ List<Exception> filteredExceptions = ExceptionUtils
.getActualExceptions(exceptions);
for (Exception exception : filteredExceptions) {
StringWriter exceptionWriter = new StringWriter();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/utils/ExceptionUtils.java
similarity index 69%
rename from hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java
rename to hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/utils/ExceptionUtils.java
index 44f3bec..cbdc6e5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/utils/ExceptionUtils.java
@@ -12,15 +12,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.control.common.utils;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
/**
* @author yingyib
*/
-public class ExceptionFilterUtils {
+public class ExceptionUtils {
public static List<Exception> getActualExceptions(List<Exception> allExceptions) {
List<Exception> exceptions = new ArrayList<Exception>();
@@ -32,6 +35,17 @@
return exceptions;
}
+ public static void setNodeIds(Collection<Exception> exceptions, String nodeId) {
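+ // wrap each exception in a HyracksDataException that records the id of the node where it arose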
+ List<Exception> newExceptions = new ArrayList<Exception>();
+ for (Exception e : exceptions) {
+ HyracksDataException newException = new HyracksDataException(e);
+ newException.setNodeId(nodeId);
+ newExceptions.add(newException);
+ }
+ exceptions.clear();
+ exceptions.addAll(newExceptions);
+ }
+
private static boolean possibleRootCause(Throwable exception) {
Throwable cause = exception;
while ((cause = cause.getCause()) != null) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index fc64814..09eeab6 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -49,6 +49,7 @@
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.utils.ExceptionUtils;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
@@ -278,6 +279,7 @@
}
if (!exceptions.isEmpty()) {
NodeControllerService ncs = joblet.getNodeController();
+ ExceptionUtils.setNodeIds(exceptions, ncs.getId());
ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
}
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
index 75553e1..85f80ac 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
/**
@@ -54,7 +55,20 @@
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
}
-
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @param topology
+ * the hyracks cluster topology
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology) throws HyracksException {
+ scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, topology);
+ }
+
/**
* The constructor of the scheduler.
*
@@ -62,7 +76,8 @@
* the mapping from nc names to nc infos
* @throws HyracksException
*/
- public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder) throws HyracksException {
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder)
+ throws HyracksException {
scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, builder);
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index a85a174..c14f23a 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -317,7 +317,11 @@
if (!releasedLatches) {
for (int i = 0; i < nodeFrontiers.size(); i++) {
- nodeFrontiers.get(i).page.releaseWriteLatch();
+ try {
+ nodeFrontiers.get(i).page.releaseWriteLatch();
+ } catch (Exception e) {
+ // ignore the IllegalMonitorStateException thrown if the latch has already been released
+ }
bufferCache.unpin(nodeFrontiers.get(i).page);
}
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 1e0d87a..a28d059 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -76,6 +76,8 @@
public static final String UPDATE_INTENSIVE = "pregelix.updateIntensive";
/** the check point hook */
public static final String CKP_CLASS = "pregelix.checkpointHook";
+ /** the recovery count */
+ public static final String RECOVERY_COUNT = "pregelix.recoveryCount";
/**
* Construct a Pregelix job from an existing configuration
@@ -222,6 +224,15 @@
final public void setCheckpointHook(Class<?> ckpClass) {
getConfiguration().setClass(CKP_CLASS, ckpClass, ICheckpointHook.class);
}
+
+ /**
+ * Set the recovery count, i.e., how many times the job has been restarted after failures
+ *
+ * @param recoveryCount
+ */
+ final public void setRecoveryCount(int recoveryCount) {
+ getConfiguration().setInt(RECOVERY_COUNT, recoveryCount);
+ }
@Override
public String toString() {
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index be419c2..e3950c8 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -583,4 +583,13 @@
public static String getSecondaryIndexCheckpointPath(Configuration conf, long lastSuperStep) {
return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/secondaryindex/" + lastSuperStep;
}
+
+ /***
+ * Get the recovery count
+ *
+ * @param conf the job configuration
+ * @return the recovery count
+ */
+ public static int getRecoveryCount(Configuration conf) {
+ return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
+ }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
index 6a4a660..3455233 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
@@ -25,7 +25,7 @@
@Override
public boolean checkpoint(int superstep) {
- if (superstep % 5 == 0) {
+ if (superstep % 100 == 0) {
return true;
} else {
return false;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 61c3653..5530144 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -23,7 +23,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -90,6 +92,7 @@
this.profiling = profiling;
PregelixJob currentJob = jobs.get(0);
PregelixJob lastJob = currentJob;
+ addHadoopConfiguration(currentJob, ipAddress, port, true);
JobGen jobGen = null;
/** prepare job -- deploy jars */
@@ -108,9 +111,10 @@
for (int i = lastSnapshotJobIndex.get(); i < jobs.size(); i++) {
lastJob = currentJob;
currentJob = jobs.get(i);
+ currentJob.setRecoveryCount(retryCount);
/** add hadoop configurations */
- addHadoopConfiguration(currentJob, ipAddress, port);
+ addHadoopConfiguration(currentJob, ipAddress, port, failed);
ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
/** load the data */
@@ -140,10 +144,10 @@
jobGen.clearCheckpoints();
hcc.unDeployBinary(deploymentId);
} catch (Exception e1) {
- /** disk failures */
- //restart from snapshot
- /** node failures */
- if (ExceptionUtilities.recoverable(e1)) {
+ Set<String> blackListNodes = new HashSet<String>();
+ /** disk failures or node failures */
+ if (ExceptionUtilities.recoverable(e1, blackListNodes)) {
+ ClusterConfig.addToBlackListNodes(blackListNodes);
failed = true;
retryCount++;
} else {
@@ -228,9 +232,9 @@
}
private DeploymentId prepareJobs(String ipAddress, int port) throws Exception {
- if (hcc == null)
+ if (hcc == null) {
hcc = new HyracksConnection(ipAddress, port);
-
+ }
URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
List<File> jars = new ArrayList<File>();
URL[] urls = classLoader.getURLs();
@@ -241,7 +245,8 @@
return deploymentId;
}
- private void addHadoopConfiguration(PregelixJob job, String ipAddress, int port) throws HyracksException {
+ private void addHadoopConfiguration(PregelixJob job, String ipAddress, int port, boolean loadClusterConfig)
+ throws HyracksException {
URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
if (hadoopCore != null) {
job.getConfiguration().addResource(hadoopCore);
@@ -254,7 +259,9 @@
if (hadoopHdfs != null) {
job.getConfiguration().addResource(hadoopHdfs);
}
- ClusterConfig.loadClusterConfig(ipAddress, port);
+ if (loadClusterConfig) {
+ ClusterConfig.loadClusterConfig(ipAddress, port);
+ }
}
private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int currentJobIndex,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 91d7e22..4863378 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -36,9 +36,12 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -61,10 +64,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -102,10 +102,10 @@
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
-import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
@@ -378,6 +378,7 @@
tmpJob.setOutputKeyClass(NullWritable.class);
tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
FileSystem dfs = FileSystem.get(tmpJob.getConfiguration());
+
dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)), true);
JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
@@ -601,7 +602,8 @@
*/
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
vertexIdClass.getName(), vertexClass.getName());
- HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
+ VertexFileWriteOperatorDescriptor writer = new VertexFileWriteOperatorDescriptor(spec, confFactory,
+ inputRdFactory);
ClusterConfig.setLocationConstraint(spec, writer);
/**
@@ -653,13 +655,15 @@
String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
- tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+ tmpJob.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
tmpJob.setOutputKeyClass(vertexIdClass);
tmpJob.setOutputValueClass(MsgList.class);
- ITupleWriterFactory writerFactory = new KeyValueWriterFactory(new ConfFactory(tmpJob));
- HDFSWriteOperatorDescriptor hdfsWriter = new HDFSWriteOperatorDescriptor(spec, tmpJob, writerFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), MsgList.class.getName());
+ HDFSFileWriteOperatorDescriptor hdfsWriter = new HDFSFileWriteOperatorDescriptor(spec, tmpJob, inputRdFactory);
+ ClusterConfig.setLocationConstraint(spec, hdfsWriter);
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, hdfsWriter, 0);
@@ -673,7 +677,7 @@
throws HyracksException {
String checkpointPath = BspUtils.getMessageCheckpointPath(job.getConfiguration(), lastCheckpointedIteration);
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
- tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+ tmpJob.setInputFormatClass(SequenceFileInputFormat.class);
try {
FileInputFormat.setInputPaths(tmpJob, checkpointPath);
} catch (IOException e) {
@@ -686,10 +690,11 @@
/***
* HDFS read operator
*/
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
List<InputSplit> splits = new ArrayList<InputSplit>();
try {
- splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+ InputFormat inputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(job.getInputFormatClass(),
+ job.getConfiguration());
+ splits = inputFormat.getSplits(tmpJob);
LOGGER.info("number of splits: " + splits.size());
for (InputSplit split : splits)
LOGGER.info(split.toString());
@@ -703,6 +708,16 @@
readSchedule, new KeyValueParserFactory());
ClusterConfig.setLocationConstraint(spec, scanner);
+ /** construct the sort operator to sort message states */
+ int[] keyFields = new int[] { 0 };
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration,
+ WritableComparator.get(vertexIdClass).getClass());
+ ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
+ nkmFactory, sortCmpFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sort);
+
/**
* construct the materializing write operator
*/
@@ -712,7 +727,7 @@
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
- new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration + 1, new ConfigurationFactory(
+ new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration, new ConfigurationFactory(
pregelixJob.getConfiguration())));
ClusterConfig.setLocationConstraint(spec, postSuperStep);
@@ -725,7 +740,8 @@
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
- materialize, 0);
+ sort, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
spec.setFrameSize(frameSize);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 7c1df0ff..9389f62 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -21,15 +21,17 @@
import java.util.List;
import java.util.logging.Logger;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -49,17 +51,12 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.pregelix.api.graph.MsgList;
-import edu.uci.ics.pregelix.api.io.VertexInputFormat;
-import edu.uci.ics.pregelix.api.io.internal.InternalVertexInputFormat;
-import edu.uci.ics.pregelix.api.io.internal.InternalVertexOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
@@ -71,8 +68,8 @@
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
-import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -526,14 +523,10 @@
/** generate plan specific state checkpointing */
protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
JobSpecification[] msgCkpSpecs = super.generateStateCheckpointing(lastSuccessfulIteration);
- PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
- tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
/** generate secondary index checkpoint */
- String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
- FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
- tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
- tmpJob.setOutputValueClass(MsgList.class);
+ PregelixJob tmpJob = this.createCloneJob("Secondary index checkpointing for job " + jobId, pregelixJob);
+
JobSpecification secondaryBTreeCkp = generateSecondaryBTreeCheckpoint(lastSuccessfulIteration, tmpJob);
JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
@@ -550,11 +543,12 @@
@Override
protected JobSpecification[] generateStateCheckpointLoading(int lastSuccessfulIteration, PregelixJob job)
throws HyracksException {
- JobSpecification[] msgCkpSpecs = generateStateCheckpointLoading(lastSuccessfulIteration, job);
- PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
- tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+ /** generate message checkpoint load */
+ JobSpecification[] msgCkpSpecs = super.generateStateCheckpointLoading(lastSuccessfulIteration, job);
/** generate secondary index checkpoint load */
+ PregelixJob tmpJob = this.createCloneJob("Secondary index checkpoint loading for job " + jobId, pregelixJob);
+ tmpJob.setOutputFormatClass(SequenceFileOutputFormat.class);
JobSpecification secondaryBTreeCkpLoad = generateSecondaryBTreeCheckpointLoad(lastSuccessfulIteration, tmpJob);
JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
for (int i = 0; i < msgCkpSpecs.length; i++) {
@@ -572,21 +566,21 @@
String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
- tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+ tmpJob.setInputFormatClass(SequenceFileInputFormat.class);
try {
FileInputFormat.setInputPaths(tmpJob, checkpointPath);
} catch (IOException e) {
throw new HyracksException(e);
}
- Configuration conf = job.getConfiguration();
/***
- * construct HDFS read operator
+ * HDFS read operator
*/
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
List<InputSplit> splits = new ArrayList<InputSplit>();
try {
- splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+ InputFormat inputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(job.getInputFormatClass(),
+ job.getConfiguration());
+ splits = inputFormat.getSplits(tmpJob);
LOGGER.info("number of splits: " + splits.size());
for (InputSplit split : splits)
LOGGER.info(split.toString());
@@ -600,6 +594,16 @@
readSchedule, new KeyValueParserFactory());
ClusterConfig.setLocationConstraint(spec, scanner);
+ /** construct the sort operator to sort message states */
+ int[] keyFields = new int[] { 0 };
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration,
+ WritableComparator.get(vertexIdClass).getClass());
+ ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
+ nkmFactory, sortCmpFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sort);
+
/**
* construct bulk-load index operator
*/
@@ -622,8 +626,8 @@
* connect operator descriptors
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
- btreeBulkLoad, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, btreeBulkLoad, 0);
spec.setFrameSize(frameSize);
return spec;
@@ -632,6 +636,12 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
private JobSpecification generateSecondaryBTreeCheckpoint(int lastSuccessfulIteration, PregelixJob job)
throws HyracksException {
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
+ FileOutputFormat.setOutputPath(job, new Path(checkpointPath));
+ job.setOutputKeyClass(BspUtils.getVertexIndexClass(job.getConfiguration()));
+ job.setOutputValueClass(MsgList.class);
+
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
Class<? extends Writable> msgListClass = MsgList.class;
String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
@@ -655,7 +665,6 @@
/**
* construct btree search operator
*/
- ConfFactory confFactory = new ConfFactory(job);
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), msgListClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
@@ -675,8 +684,9 @@
/**
* construct write file operator
*/
- HDFSWriteOperatorDescriptor writer = new HDFSWriteOperatorDescriptor(spec, job, new KeyValueWriterFactory(
- confFactory));
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), MsgList.class.getName());
+ HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
ClusterConfig.setLocationConstraint(spec, writer);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index ea6cc8a..89fbdcd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -20,12 +20,15 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -52,6 +55,8 @@
private static Map<String, List<String>> ipToNcMapping;
private static String[] stores;
private static Scheduler hdfsScheduler;
+ private static Set<String> blackListNodes = new HashSet<String>();
+ private static IHyracksClientConnection hcc;
/**
* let tests set config path to be whatever
@@ -197,9 +202,19 @@
public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
try {
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ if (hcc == null) {
+ hcc = new HyracksConnection(ipAddress, port);
+ }
Map<String, NodeControllerInfo> ncNameToNcInfos = new TreeMap<String, NodeControllerInfo>();
ncNameToNcInfos.putAll(hcc.getNodeControllerInfos());
+
+ /**
+ * remove black list nodes -- which had disk failures
+ */
+ for (String blackListNode : blackListNodes) {
+ ncNameToNcInfos.remove(blackListNode);
+ }
+
NCs = new String[ncNameToNcInfos.size()];
ipToNcMapping = new HashMap<String, List<String>>();
int i = 0;
@@ -216,7 +231,7 @@
i++;
}
- hdfsScheduler = new Scheduler(ipAddress, port);
+ hdfsScheduler = new Scheduler(hcc.getNodeControllerInfos(), hcc.getClusterTopology());
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -240,4 +255,8 @@
}
return locations;
}
+
+ public static void addToBlackListNodes(Collection<String> nodes) {
+ blackListNodes.addAll(nodes);
+ }
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
index 4b1bf94..a4c4501 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
@@ -14,6 +14,11 @@
*/
package edu.uci.ics.pregelix.core.util;
+import java.io.IOException;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
/**
* The util to analysis exceptions
*
@@ -27,8 +32,12 @@
* @param exception
* @return true or false
*/
- public static boolean recoverable(Exception exception) {
+ public static boolean recoverable(Exception exception, Set<String> blackListNodes) {
String message = exception.getMessage();
+
+ /***
+ * check interrupted exceptions and node failures
+ */
if (exception instanceof InterruptedException || (message.contains("Node") && message.contains("not live"))
|| message.contains("Failure occurred on input")) {
return true;
@@ -39,6 +48,41 @@
return true;
}
}
+
+ /***
+ * check io exceptions, which indicate disk failures
+ */
+ cause = exception;
+ String blackListNode = null;
+ if (cause instanceof HyracksDataException) {
+ blackListNode = ((HyracksDataException) cause).getNodeId();
+ }
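+ // walk the cause chain: an IOException raised from the IOManager means a disk failure on that node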
+ while ((cause = cause.getCause()) != null) {
+ if (cause instanceof IOException) {
+ if (containsIOManager(cause)) {
+ if (blackListNode != null) {
+ blackListNodes.add(blackListNode);
+ }
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Check if the exception trace contains the IOManager, which indicates a disk failure
+ *
+ * @param cause
+ * @return true if IOManager is in the trace; false otherwise.
+ */
+ private static boolean containsIOManager(Throwable cause) {
+ StackTraceElement[] traces = cause.getStackTrace();
+ for (StackTraceElement e : traces) {
+ if (e.getClassName().endsWith("IOManager")) {
+ return true;
+ }
+ }
return false;
}
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 76a9562..98be8cd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -63,7 +63,7 @@
ccConfig.profileDumpPeriod = -1;
ccConfig.heartbeatPeriod = 1000;
ccConfig.maxHeartbeatLapsePeriods = 15;
-
+
// cluster controller
cc = new ClusterControllerService(ccConfig);
cc.start();
@@ -98,14 +98,22 @@
ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
}
- public static void showDownNC1() throws Exception {
+ public static void startNC1() throws Exception {
+ nc1.start();
+ }
+
+ public static void shutdownNC1() throws Exception {
nc1.stop();
}
- public static void showDownNC2() throws Exception {
+ public static void shutdownNC2() throws Exception {
nc2.stop();
}
+ public static void shutdownCC() throws Exception {
+ cc.stop();
+ }
+
public static void deinit() throws Exception {
nc2.stop();
nc1.stop();
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
index 1f410c7..c985f64 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -103,9 +103,8 @@
try {
bulkLoader.end();
} catch (IndexException e) {
- throw new HyracksDataException(e);
- } finally {
treeIndexOpHelper.close();
- }
+ throw new HyracksDataException(e);
+ }
}
}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index d3a9890..a1177c8 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -19,42 +19,45 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.hdfs.ContextFactory;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
public class HDFSFileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private final IConfigurationFactory confFactory;
+ private final ConfFactory confFactory;
private final IRecordDescriptorFactory inputRdFactory;
- public HDFSFileWriteOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
- IRecordDescriptorFactory inputRdFactory) {
+ public HDFSFileWriteOperatorDescriptor(JobSpecification spec, Job conf, IRecordDescriptorFactory inputRdFactory)
+ throws HyracksException {
super(spec, 1, 0);
- this.confFactory = confFactory;
- this.inputRdFactory = inputRdFactory;
+ try {
+ this.confFactory = new ConfFactory(conf);
+ this.inputRdFactory = inputRdFactory;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
}
@SuppressWarnings("rawtypes")
@@ -65,12 +68,12 @@
return new AbstractUnaryInputSinkOperatorNodePushable() {
private RecordDescriptor rd0;
private FrameDeserializer frameDeserializer;
- private Configuration conf;
- private VertexWriter vertexWriter;
+ private Job job;
+ private RecordWriter recordWriter;
private TaskAttemptContext context;
+ private ContextFactory ctxFactory = new ContextFactory();
private String TEMP_DIR = "_temporary";
private ClassLoader ctxCL;
- private ContextFactory ctxFactory = new ContextFactory();
@Override
public void open() throws HyracksDataException {
@@ -79,16 +82,16 @@
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- conf = confFactory.createConfiguration(ctx);
-
- VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
- context = ctxFactory.createContext(conf, partition);
- context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+ job = confFactory.getConf();
try {
- vertexWriter = outputFormat.createVertexWriter(context);
+ OutputFormat outputFormat = ReflectionUtils.newInstance(job.getOutputFormatClass(),
+ job.getConfiguration());
+ context = ctxFactory.createContext(job.getConfiguration(), partition);
+ context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+ recordWriter = outputFormat.getRecordWriter(context);
} catch (InterruptedException e) {
throw new HyracksDataException(e);
- } catch (IOException e) {
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
}
@@ -100,8 +103,9 @@
try {
while (!frameDeserializer.done()) {
Object[] tuple = frameDeserializer.deserializeRecord();
- Vertex value = (Vertex) tuple[1];
- vertexWriter.writeVertex(value);
+ Object key = tuple[0];
+ Object value = tuple[1];
+ recordWriter.write(key, value);
}
} catch (InterruptedException e) {
throw new HyracksDataException(e);
@@ -118,7 +122,7 @@
@Override
public void close() throws HyracksDataException {
try {
- vertexWriter.close(context);
+ recordWriter.close(context);
moveFilesToFinalPath();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
@@ -129,9 +133,8 @@
private void moveFilesToFinalPath() throws HyracksDataException {
try {
- JobContext job = ctxFactory.createJobContext(conf);
Path outputPath = FileOutputFormat.getOutputPath(job);
- FileSystem dfs = FileSystem.get(conf);
+ FileSystem dfs = FileSystem.get(job.getConfiguration());
Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
FileStatus[] results = findPartitionPaths(outputPath, dfs);
if (results.length >= 1) {
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
new file mode 100644
index 0000000..f3ec40e
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+
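+/**
+ * Writes final vertices through the user-defined VertexOutputFormat. This class keeps the
+ * vertex-writing logic that used to live in HDFSFileWriteOperatorDescriptor, which now writes
+ * raw key/value records through a Hadoop OutputFormat (e.g., for state checkpoints).
+ */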
+public class VertexFileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private final IConfigurationFactory confFactory;
+ private final IRecordDescriptorFactory inputRdFactory;
+
+ public VertexFileWriteOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
+ IRecordDescriptorFactory inputRdFactory) {
+ super(spec, 1, 0);
+ this.confFactory = confFactory;
+ this.inputRdFactory = inputRdFactory;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+ private RecordDescriptor rd0;
+ private FrameDeserializer frameDeserializer;
+ private Configuration conf;
+ private VertexWriter vertexWriter;
+ private TaskAttemptContext context;
+ private String TEMP_DIR = "_temporary";
+ private ClassLoader ctxCL;
+ private ContextFactory ctxFactory = new ContextFactory();
+
+ @Override
+ public void open() throws HyracksDataException {
+ rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+ : inputRdFactory.createRecordDescriptor(ctx);
+ frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ conf = confFactory.createConfiguration(ctx);
+
+ VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
+ context = ctxFactory.createContext(conf, partition);
+ context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+ try {
+ vertexWriter = outputFormat.createVertexWriter(context);
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+ frameDeserializer.reset(frame);
+ try {
+ while (!frameDeserializer.done()) {
+ Object[] tuple = frameDeserializer.deserializeRecord();
+ Vertex value = (Vertex) tuple[1];
+ vertexWriter.writeVertex(value);
+ }
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ vertexWriter.close(context);
+ moveFilesToFinalPath();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private void moveFilesToFinalPath() throws HyracksDataException {
+ try {
+ JobContext job = ctxFactory.createJobContext(conf);
+ Path outputPath = FileOutputFormat.getOutputPath(job);
+ FileSystem dfs = FileSystem.get(conf);
+ Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
+ FileStatus[] results = findPartitionPaths(outputPath, dfs);
+ if (results.length >= 1) {
+ /**
+ * for Hadoop-0.20.2
+ */
+ renameFile(dfs, filePath, results);
+ } else {
+ /**
+ * for Hadoop-0.23.1
+ */
+ int jobId = job.getJobID().getId();
+ outputPath = new Path(outputPath.toString() + File.separator + TEMP_DIR + File.separator
+ + jobId);
+ results = findPartitionPaths(outputPath, dfs);
+ renameFile(dfs, filePath, results);
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+ private FileStatus[] findPartitionPaths(Path outputPath, FileSystem dfs) throws FileNotFoundException,
+ IOException {
+ FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
+ @Override
+ public boolean accept(Path dir) {
+ return dir.getName().endsWith(TEMP_DIR) && dir.getName().indexOf(".crc") < 0;
+ }
+ });
+ Path tempDir = tempPaths[0].getPath();
+ FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
+ @Override
+ public boolean accept(Path dir) {
+ return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0
+ && dir.getName().indexOf(".crc") < 0;
+ }
+ });
+ return results;
+ }
+
+ private void renameFile(FileSystem dfs, Path filePath, FileStatus[] results) throws IOException,
+ HyracksDataException, FileNotFoundException {
+ Path srcDir = results[0].getPath();
+ if (!dfs.exists(srcDir))
+ throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
+
+ FileStatus[] srcFiles = dfs.listStatus(srcDir);
+ Path srcFile = srcFiles[0].getPath();
+ dfs.delete(filePath, true);
+ dfs.rename(srcFile, filePath);
+ }
+
+ };
+ }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index bfe89ab..f3f7513 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -147,11 +147,15 @@
return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
}
- public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, int currentIteration) {
+ public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration) {
Boolean toMove = jobIdToMove.get(jobId);
if (toMove == null || toMove == true) {
if (jobIdToSuperStep.get(jobId) == null) {
- jobIdToSuperStep.put(jobId, 0L);
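+ // when resuming after a failure, start from the given iteration instead of 0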
+ if (currentIteration <= 0) {
+ jobIdToSuperStep.put(jobId, 0L);
+ } else {
+ jobIdToSuperStep.put(jobId, currentIteration);
+ }
}
long superStep = jobIdToSuperStep.get(jobId);
@@ -175,6 +179,35 @@
System.gc();
}
+ public synchronized void recoverVertexProperties(String jobId, long numVertices, long numEdges,
+ long currentIteration) {
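+ // unconditionally reset the per-job superstep state from the checkpointed iteration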
+ if (jobIdToSuperStep.get(jobId) == null) {
+ if (currentIteration <= 0) {
+ jobIdToSuperStep.put(jobId, 0L);
+ } else {
+ jobIdToSuperStep.put(jobId, currentIteration);
+ }
+ }
+
+ long superStep = jobIdToSuperStep.get(jobId);
+ List<FileReference> files = iterationToFiles.remove(superStep - 1);
+ if (files != null) {
+ for (FileReference fileRef : files)
+ fileRef.delete();
+ }
+
+ if (currentIteration > 0) {
+ Vertex.setSuperstep(currentIteration);
+ } else {
+ Vertex.setSuperstep(++superStep);
+ }
+ Vertex.setNumVertices(numVertices);
+ Vertex.setNumEdges(numEdges);
+ jobIdToSuperStep.put(jobId, superStep);
+ jobIdToMove.put(jobId, true);
+ LOGGER.info("recovered iteration " + Vertex.getSuperstep());
+ }
+
public synchronized void endSuperStep(String pregelixJobId) {
jobIdToMove.put(pregelixJobId, true);
LOGGER.info("end iteration " + Vertex.getSuperstep());
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 1cf81ac..02097bf 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -75,13 +75,21 @@
context.endSuperStep(giraphJobId);
}
- public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, int currentIteration) {
+ public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, long currentIteration) {
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
context.setVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
}
+ public static void recoverProperties(String jobId, IHyracksTaskContext ctx, Configuration conf,
+ long currentIteration) {
+ INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+ RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+ context.recoverVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+ conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
+ }
+
public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
throws HyracksDataException {
try {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java
new file mode 100644
index 0000000..efc7bcc
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryConnectedComponentsTest {
+ private static String INPUTPATH = "data/webmapcomplex";
+ private static String OUTPUTPAH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/ConnectedComponentsRealComplex2";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+ try {
+ PregelixJob job = new PregelixJob(ConnectedComponentsVertex.class.getName());
+ job.setVertexClass(ConnectedComponentsVertex.class);
+ job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+ job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+ job.setDynamicVertexValueSize(true);
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPATH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+ testCluster.setUp();
+ Driver driver = new Driver(ConnectedComponentsVertex.class);
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (Vertex.getSuperstep() <= 5) {
+ this.wait(200);
+ }
+ PregelixHyracksIntegrationUtil.shutdownNC1();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+ thread.start();
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPATH));
+ } catch (Exception e) {
+ PregelixHyracksIntegrationUtil.shutdownNC2();
+ testCluster.cleanupHDFS();
+ throw e;
+ }
+ }
+
+}
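All three new recovery tests inject the failure the same way: an anonymous watcher thread polls Vertex.getSuperstep() and shuts down NC1 once the job is past superstep 5, forcing the cluster to recover mid-run. A hypothetical helper (not part of this patch) that the tests could share instead of triplicating the anonymous Runnable:

    import edu.uci.ics.pregelix.api.graph.Vertex;
    import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;

    public class FailureInjector {
        /** Starts a thread that kills NC1 once the job passes the given superstep. */
        public static Thread injectAfterSuperstep(final long superstep) {
            Thread watcher = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Vertex.getSuperstep() is static and advances as the job runs;
                        // the timed wait only throttles the poll, nothing notifies here.
                        synchronized (this) {
                            while (Vertex.getSuperstep() <= superstep) {
                                this.wait(200);
                            }
                        }
                        PregelixHyracksIntegrationUtil.shutdownNC1();
                    } catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                }
            });
            watcher.start();
            return watcher;
        }
    }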
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
new file mode 100644
index 0000000..ff1e29f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryInnerJoinTest {
+ private static String INPUTPATH = "data/webmap";
+ private static String OUTPUTPATH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal2";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+
+ try {
+ PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+ job.setVertexClass(PageRankVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPATH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+ testCluster.setUp();
+ Driver driver = new Driver(PageRankVertex.class);
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (Vertex.getSuperstep() <= 5) {
+ this.wait(200);
+ }
+ PregelixHyracksIntegrationUtil.shutdownNC1();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+ thread.start();
+ driver.runJob(job, Plan.INNER_JOIN, "127.0.0.1",
+ PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPATH));
+ } catch (Exception e) {
+ PregelixHyracksIntegrationUtil.shutdownNC2();
+ testCluster.cleanupHDFS();
+ throw e;
+ }
+ }
+
+}
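This variant is the PageRank recovery test run under the inner-join execution plan; the only substantive difference is the runJob overload. Side by side (the trailing boolean is assumed here to be the profiling switch, which this patch does not show):

    // Default plan:
    driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
    // Inner-join plan, same job and cluster:
    driver.runJob(job, Plan.INNER_JOIN, "127.0.0.1",
            PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);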
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
index 5294ace..3fdaf15 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
+import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.core.driver.Driver;
@@ -63,8 +64,10 @@
public void run() {
try {
synchronized (this) {
- this.wait(15000);
- PregelixHyracksIntegrationUtil.showDownNC1();
+ while (Vertex.getSuperstep() <= 5) {
+ this.wait(200);
+ }
+ PregelixHyracksIntegrationUtil.shutdownNC1();
}
} catch (Exception e) {
throw new IllegalStateException(e);
@@ -76,7 +79,7 @@
TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
} catch (Exception e) {
- PregelixHyracksIntegrationUtil.showDownNC2();
+ PregelixHyracksIntegrationUtil.shutdownNC2();
testCluster.cleanupHDFS();
throw e;
}
}
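The hunk above replaces a fixed 15-second sleep with a superstep-based trigger, so the kill lands after checkpointable progress has actually been made, independent of machine speed, and the misspelled showDownNC* helpers are renamed to shutdownNC*. One further hardening, sketched here as a hypothetical variant only, would bound the poll so a job that dies before superstep 5 cannot leave the watcher spinning forever:

    // Hypothetical variant, not in this patch: give up after a deadline.
    long deadline = System.currentTimeMillis() + 60000;
    while (Vertex.getSuperstep() <= 5 && System.currentTimeMillis() < deadline) {
        Thread.sleep(200); // still just polling; Vertex.getSuperstep() is static
    }
    if (Vertex.getSuperstep() > 5) {
        PregelixHyracksIntegrationUtil.shutdownNC1();
    }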
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
new file mode 100644
index 0000000..e006ccd
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryWithoutCheckpointTest {
+ private static String INPUTPATH = "data/webmap";
+ private static String OUTPUTPATH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal2";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+
+ try {
+ PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+ job.setVertexClass(PageRankVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPATH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+
+ testCluster.setUp();
+ Driver driver = new Driver(PageRankVertex.class);
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (Vertex.getSuperstep() <= 5) {
+ this.wait(200);
+ }
+ PregelixHyracksIntegrationUtil.shutdownNC1();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+ thread.start();
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPATH));
+ } catch (Exception e) {
+ PregelixHyracksIntegrationUtil.shutdownNC2();
+ testCluster.cleanupHDFS();
+ throw e;
+ }
+ }
+
+}
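This test is identical to FailureRecoveryTest except that it never registers a checkpoint hook, so no checkpoints exist when NC1 dies; recovery presumably has to replay from the initial input rather than resume from a checkpoint (the recovery scheduler itself is outside this patch). The one-line difference:

    // FailureRecoveryTest: recovery can resume from the last conservative checkpoint.
    job.setCheckpointHook(ConservativeCheckpointHook.class);
    // FailureRecoveryWithoutCheckpointTest: the line is simply absent, yet the
    // expected output (PageRankReal2) is the same either way.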
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertex.java
similarity index 100%
rename from pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertex.java
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0
new file mode 100755
index 0000000..2c975de
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0
@@ -0,0 +1,10 @@
+0 0
+2 0
+4 0
+6 0
+8 0
+10 0
+12 0
+14 0
+16 0
+18 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1
new file mode 100755
index 0000000..6976bc1
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1
@@ -0,0 +1,13 @@
+1 0
+3 0
+5 0
+7 0
+9 0
+11 0
+13 0
+15 0
+17 0
+19 0
+21 21
+25 25
+27 27
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-0 b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-0
new file mode 100755
index 0000000..d135b86
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-0
@@ -0,0 +1,10 @@
+0 0.008290140026154316
+2 0.14646839195826472
+4 0.03976979906329426
+6 0.015736276824953852
+8 0.010628239626209894
+10 0.008290140026154316
+12 0.14646839195826472
+14 0.03976979906329426
+16 0.015736276824953852
+18 0.010628239626209894
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-1 b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-1
new file mode 100755
index 0000000..d3badee
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-1
@@ -0,0 +1,10 @@
+1 0.15351528192471647
+3 0.08125113985998214
+5 0.0225041581462058
+7 0.012542224114863661
+9 0.009294348455354817
+11 0.15351528192471647
+13 0.08125113985998214
+15 0.0225041581462058
+17 0.012542224114863661
+19 0.009294348455354817
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
index 35e7cd8..4720272 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
@@ -30,11 +30,11 @@
*/
public class RecoveryRuntimeHookFactory implements IRuntimeHookFactory {
private static final long serialVersionUID = 1L;
- private final int currentSuperStep;
+ private final long currentSuperStep;
private String jobId;
private IConfigurationFactory confFactory;

- public RecoveryRuntimeHookFactory(String jobId, int currentSuperStep, IConfigurationFactory confFactory) {
+ public RecoveryRuntimeHookFactory(String jobId, long currentSuperStep, IConfigurationFactory confFactory) {
this.currentSuperStep = currentSuperStep;
this.jobId = jobId;
this.confFactory = confFactory;
@@ -48,7 +48,7 @@
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
IterationUtils.endSuperStep(jobId, ctx);
Configuration conf = confFactory.createConfiguration(ctx);
- IterationUtils.setProperties(jobId, ctx, conf, currentSuperStep);
+ IterationUtils.recoverProperties(jobId, ctx, conf, currentSuperStep);
}
};
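Two changes close out the patch: currentSuperStep widens from int to long, matching the long superstep counters used everywhere else (Vertex.getSuperstep(), IterationUtils.recoverProperties), and the hook now routes through the recovery path instead of re-setting properties from scratch. Wiring the hook into a recovery job would look roughly like this; the surrounding variables are illustrative, since the job-generation code is not part of this patch:

    // Illustrative only: jobId and confFactory come from the enclosing job generator,
    // and the superstep value would be read back from the last checkpoint.
    long lastCheckpointedSuperstep = 5L;
    IRuntimeHookFactory hook =
            new RecoveryRuntimeHookFactory(jobId, lastCheckpointedSuperstep, confFactory);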