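Make failure recovery work in the multi-JVM test: use the joblet class loader in the HDFS read operator, add a configurable checkpointing interval, and tighten the CC heartbeat settings in startcc.sh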
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 43ca4ac..bc47360 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -112,7 +112,7 @@
public void initialize() throws HyracksDataException {
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
Job job = confFactory.getConf();
job.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
@@ -141,6 +141,7 @@
* read the split
*/
TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), i);
+ context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
reader.initialize(inputSplits.get(i), context);
while (reader.nextKeyValue() == true) {
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index a28d059..6549c52 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -78,6 +78,8 @@
public static final String CKP_CLASS = "pregelix.checkpointHook";
/** the check point hook */
public static final String RECOVERY_COUNT = "pregelix.recoveryCount";
+ /** the configuration key for the checkpointing interval, in supersteps */
+ public static final String CKP_INTERVAL = "pregelix.ckpinterval";
/**
* Construct a Pregelix job from an existing configuration
@@ -224,7 +226,7 @@
final public void setCheckpointHook(Class<?> ckpClass) {
getConfiguration().setClass(CKP_CLASS, ckpClass, ICheckpointHook.class);
}
-
+
/**
* Users can provide an ICheckpointHook implementation to specify when to do checkpoint
*
@@ -234,6 +236,15 @@
getConfiguration().setInt(RECOVERY_COUNT, recoveryCount);
}
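+ /**
+ * Sets the checkpointing interval, stored in the job configuration under {@link #CKP_INTERVAL}.
+ *
+ * @param ckpInterval the number of supersteps between checkpoints; a non-positive value disables interval-based checkpointing
+ */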
+ final public void setCheckpointingInterval(int ckpInterval) {
+ getConfiguration().setInt(CKP_INTERVAL, ckpInterval);
+ }
+
@Override
public String toString() {
return getJobName();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index e3950c8..4ee1deb 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -592,4 +592,14 @@
public static int getRecoveryCount(Configuration conf) {
return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
}
+
+ /**
+ * Gets the user-set checkpointing interval from the configuration.
+ *
+ * @param conf the job configuration to read from
+ * @return the checkpointing interval, or -1 if none has been set
+ */
+ public static int getCheckpointingInterval(Configuration conf) {
+ return conf.getInt(PregelixJob.CKP_INTERVAL, -1);
+ }
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 5530144..d6a6f3d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -278,6 +278,7 @@
}
}
int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
+ int ckpInterval = BspUtils.getCheckpointingInterval(job.getConfiguration());
boolean terminate = false;
long start, end, time;
do {
@@ -288,7 +289,7 @@
LOG.info(job + ": iteration " + i + " finished " + time + "ms");
terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
|| IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
- if (ckpHook.checkpoint(i)) {
+ if (ckpHook.checkpoint(i) || (ckpInterval > 0 && i % ckpInterval == 0)) {
runCheckpoint(deploymentId, jobGen, i);
snapshotJobIndex.set(currentJobIndex);
snapshotSuperstep.set(i);
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
index 2a6cab2..d7a0ead 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -58,8 +58,8 @@
if [ -f "conf/topology.xml" ]; then
#Launch hyracks cc script with topology
-${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
+${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -heartbeat-period 5000 -max-heartbeat-lapse-periods 4 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
else
#Launch hyracks cc script without toplogy
-${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
+${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -heartbeat-period 5000 -max-heartbeat-lapse-periods 4 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
fi
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh b/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh
index 2a6cab2..d7a0ead 100644
--- a/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh
@@ -58,8 +58,8 @@
if [ -f "conf/topology.xml" ]; then
#Launch hyracks cc script with topology
-${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
+${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -heartbeat-period 5000 -max-heartbeat-lapse-periods 4 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
else
#Launch hyracks cc script without toplogy
-${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
+${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -heartbeat-period 5000 -max-heartbeat-lapse-periods 4 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
fi
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index f99321a..393c8c9 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -45,7 +45,7 @@
public String ipAddress;
@Option(name = "-port", usage = "port of cluster controller", required = false)
- public int port;
+ public int port = 3099;
@Option(name = "-plan", usage = "query plan choice", required = false)
public Plan planChoice = Plan.OUTER_JOIN;
@@ -67,6 +67,9 @@
@Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
public String profiling = "false";
+
+ @Option(name = "-ckp-interval", usage = "checkpointing interval -- for fault-tolerance", required = false)
+ public int ckpInterval = -1;
}
public static void run(String[] args, PregelixJob job) throws Exception {
@@ -125,6 +128,7 @@
job.getConfiguration().setLong(ReachabilityVertex.DEST_ID, options.destId);
if (options.numIteration > 0)
job.getConfiguration().setLong(PageRankVertex.ITERATIONS, options.numIteration);
+ job.setCheckpointingInterval(options.ckpInterval);
}
}