support large-size global aggreate values
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 51a9ce3..d68ad2c 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -538,4 +538,15 @@
public static boolean useLSM(Configuration conf) {
return conf.getBoolean(PregelixJob.UPDATE_INTENSIVE, false);
}
+
+ /***
+ * Get the spilling dir name for global aggregates
+ *
+ * @param conf
+ * @param superStep
+ * @return the spilling dir name
+ */
+ public static String getGlobalAggregateSpillingDirName(Configuration conf, long superStep) {
+ return "/tmp/pregelix/agg/" + conf.get(PregelixJob.JOB_ID) + "/" + superStep;
+ }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
index 24105ae..a0f67e3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
@@ -15,6 +15,14 @@
package edu.uci.ics.pregelix.api.util;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -41,4 +49,19 @@
}
}
+ public static void flushTupleToHDFS(ArrayTupleBuilder atb, Configuration conf, long superStep)
+ throws HyracksDataException {
+ try {
+ if (atb.getSize()>0) {
+ FileSystem dfs = FileSystem.get(conf);
+ String fileName = BspUtils.getGlobalAggregateSpillingDirName(conf, superStep) +"/" + UUID.randomUUID();
+ FSDataOutputStream dos = dfs.create(new Path(fileName), true);
+ dos.write(atb.getByteArray(), 0, atb.getSize());
+ dos.flush();
+ dos.close();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 3e6e9a5..7bd2cf8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -99,7 +99,7 @@
IntWritable lastSnapshotSuperstep = new IntWritable(0);
boolean failed = false;
int retryCount = 0;
- int maxRetryCount = 3;
+ int maxRetryCount = 1;
do {
try {
@@ -142,12 +142,11 @@
//restart from snapshot
failed = true;
retryCount++;
- ioe.printStackTrace();
+ throw new HyracksException(ioe);
}
} while (failed && retryCount < maxRetryCount);
LOG.info("job finished");
} catch (Exception e) {
- e.printStackTrace();
throw new HyracksException(e);
}
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index d37c6fd..36723a6 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -108,11 +108,13 @@
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.runtime.touchpoint.RecoveryRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexPartitionComputerFactory;
@@ -374,9 +376,10 @@
@Override
public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
try {
+
PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
- FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath));
+ FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath + "/" + lastSuccessfulIteration));
tmpJob.setOutputKeyClass(NullWritable.class);
tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
@@ -409,7 +412,7 @@
try {
PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
- FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath));
+ FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath + "/" + lastCheckpointedIteration));
JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
@@ -637,7 +640,7 @@
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
ClusterConfig.setLocationConstraint(spec, materializeRead);
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastSuccessfulIteration;
PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
@@ -657,7 +660,7 @@
@SuppressWarnings({ "unchecked", "rawtypes" })
protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
throws HyracksException {
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastCheckpointedIteration;
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
try {
@@ -696,6 +699,12 @@
recordDescriptor);
ClusterConfig.setLocationConstraint(spec, materialize);
+ /** construct runtime hook */
+ RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration + 1, new ConfigurationFactory(
+ pregelixJob.getConfiguration())));
+ ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink);
@@ -706,7 +715,8 @@
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
materialize, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, emptySink, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
spec.setFrameSize(frameSize);
return new JobSpecification[] { spec };
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 58384b2..41887c0 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -529,7 +529,7 @@
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
/** generate secondary index checkpoint */
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;
FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
tmpJob.setOutputValueClass(MsgList.class);
@@ -569,7 +569,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
JobSpecification spec = new JobSpecification();
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;;
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
try {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index e8a6b5c..e1795de 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -61,6 +61,8 @@
ccConfig.defaultMaxJobAttempts = 0;
ccConfig.jobHistorySize = 1;
ccConfig.profileDumpPeriod = -1;
+ //ccConfig.heartbeatPeriod = 5000;
+ //ccConfig.maxHeartbeatLapsePeriods = 1;
// cluster controller
cc = new ClusterControllerService(ccConfig);
@@ -74,7 +76,7 @@
ncConfig1.dataIPAddress = "127.0.0.1";
ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
- ncConfig1.ioDevices="dev1,dev2";
+ ncConfig1.ioDevices = "dev1,dev2";
ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -87,7 +89,7 @@
ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
- ncConfig2.ioDevices="dev3,dev4";
+ ncConfig2.ioDevices = "dev3,dev4";
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
@@ -96,6 +98,14 @@
ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
}
+ public static void showDownNC1() throws Exception {
+ nc1.stop();
+ }
+
+ public static void showDownNC2() throws Exception {
+ nc2.stop();
+ }
+
public static void deinit() throws Exception {
nc2.stop();
nc1.stop();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
index 3fed609..c0be9dd 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
@@ -17,9 +17,13 @@
import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -32,6 +36,7 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
@@ -93,10 +98,28 @@
}
+ @SuppressWarnings("unchecked")
@Override
public void close() throws HyracksDataException {
- Writable finalAggregateValue = aggregator.finishFinal();
- IterationUtils.writeGlobalAggregateValue(conf, jobId, finalAggregateValue);
+ try {
+ // iterate over hdfs spilled aggregates
+ FileSystem dfs = FileSystem.get(conf);
+ String spillingDir = BspUtils.getGlobalAggregateSpillingDirName(conf, Vertex.getSuperstep());
+ FileStatus[] files = dfs.listStatus(new Path(spillingDir));
+ if (files != null) {
+ // goes into this branch only when there are spilled files
+ for (int i = 0; i < files.length; i++) {
+ FileStatus file = files[i];
+ DataInput dis = dfs.open(file.getPath());
+ partialAggregateValue.readFields(dis);
+ aggregator.step(partialAggregateValue);
+ }
+ }
+ Writable finalAggregateValue = aggregator.finishFinal();
+ IterationUtils.writeGlobalAggregateValue(conf, jobId, finalAggregateValue);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
}
};
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index b74a5de..9a21680 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -168,7 +168,8 @@
FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
@Override
public boolean accept(Path dir) {
- return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
+ return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0
+ && dir.getName().indexOf(".crc") < 0;
}
});
return results;
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 496d066..a366899 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -61,7 +61,7 @@
private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
private final Map<String, Long> jobIdToSuperStep = new ConcurrentHashMap<String, Long>();
private final Map<String, Boolean> jobIdToMove = new ConcurrentHashMap<String, Boolean>();
-
+
private final ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r);
@@ -145,7 +145,7 @@
return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
}
- public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges) {
+ public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, int currentIteration) {
Boolean toMove = jobIdToMove.get(jobId);
if (toMove == null || toMove == true) {
if (jobIdToSuperStep.get(jobId) == null) {
@@ -159,7 +159,11 @@
fileRef.delete();
}
- Vertex.setSuperstep(++superStep);
+ if (currentIteration > 0) {
+ Vertex.setSuperstep(currentIteration);
+ } else {
+ Vertex.setSuperstep(++superStep);
+ }
Vertex.setNumVertices(numVertices);
Vertex.setNumEdges(numEdges);
jobIdToSuperStep.put(jobId, superStep);
@@ -169,8 +173,8 @@
System.gc();
}
- public synchronized void endSuperStep(String giraphJobId) {
- jobIdToMove.put(giraphJobId, true);
+ public synchronized void endSuperStep(String pregelixJobId) {
+ jobIdToMove.put(pregelixJobId, true);
LOGGER.info("end iteration " + Vertex.getSuperstep());
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 603a464..1cf81ac 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -75,11 +75,11 @@
context.endSuperStep(giraphJobId);
}
- public static void setProperties(String giraphJobId, IHyracksTaskContext ctx, Configuration conf) {
+ public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, int currentIteration) {
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
- context.setVertexProperties(giraphJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
- conf.getLong(PregelixJob.NUM_EDGES, -1));
+ context.setVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+ conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
}
public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index 1066e3b..9994c0e 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -94,7 +94,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.4.1</version>
+ <version>2.5</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
new file mode 100644
index 0000000..dac087f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryTest {
+ private static String INPUTPATH = "data/webmap";
+ private static String OUTPUTPAH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+
+ try {
+ PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+ job.setVertexClass(PageRankVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+ testCluster.setUp();
+ Driver driver = new Driver(PageRankVertex.class);
+ // Thread thread = new Thread(new Runnable() {
+ //
+ // @Override
+ // public void run() {
+ // try {
+ // synchronized (this) {
+ // this.wait(10000);
+ // PregelixHyracksIntegrationUtil.showDownNC1();
+ // }
+ // } catch (Exception e) {
+ // throw new IllegalStateException(e);
+ // }
+ // }
+ //
+ // });
+ //thread.start();
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ testCluster.tearDown();
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
similarity index 96%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
index 5a2636a..a2d32c0 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example;
import junit.framework.Assert;
@@ -29,6 +29,7 @@
import edu.uci.ics.pregelix.example.FailureVertex;
import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
/**
* This test case tests the error message propagation.
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
similarity index 91%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
index d2995f1..5a485ba 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example;
import java.io.File;
import java.util.ArrayList;
@@ -24,13 +24,12 @@
import org.junit.Test;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
-import edu.uci.ics.pregelix.example.PageRankVertex;
import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
import edu.uci.ics.pregelix.example.util.TestUtils;
/**
@@ -56,7 +55,7 @@
job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job1, INPUTPATH);
job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- job1.setCheckpointHook(ConservativeCheckpointHook.class);
+ //job1.setCheckpointHook(ConservativeCheckpointHook.class);
PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
job2.setVertexClass(PageRankVertex.class);
@@ -66,7 +65,7 @@
job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- job2.setCheckpointHook(ConservativeCheckpointHook.class);
+ //job2.setCheckpointHook(ConservativeCheckpointHook.class);
jobs.add(job1);
jobs.add(job2);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java
new file mode 100644
index 0000000..474d0a6
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.aggregator.OverflowAggregator;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class OverflowAggregatorTest {
+
+ private static String INPUTPATH = "data/webmap";
+ private static String OUTPUTPAH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+
+ try {
+ PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+ job.setVertexClass(PageRankVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setGlobalAggregatorClass(OverflowAggregator.class);
+
+ testCluster.setUp();
+ Driver driver = new Driver(PageRankVertex.class);
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+ Text text = (Text) IterationUtils.readGlobalAggregateValue(job.getConfiguration(),
+ BspUtils.getJobId(job.getConfiguration()));
+ Assert.assertEquals(text.getLength(), 20 * 32767);
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ testCluster.tearDown();
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java
new file mode 100644
index 0000000..34b8b51
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.aggregator;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Test the case where the global aggregate's state is bloated
+ *
+ * @author yingyib
+ */
+public class OverflowAggregator extends
+ GlobalAggregator<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable, Text, Text> {
+
+ private int textLength = 0;
+ private int inc = 32767;
+
+ @Override
+ public void init() {
+ textLength = 0;
+ }
+
+ @Override
+ public void step(Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> v)
+ throws HyracksDataException {
+ textLength += inc;
+ }
+
+ @Override
+ public void step(Text partialResult) {
+ textLength += partialResult.getLength();
+ }
+
+ @Override
+ public Text finishPartial() {
+ byte[] partialResult = new byte[textLength];
+ for (int i = 0; i < partialResult.length; i++) {
+ partialResult[i] = 'a';
+ }
+ Text text = new Text();
+ text.set(partialResult);
+ return text;
+ }
+
+ @Override
+ public Text finishFinal() {
+ byte[] result = new byte[textLength];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = 'a';
+ }
+ Text text = new Text();
+ text.set(result);
+ return text;
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
similarity index 98%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
index d0cf654..40ea690 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example.util;
import java.io.BufferedReader;
import java.io.DataOutputStream;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index ab564fa..f3a0bb4 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -237,8 +237,12 @@
Writable agg = aggregator.finishPartial();
agg.write(tbGlobalAggregate.getDataOutput());
tbGlobalAggregate.addFieldEndOffset();
- appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
- tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+ if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+ tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
+ // aggregate state exceed the page size, write to HDFS
+ FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+ appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+ }
FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
} catch (IOException e) {
throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index b4e1dd8..ca8ec01 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -179,7 +179,7 @@
vertex.setOutputWriters(writers);
vertex.setOutputAppenders(appenders);
vertex.setOutputTupleBuilders(tbs);
-
+
if (vertex.isHalted()) {
vertex.activate();
}
@@ -230,8 +230,12 @@
Writable agg = aggregator.finishPartial();
agg.write(tbGlobalAggregate.getDataOutput());
tbGlobalAggregate.addFieldEndOffset();
- appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
- tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+ if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+ tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
+ // aggregate state exceed the page size, write to HDFS
+ FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+ appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+ }
FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
} catch (IOException e) {
throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
index cd2012a..3dcdad2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -40,7 +40,7 @@
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
Configuration conf = confFactory.createConfiguration(ctx);
- IterationUtils.setProperties(jobId, ctx, conf);
+ IterationUtils.setProperties(jobId, ctx, conf, -1);
}
};
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
new file mode 100644
index 0000000..35e7cd8
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+/**
+ * Recover the pregelix job state in a NC
+ *
+ * @author yingyib
+ */
+public class RecoveryRuntimeHookFactory implements IRuntimeHookFactory {
+ private static final long serialVersionUID = 1L;
+ private final int currentSuperStep;
+ private String jobId;
+ private IConfigurationFactory confFactory;
+
+ public RecoveryRuntimeHookFactory(String jobId, int currentSuperStep, IConfigurationFactory confFactory) {
+ this.currentSuperStep = currentSuperStep;
+ this.jobId = jobId;
+ this.confFactory = confFactory;
+ }
+
+ @Override
+ public IRuntimeHook createRuntimeHook() {
+ return new IRuntimeHook() {
+
+ @Override
+ public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
+ IterationUtils.endSuperStep(jobId, ctx);
+ Configuration conf = confFactory.createConfiguration(ctx);
+ IterationUtils.setProperties(jobId, ctx, conf, currentSuperStep);
+ }
+
+ };
+ }
+
+}