Merge branch 'master' into dev/hyracks_msr
Conflicts:
hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index c52130d..f42dfe4 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -35,7 +36,6 @@
import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.JobStateUtils;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
/**
* User applications should all inherit {@link Vertex}, and implement their own
@@ -270,7 +270,7 @@
delegate.setVertex(this);
}
destEdgeList.clear();
- long edgeMapSize = SerDeUtils.readVLong(in);
+ long edgeMapSize = WritableUtils.readVLong(in);
for (long i = 0; i < edgeMapSize; ++i) {
Edge<I, E> edge = allocateEdge();
edge.setConf(getContext().getConfiguration());
@@ -278,7 +278,7 @@
addEdge(edge);
}
msgList.clear();
- long msgListSize = SerDeUtils.readVLong(in);
+ long msgListSize = WritableUtils.readVLong(in);
for (long i = 0; i < msgListSize; ++i) {
M msg = allocateMessage();
msg.readFields(in);
@@ -297,11 +297,11 @@
if (vertexValue != null) {
vertexValue.write(out);
}
- SerDeUtils.writeVLong(out, destEdgeList.size());
+ WritableUtils.writeVLong(out, destEdgeList.size());
for (Edge<I, E> edge : destEdgeList) {
edge.write(out);
}
- SerDeUtils.writeVLong(out, msgList.size());
+ WritableUtils.writeVLong(out, msgList.size());
for (M msg : msgList) {
msg.write(out);
}
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
new file mode 100644
index 0000000..503b521
--- /dev/null
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io.internal;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class InternalVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
+ extends VertexInputFormat<I, V, E, M> {
+ /** Uses the SequenceFileInputFormat to do everything */
+ protected SequenceFileInputFormat sequenceInputFormat = new SequenceFileInputFormat();
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+ return sequenceInputFormat.getSplits(context);
+ }
+
+ @Override
+ public VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
+ return new VertexReader<I, V, E, M>() {
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return false;
+ }
+
+ @Override
+ public Vertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ };
+ }
+
+}
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
new file mode 100644
index 0000000..0581658
--- /dev/null
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io.internal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+/**
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class InternalVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> extends
+ VertexOutputFormat<I, V, E> {
+
+ @Override
+ public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ return new VertexWriter<I, V, E>() {
+
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException, InterruptedException {
+
+ }
+
+ @Override
+ public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
+
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+
+ }
+
+ };
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return new OutputCommitter() {
+
+ @Override
+ public void abortTask(TaskAttemptContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void cleanupJob(JobContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setupJob(JobContext arg0) throws IOException {
+
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext arg0) throws IOException {
+
+ }
+
+ };
+ }
+
+}
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index dae7818..d2d5cf0 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -201,4 +201,9 @@
final public void setLSMStorage(boolean variableSizedUpdateHeavyFlag) {
getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
}
+
+ @Override
+ public String toString() {
+ return getJobName();
+ }
}
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
index 7d5f627..f9cad9f 100755
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
@@ -36,19 +36,12 @@
public int read() {
int remaining = data.length - position;
int value = remaining > 0 ? (data[position++] & 0xff) : -1;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
- }
return value;
}
@Override
public int read(byte[] bytes, int offset, int length) {
int remaining = data.length - position;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
- + length + " position: " + position);
- }
if (remaining == 0) {
return -1;
}
@@ -57,4 +50,9 @@
position += l;
return l;
}
-}
\ No newline at end of file
+
+ @Override
+ public int available() {
+ return data.length - position;
+ }
+}
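Note: the new available() override matters because java.io.InputStream.available() returns 0 by default, which would defeat the byte-accounting checks added later in this patch (both copies of this stream get the same override; TupleDeserializer and VLongNormalizedKeyComputer build on it). A minimal sketch of the new behavior, with illustrative values:

    ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
    bis.setByteArray(new byte[] { 1, 2, 3 }, 1); // start reading at index 1
    int before = bis.available(); // 2 -- bytes remaining after the start position
    bis.read();                   // consumes one byte
    int after = bis.available();  // 1 -- one byte left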
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
index 25b07ff..a4336a3 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
@@ -40,47 +40,4 @@
object.readFields(input);
}
- public static long readVLong(DataInput in) throws IOException {
- int vLen = 0;
- long value = 0L;
- while (true) {
- byte b = (byte) in.readByte();
- ++vLen;
- value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
- if ((b & 0x80) == 0) {
- break;
- }
- }
- return value;
- }
-
- public static void writeVLong(DataOutput out, long value) throws IOException {
- long data = value;
- do {
- byte b = (byte) (data & 0x7f);
- data >>= 7;
- if (data != 0) {
- b |= 0x80;
- }
- out.write(b);
- } while (data != 0);
- }
-
- public static long readVLong(byte[] data, int start, int length) {
- int vLen = 0;
- long value = 0L;
- while (true) {
- byte b = (byte) data[start];
- ++vLen;
- value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
- if ((b & 0x80) == 0) {
- break;
- }
- ++start;
- }
- if (vLen != length)
- throw new IllegalStateException("length mismatch -- vLen:" + vLen + " length:" + length);
- return value;
- }
-
}
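Note: the deleted helpers used a 7-bit-continuation varint, while their replacement, Hadoop's WritableUtils, uses a length-prefixed format (one byte for values in [-112, 127], otherwise a length byte followed by payload bytes), so the two encodings are not wire-compatible. A minimal round-trip sketch with the replacement API:

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    WritableUtils.writeVLong(out, 300L); // 3 bytes: length byte plus two payload bytes
    DataInput in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
    long value = WritableUtils.readVLong(in); // 300L again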
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
index bc6c0cf..c72f392 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
@@ -15,6 +15,8 @@
package edu.uci.ics.pregelix.core.base;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -29,6 +31,11 @@
public void runJob(PregelixJob job, String ipAddress, int port) throws HyracksException;
+ public void runJobs(List<PregelixJob> jobs, String ipAddress, int port) throws HyracksException;
+
public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException;
+
+ public void runJobs(List<PregelixJob> jobs, Plan planChoice, String ipAddress, int port, boolean profiling)
+ throws HyracksException;
}
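Note: a minimal usage sketch of the new multi-job entry point, assuming two already-configured PregelixJob instances and a hypothetical MyVertex class; the two-argument overload defaults to Plan.OUTER_JOIN with profiling off, as the Driver change below shows:

    List<PregelixJob> pipeline = new ArrayList<PregelixJob>();
    pipeline.add(firstJob);  // e.g., a connected-components job
    pipeline.add(secondJob); // e.g., a follow-up job over the same graph
    IDriver driver = new Driver(MyVertex.class); // MyVertex is hypothetical
    driver.runJobs(pipeline, "127.0.0.1", 3099); // example address/port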
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 2d4064b..1b4f226 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -17,15 +17,19 @@
import java.io.File;
import java.io.FilenameFilter;
+import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -36,6 +40,7 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.base.IDriver;
import edu.uci.ics.pregelix.core.jobgen.JobGen;
import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
@@ -48,11 +53,9 @@
@SuppressWarnings("rawtypes")
public class Driver implements IDriver {
private static final Log LOG = LogFactory.getLog(Driver.class);
- private JobGen jobGen;
- private boolean profiling;
-
private IHyracksClientConnection hcc;
private Class exampleClass;
+ private boolean profiling = false;
public Driver(Class exampleClass) {
this.exampleClass = exampleClass;
@@ -64,93 +67,200 @@
}
@Override
+ public void runJobs(List<PregelixJob> jobs, String ipAddress, int port) throws HyracksException {
+ runJobs(jobs, Plan.OUTER_JOIN, ipAddress, port, false);
+ }
+
+ @Override
public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException {
+ runJobs(Collections.singletonList(job), planChoice, ipAddress, port, profiling);
+ }
+
+ @Override
+ public void runJobs(List<PregelixJob> jobs, Plan planChoice, String ipAddress, int port, boolean profiling)
+ throws HyracksException {
try {
- /** add hadoop configurations */
- URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
- if (hadoopCore != null) {
- job.getConfiguration().addResource(hadoopCore);
+ if (jobs.size() <= 0) {
+ throw new HyracksException("Please submit at least one job for execution!");
}
- URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
- if (hadoopMapRed != null) {
- job.getConfiguration().addResource(hadoopMapRed);
- }
- URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
- if (hadoopHdfs != null) {
- job.getConfiguration().addResource(hadoopHdfs);
- }
- ClusterConfig.loadClusterConfig(ipAddress, port);
-
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
-
this.profiling = profiling;
+ PregelixJob currentJob = jobs.get(0);
+ PregelixJob lastJob = currentJob;
+ JobGen jobGen = null;
- switch (planChoice) {
- case INNER_JOIN:
- jobGen = new JobGenInnerJoin(job);
- break;
- case OUTER_JOIN:
- jobGen = new JobGenOuterJoin(job);
- break;
- case OUTER_JOIN_SORT:
- jobGen = new JobGenOuterJoinSort(job);
- break;
- case OUTER_JOIN_SINGLE_SORT:
- jobGen = new JobGenOuterJoinSingleSort(job);
- break;
- default:
- jobGen = new JobGenInnerJoin(job);
- }
+ /** prepare job -- deploy jars */
+ DeploymentId deploymentId = prepareJobs(ipAddress, port);
+ LOG.info("job started");
- if (hcc == null)
- hcc = new HyracksConnection(ipAddress, port);
+ int lastSnapshotJobIndex = 0;
+ int lastSnapshotSuperstep = 0;
+ boolean failed = false;
+ int retryCount = 0;
+ int maxRetryCount = 3;
- URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
- List<File> jars = new ArrayList<File>();
- URL[] urls = classLoader.getURLs();
- for (URL url : urls)
- if (url.toString().endsWith(".jar"))
- jars.add(new File(url.getPath()));
- DeploymentId deploymentId = installApplication(jars);
-
- start = System.currentTimeMillis();
- FileSystem dfs = FileSystem.get(job.getConfiguration());
- dfs.delete(FileOutputFormat.getOutputPath(job), true);
- runCreate(deploymentId, jobGen);
- runDataLoad(deploymentId, jobGen);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("data loading finished " + time + "ms");
- int i = 1;
- boolean terminate = false;
do {
- start = System.currentTimeMillis();
- runLoopBodyIteration(deploymentId, jobGen, i);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("iteration " + i + " finished " + time + "ms");
- terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
- || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
- i++;
- } while (!terminate);
+ try {
+ failed = false; // reset on each attempt so a successful retry can exit the loop
+ for (int i = lastSnapshotJobIndex; i < jobs.size(); i++) {
+ lastJob = currentJob;
+ currentJob = jobs.get(i);
- start = System.currentTimeMillis();
- runHDFSWRite(deploymentId, jobGen);
- runCleanup(deploymentId, jobGen);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("result writing finished " + time + "ms");
- hcc.unDeployBinary(deploymentId);
+ /** add hadoop configurations */
+ addHadoopConfiguration(currentJob, ipAddress, port);
+
+ /** load the data */
+ if (i == 0 || compatible(lastJob, currentJob)) {
+ if (i != 0) {
+ finishJobs(jobGen, deploymentId);
+ }
+ jobGen = selectJobGen(planChoice, currentJob);
+ loadData(currentJob, jobGen, deploymentId);
+ } else {
+ jobGen.reset(currentJob);
+ }
+
+ /** run loop-body jobs */
+ runLoopBody(deploymentId, currentJob, jobGen, lastSnapshotSuperstep);
+ runClearState(deploymentId, jobGen);
+ }
+
+ /** finish the jobs */
+ finishJobs(jobGen, deploymentId);
+ hcc.unDeployBinary(deploymentId);
+ } catch (IOException ioe) {
+ /** disk failures */
+ //restart from snapshot
+ failed = true;
+ retryCount++;
+ ioe.printStackTrace();
+ }
+ } while (failed && retryCount < maxRetryCount);
LOG.info("job finished");
} catch (Exception e) {
throw new HyracksException(e);
}
}
+ private boolean compatible(PregelixJob lastJob, PregelixJob currentJob) {
+ Class lastVertexIdClass = BspUtils.getVertexIndexClass(lastJob.getConfiguration());
+ Class lastVertexValueClass = BspUtils.getVertexValueClass(lastJob.getConfiguration());
+ Class lastEdgeValueClass = BspUtils.getEdgeValueClass(lastJob.getConfiguration());
+ Path lastOutputPath = FileOutputFormat.getOutputPath(lastJob);
+
+ Class currentVertexIdClass = BspUtils.getVertexIndexClass(currentJob.getConfiguration());
+ Class currentVertexValueClass = BspUtils.getVertexValueClass(currentJob.getConfiguration());
+ Class currentEdgeValueClass = BspUtils.getEdgeValueClass(currentJob.getConfiguration());
+ Path[] currentInputPaths = FileInputFormat.getInputPaths(currentJob);
+
+ return lastVertexIdClass.equals(currentVertexIdClass)
+ && lastVertexValueClass.equals(currentVertexValueClass)
+ && lastEdgeValueClass.equals(currentEdgeValueClass)
+ && (currentInputPaths.length == 0 || (currentInputPaths.length == 1 && lastOutputPath
+ .equals(currentInputPaths[0])));
+ }
+
+ private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
+ JobGen jobGen;
+ switch (planChoice) {
+ case INNER_JOIN:
+ jobGen = new JobGenInnerJoin(currentJob);
+ break;
+ case OUTER_JOIN:
+ jobGen = new JobGenOuterJoin(currentJob);
+ break;
+ case OUTER_JOIN_SORT:
+ jobGen = new JobGenOuterJoinSort(currentJob);
+ break;
+ case OUTER_JOIN_SINGLE_SORT:
+ jobGen = new JobGenOuterJoinSingleSort(currentJob);
+ break;
+ default:
+ jobGen = new JobGenInnerJoin(currentJob);
+ }
+ return jobGen;
+ }
+
+ private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
+ Exception {
+ long start;
+ long end;
+ long time;
+ start = System.currentTimeMillis();
+ FileSystem dfs = FileSystem.get(currentJob.getConfiguration());
+ Path outputPath = FileOutputFormat.getOutputPath(currentJob);
+ if (outputPath != null) {
+ dfs.delete(outputPath, true);
+ }
+ runCreate(deploymentId, jobGen);
+ runDataLoad(deploymentId, jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("data loading finished " + time + "ms");
+ return time;
+ }
+
+ private void finishJobs(JobGen jobGen, DeploymentId deploymentId) throws Exception {
+ long start;
+ long end;
+ long time;
+ start = System.currentTimeMillis();
+ runHDFSWRite(deploymentId, jobGen);
+ runCleanup(deploymentId, jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("result writing finished " + time + "ms");
+ }
+
+ private DeploymentId prepareJobs(String ipAddress, int port) throws Exception {
+ if (hcc == null)
+ hcc = new HyracksConnection(ipAddress, port);
+
+ URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
+ List<File> jars = new ArrayList<File>();
+ URL[] urls = classLoader.getURLs();
+ for (URL url : urls)
+ if (url.toString().endsWith(".jar"))
+ jars.add(new File(url.getPath()));
+ DeploymentId deploymentId = installApplication(jars);
+ return deploymentId;
+ }
+
+ private void addHadoopConfiguration(PregelixJob job, String ipAddress, int port) throws HyracksException {
+ URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+ if (hadoopCore != null) {
+ job.getConfiguration().addResource(hadoopCore);
+ }
+ URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+ if (hadoopMapRed != null) {
+ job.getConfiguration().addResource(hadoopMapRed);
+ }
+ URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+ if (hadoopHdfs != null) {
+ job.getConfiguration().addResource(hadoopHdfs);
+ }
+ ClusterConfig.loadClusterConfig(ipAddress, port);
+ }
+
+ private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int snapshotSuperstep)
+ throws Exception {
+ if (snapshotSuperstep > 0) {
+ /** reload the snapshot */
+ }
+ int i = snapshotSuperstep + 1;
+ boolean terminate = false;
+ long start, end, time;
+ do {
+ start = System.currentTimeMillis();
+ runLoopBodyIteration(deploymentId, jobGen, i);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info(job + ": iteration " + i + " finished " + time + "ms");
+ terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
+ || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
+ i++;
+ } while (!terminate);
+ }
+
private void runCreate(DeploymentId deploymentId, JobGen jobGen) throws Exception {
try {
JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
@@ -196,6 +306,15 @@
}
}
+ private void runClearState(DeploymentId deploymentId, JobGen jobGen) throws Exception {
+ try {
+ JobSpecification clear = jobGen.generateClearState();
+ execute(deploymentId, clear);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
private void runJobArray(DeploymentId deploymentId, JobSpecification[] jobs) throws Exception {
for (JobSpecification job : jobs) {
execute(deploymentId, job);
@@ -204,6 +323,7 @@
private void execute(DeploymentId deploymentId, JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
+ job.setMaxReattempts(0);
JobId jobId = hcc.startJob(deploymentId, job,
profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 478ac07..931ecc3 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -79,6 +79,7 @@
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
@@ -99,8 +100,8 @@
protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
protected static final int tableSize = 10485767;
protected static final String PRIMARY_INDEX = "primary";
- protected final Configuration conf;
- protected final PregelixJob giraphJob;
+ protected Configuration conf;
+ protected PregelixJob pregelixJob;
protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
@@ -111,9 +112,9 @@
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
public JobGen(PregelixJob job) {
- this.conf = job.getConfiguration();
- this.giraphJob = job;
- this.initJobConfiguration();
+ conf = job.getConfiguration();
+ pregelixJob = job;
+ initJobConfiguration();
job.setJobId(jobId);
// set the frame size to be the one user specified if the user did
@@ -128,6 +129,13 @@
}
}
+ public void reset(PregelixJob job) {
+ conf = job.getConfiguration();
+ pregelixJob = job;
+ initJobConfiguration();
+ job.setJobId(jobId);
+ }
+
@SuppressWarnings({ "rawtypes", "unchecked" })
private void initJobConfiguration() {
Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
@@ -202,7 +210,7 @@
VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
List<InputSplit> splits = new ArrayList<InputSplit>();
try {
- splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+ splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
LOGGER.info("number of splits: " + splits.size());
for (InputSplit split : splits)
LOGGER.info(split.toString());
@@ -241,7 +249,7 @@
typeTraits[1] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
- sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, false,
+ sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
@@ -280,7 +288,7 @@
VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
List<InputSplit> splits = new ArrayList<InputSplit>();
try {
- splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+ splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -457,6 +465,17 @@
}
/***
+ * generate a "clear state" job
+ */
+ public JobSpecification generateClearState() throws HyracksException {
+ JobSpecification spec = new JobSpecification();
+ ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
+ ClusterConfig.setLocationConstraint(spec, clearState);
+ spec.addRoot(clearState);
+ return spec;
+ }
+
+ /***
* drop the sindex
*
* @return JobSpecification
@@ -494,7 +513,7 @@
NoOpIOOperationCallback.INSTANCE, 0.01);
} else {
return new BTreeDataflowHelperFactory();
- }
+ }
}
/** generate non-first iteration job */
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 49309ec..db18e53 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.TreeMap;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -197,7 +198,8 @@
public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
try {
IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+ Map<String, NodeControllerInfo> ncNameToNcInfos = new TreeMap<String, NodeControllerInfo>();
+ ncNameToNcInfos.putAll(hcc.getNodeControllerInfos());
NCs = new String[ncNameToNcInfos.size()];
ipToNcMapping = new HashMap<String, List<String>>();
int i = 0;
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index 8709301..392f728 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -34,7 +34,7 @@
int srcLen = fieldEndOffsets[1] - fieldEndOffsets[0]; // the updated vertex size
int frSize = frameTuple.getFieldLength(1); // the vertex binary size in the leaf page
if (srcLen <= frSize) {
- //doing in-place update if possible, save the "real update" overhead
+ //do the update in place when the updated vertex is no larger than the original, saving the "real update" overhead
System.arraycopy(cloneUpdateTb.getByteArray(), srcStart, frameTuple.getFieldData(1),
frameTuple.getFieldStart(1), srcLen);
cloneUpdateTb.reset();
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
index b697466..5be9ffc 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
@@ -15,11 +15,8 @@
package edu.uci.ics.pregelix.dataflow.util;
import java.io.InputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
public class ResetableByteArrayInputStream extends InputStream {
- private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
private byte[] data;
private int position;
@@ -36,19 +33,12 @@
public int read() {
int remaining = data.length - position;
int value = remaining > 0 ? (data[position++] & 0xff) : -1;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
- }
return value;
}
@Override
public int read(byte[] bytes, int offset, int length) {
int remaining = data.length - position;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
- + length + " position: " + position);
- }
if (remaining == 0) {
return -1;
}
@@ -57,4 +47,9 @@
position += l;
return l;
}
+
+ @Override
+ public int available() {
+ return data.length - position;
+ }
}
\ No newline at end of file
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
index 150bd8b..2fa1a4b 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -17,19 +17,15 @@
import java.io.DataInputStream;
import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameConstants;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
public class TupleDeserializer {
- private static final Logger LOGGER = Logger.getLogger(TupleDeserializer.class.getName());
-
+ private static String ERROR_MSG = "Out-of-bound read in your Writable implementations of the types for vertex id, vertex value, edge value or message --- check your readFields() and write() implementations";
private Object[] record;
private RecordDescriptor recordDescriptor;
private ResetableByteArrayInputStream bbis;
@@ -43,132 +39,120 @@
}
public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException {
- for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbis.setByteArray(data, offset);
+ try {
+ for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ int len = tupleRef.getFieldLength(i);
+ bbis.setByteArray(data, offset);
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > len) {
+ throw new IllegalStateException(ERROR_MSG);
}
+
+ record[i] = instance;
}
+ return record;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
}
- return record;
}
public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right)
throws HyracksDataException {
- byte[] data = left.getBuffer().array();
- int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
- int leftFieldCount = left.getFieldCount();
- int fStart = tStart;
- for (int i = 0; i < leftFieldCount; ++i) {
- /**
- * reset the input
- */
- fStart = tStart + left.getFieldStartOffset(tIndex, i);
- bbis.setByteArray(data, fStart);
+ try {
+ /** skip vertex id field in deserialization */
+ byte[] data = left.getBuffer().array();
+ int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
+ int leftFieldCount = left.getFieldCount();
+ int fStart = tStart;
+ for (int i = 1; i < leftFieldCount; ++i) {
+ /**
+ * reset the input
+ */
+ fStart = tStart + left.getFieldStartOffset(tIndex, i);
+ int fieldLength = left.getFieldLength(tIndex, i);
+ bbis.setByteArray(data, fStart);
- /**
- * do deserialization
- */
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- for (int i = leftFieldCount; i < record.length; ++i) {
- byte[] rightData = right.getFieldData(i - leftFieldCount);
- int rightOffset = right.getFieldStart(i - leftFieldCount);
- bbis.setByteArray(rightData, rightOffset);
+ /**
+ * do deserialization
+ */
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > fieldLength) {
+ throw new IllegalStateException(ERROR_MSG);
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
}
+ record[i] = instance;
}
+ /** skip vertex id field in deserialization */
+ for (int i = leftFieldCount + 1; i < record.length; ++i) {
+ byte[] rightData = right.getFieldData(i - leftFieldCount);
+ int rightOffset = right.getFieldStart(i - leftFieldCount);
+ int len = right.getFieldLength(i - leftFieldCount);
+ bbis.setByteArray(rightData, rightOffset);
+
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > len) {
+ throw new IllegalStateException(ERROR_MSG);
+ }
+ record[i] = instance;
+ }
+ return record;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
}
- return record;
}
public Object[] deserializeRecord(ArrayTupleBuilder tb, ITupleReference right) throws HyracksDataException {
- byte[] data = tb.getByteArray();
- int[] offset = tb.getFieldEndOffsets();
- int start = 0;
- for (int i = 0; i < offset.length; ++i) {
- /**
- * reset the input
- */
- bbis.setByteArray(data, start);
- start = offset[i];
+ try {
+ byte[] data = tb.getByteArray();
+ int[] offset = tb.getFieldEndOffsets();
+ int start = 0;
+ /** skip vertex id fields in deserialization */
+ for (int i = 1; i < offset.length; ++i) {
+ /**
+ * reset the input
+ */
+ start = offset[i - 1];
+ bbis.setByteArray(data, start);
+ int fieldLength = offset[i] - offset[i - 1];
- /**
- * do deserialization
- */
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
+ /**
+ * do deserialization
+ */
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > fieldLength) {
+ throw new IllegalStateException(ERROR_MSG);
}
+ record[i] = instance;
}
- }
- for (int i = offset.length; i < record.length; ++i) {
- byte[] rightData = right.getFieldData(i - offset.length);
- int rightOffset = right.getFieldStart(i - offset.length);
- bbis.setByteArray(rightData, rightOffset);
+ /** skip vertex id fields in deserialization */
+ for (int i = offset.length + 1; i < record.length; ++i) {
+ byte[] rightData = right.getFieldData(i - offset.length);
+ int rightOffset = right.getFieldStart(i - offset.length);
+ bbis.setByteArray(rightData, rightOffset);
+ int fieldLength = right.getFieldLength(i - offset.length);
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > fieldLength) {
+ throw new IllegalStateException(ERROR_MSG);
}
+ record[i] = instance;
}
+ return record;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
}
- return record;
}
}
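Note: the kind of user error the new length checks catch is a Writable whose readFields() consumes more bytes than its write() produced. A hypothetical example that would now fail fast with ERROR_MSG instead of silently corrupting the next field:

    public class BuggyValue implements Writable {
        private int v;
        public void write(DataOutput out) throws IOException {
            out.writeInt(v); // writes 4 bytes
        }
        public void readFields(DataInput in) throws IOException {
            v = in.readInt(); // reads the 4 bytes written above
            in.readInt();     // out-of-bound read: strays into the next field's bytes
        }
    }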
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
index ea1e02e..4421695 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
/**
* The buffer to hold updates.
@@ -96,7 +97,12 @@
fta.reset(buffer);
for (int j = 0; j < fta.getTupleCount(); j++) {
tuple.reset(fta, j);
- bta.update(tuple);
+ try {
+ bta.update(tuple);
+ } catch (TreeIndexNonExistentKeyException e) {
+ // ignore non-existent key exception
+ bta.insert(tuple);
+ }
}
}
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ClearStateOperatorDescriptor.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ClearStateOperatorDescriptor.java
new file mode 100644
index 0000000..d86557b
--- /dev/null
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ClearStateOperatorDescriptor.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+/**
+ * Clear the state of the RuntimeContext in one slave
+ *
+ * @author yingyib
+ */
+public class ClearStateOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private String jobId;
+
+ public ClearStateOperatorDescriptor(JobSpecification spec, String jobId) {
+ super(spec, 0, 0);
+ this.jobId = jobId;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new IOperatorNodePushable() {
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ RuntimeContext context = (RuntimeContext) ctx.getJobletContext().getApplicationContext()
+ .getApplicationObject();
+ context.clearState(jobId);
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+
+ }
+
+ @Override
+ public int getInputArity() {
+ return 0;
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+ throws HyracksDataException {
+
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return null;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return "Clear State Operator";
+ }
+
+ };
+ }
+
+}
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index e25a46a..496d066 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -56,11 +56,12 @@
private final IBufferCache bufferCache;
private final IVirtualBufferCache vBufferCache;
private final IFileMapManager fileMapManager;
- private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
- private final Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
- private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
private final IOManager ioManager;
private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+ private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
+ private final Map<String, Long> jobIdToSuperStep = new ConcurrentHashMap<String, Long>();
+ private final Map<String, Boolean> jobIdToMove = new ConcurrentHashMap<String, Boolean>();
+
private final ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r);
@@ -100,6 +101,18 @@
System.gc();
}
+ public void clearState(String jobId) throws HyracksDataException {
+ for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
+ for (FileReference fileRef : entry.getValue())
+ fileRef.delete();
+
+ iterationToFiles.clear();
+ appStateMap.clear();
+ jobIdToMove.remove(jobId);
+ jobIdToSuperStep.remove(jobId);
+ System.gc();
+ }
+
public ILocalResourceRepository getLocalResourceRepository() {
return localResourceRepository;
}
@@ -132,14 +145,14 @@
return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
}
- public synchronized void setVertexProperties(String giraphJobId, long numVertices, long numEdges) {
- Boolean toMove = giraphJobIdToMove.get(giraphJobId);
+ public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges) {
+ Boolean toMove = jobIdToMove.get(jobId);
if (toMove == null || toMove == true) {
- if (giraphJobIdToSuperStep.get(giraphJobId) == null) {
- giraphJobIdToSuperStep.put(giraphJobId, 0L);
+ if (jobIdToSuperStep.get(jobId) == null) {
+ jobIdToSuperStep.put(jobId, 0L);
}
- long superStep = giraphJobIdToSuperStep.get(giraphJobId);
+ long superStep = jobIdToSuperStep.get(jobId);
List<FileReference> files = iterationToFiles.remove(superStep - 1);
if (files != null) {
for (FileReference fileRef : files)
@@ -149,15 +162,15 @@
Vertex.setSuperstep(++superStep);
Vertex.setNumVertices(numVertices);
Vertex.setNumEdges(numEdges);
- giraphJobIdToSuperStep.put(giraphJobId, superStep);
- giraphJobIdToMove.put(giraphJobId, false);
+ jobIdToSuperStep.put(jobId, superStep);
+ jobIdToMove.put(jobId, false);
LOGGER.info("start iteration " + Vertex.getSuperstep());
}
System.gc();
}
public synchronized void endSuperStep(String giraphJobId) {
- giraphJobIdToMove.put(giraphJobId, true);
+ jobIdToMove.put(giraphJobId, true);
LOGGER.info("end iteration " + Vertex.getSuperstep());
}
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index 07d2d57..a280c45 100644
--- a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -139,6 +139,7 @@
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setDynamicVertexValueSize(true);
Client.run(args, job);
}
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java
new file mode 100644
index 0000000..d2464c1
--- /dev/null
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.FloatWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * @author yingyib
+ */
+public class FailureVertex extends Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+ @Override
+ public void compute(Iterator<VLongWritable> msgIterator) throws Exception {
+ if (getVertexId().get() == 10) {
+ throw new IllegalStateException("This job is going to fail");
+ }
+ }
+
+}
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index f60387a..f99321a 100644
--- a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -16,6 +16,7 @@
package edu.uci.ics.pregelix.example.client;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -74,6 +75,13 @@
driver.runJob(job, options.planChoice, options.ipAddress, options.port, Boolean.parseBoolean(options.profiling));
}
+ public static void run(String[] args, List<PregelixJob> jobs) throws Exception {
+ Options options = prepareJobs(args, jobs);
+ Driver driver = new Driver(Client.class);
+ driver.runJobs(jobs, options.planChoice, options.ipAddress, options.port,
+ Boolean.parseBoolean(options.profiling));
+ }
+
private static Options prepareJob(String[] args, PregelixJob job) throws CmdLineException, IOException {
Options options = new Options();
CmdLineParser parser = new CmdLineParser(options);
@@ -84,6 +92,32 @@
for (int i = 1; i < inputs.length; i++)
FileInputFormat.addInputPaths(job, inputs[i]);
FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+ setJobSpecificSettings(job, options);
+ return options;
+ }
+
+ private static Options prepareJobs(String[] args, List<PregelixJob> jobs) throws CmdLineException, IOException {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+
+ for (int j = 0; j < jobs.size(); j++) {
+ PregelixJob job = jobs.get(j);
+ String[] inputs = options.inputPaths.split(";");
+ if (j == 0) {
+ FileInputFormat.setInputPaths(job, inputs[0]);
+ for (int i = 1; i < inputs.length; i++)
+ FileInputFormat.addInputPaths(job, inputs[i]);
+ }
+ if (j == jobs.size() - 1) {
+ FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+ }
+ setJobSpecificSettings(job, options);
+ }
+ return options;
+ }
+
+ private static void setJobSpecificSettings(PregelixJob job, Options options) {
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, options.sourceId);
@@ -91,7 +125,6 @@
job.getConfiguration().setLong(ReachabilityVertex.DEST_ID, options.destId);
if (options.numIteration > 0)
job.getConfiguration().setLong(PageRankVertex.ITERATIONS, options.numIteration);
- return options;
}
}
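Note: putting the client pieces together, a sketch of submitting a two-job pipeline; job construction is abbreviated, and the setters follow the existing single-job examples such as ConnectedComponentsVertex.main below:

    List<PregelixJob> jobs = new ArrayList<PregelixJob>();
    PregelixJob first = new PregelixJob("ConnectedComponents"); // name is illustrative
    first.setVertexClass(ConnectedComponentsVertex.class);
    // ... input/output formats, combiner, etc., as in the existing main()
    jobs.add(first);
    jobs.add(second); // a second, already-configured job
    Client.run(args, jobs); // input paths go to the first job, the output path to the last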
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
index 7d824ea..90065c2 100644
--- a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
@@ -14,8 +14,13 @@
*/
package edu.uci.ics.pregelix.example.data;
+import java.io.DataInput;
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.WritableUtils;
+
import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
/**
* @author yingyib
@@ -26,34 +31,42 @@
private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
private static final int NEGATIVE_LONG_MASK = (0 << 30);
+ private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+ private DataInput dis = new DataInputStream(bis);
+
@Override
public int getNormalizedKey(byte[] bytes, int start, int length) {
- long value = SerDeUtils.readVLong(bytes, start, length);
- int highValue = (int) (value >> 32);
- if (highValue > 0) {
- /**
- * larger than Integer.MAX
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= POSTIVE_LONG_MASK;
- return highNmk;
- } else if (highValue == 0) {
- /**
- * smaller than Integer.MAX but >=0
- */
- int lowNmk = (int) value;
- lowNmk >>= 2;
- lowNmk |= NON_NEGATIVE_INT_MASK;
- return lowNmk;
- } else {
- /**
- * less than 0; TODO: have not optimized for that
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= NEGATIVE_LONG_MASK;
- return highNmk;
+ try {
+ bis.setByteArray(bytes, start);
+ long value = WritableUtils.readVLong(dis);
+ int highValue = (int) (value >> 32);
+ if (highValue > 0) {
+ /**
+ * larger than Integer.MAX
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= POSTIVE_LONG_MASK;
+ return highNmk;
+ } else if (highValue == 0) {
+ /**
+ * smaller than Integer.MAX but >=0
+ */
+ int lowNmk = (int) value;
+ lowNmk >>= 2;
+ lowNmk |= NON_NEGATIVE_INT_MASK;
+ return lowNmk;
+ } else {
+ /**
+ * less than 0; TODO: have not optimized for that
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= NEGATIVE_LONG_MASK;
+ return highNmk;
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
}
}
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
index ec1109f..1c5f629 100644
--- a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
@@ -16,14 +16,14 @@
package edu.uci.ics.pregelix.example.io;
import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+import java.io.DataInputStream;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
import edu.uci.ics.pregelix.api.io.WritableSizable;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
/**
* A WritableComparable for longs in a variable-length format. Such values take
@@ -32,17 +32,7 @@
* @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
*/
@SuppressWarnings("rawtypes")
-public class VLongWritable implements WritableComparable, WritableSizable {
- private static long ONE_BYTE_MAX = 2 ^ 7 - 1;
- private static long TWO_BYTE_MAX = 2 ^ 14 - 1;
- private static long THREE_BYTE_MAX = 2 ^ 21 - 1;
- private static long FOUR_BYTE_MAX = 2 ^ 28 - 1;
- private static long FIVE_BYTE_MAX = 2 ^ 35 - 1;;
- private static long SIX_BYTE_MAX = 2 ^ 42 - 1;;
- private static long SEVEN_BYTE_MAX = 2 ^ 49 - 1;;
- private static long EIGHT_BYTE_MAX = 2 ^ 54 - 1;;
-
- private long value;
+public class VLongWritable extends org.apache.hadoop.io.VLongWritable implements WritableSizable {
public VLongWritable() {
}
@@ -52,78 +42,46 @@
}
public int sizeInBytes() {
- if (value >= 0 && value <= ONE_BYTE_MAX) {
+ long i = get();
+ if (i >= -112 && i <= 127) {
return 1;
- } else if (value > ONE_BYTE_MAX && value <= TWO_BYTE_MAX) {
- return 2;
- } else if (value > TWO_BYTE_MAX && value <= THREE_BYTE_MAX) {
- return 3;
- } else if (value > THREE_BYTE_MAX && value <= FOUR_BYTE_MAX) {
- return 4;
- } else if (value > FOUR_BYTE_MAX && value <= FIVE_BYTE_MAX) {
- return 5;
- } else if (value > FIVE_BYTE_MAX && value <= SIX_BYTE_MAX) {
- return 6;
- } else if (value > SIX_BYTE_MAX && value <= SEVEN_BYTE_MAX) {
- return 7;
- } else if (value > SEVEN_BYTE_MAX && value <= EIGHT_BYTE_MAX) {
- return 8;
- } else {
- return 9;
}
- }
- /** Set the value of this LongWritable. */
- public void set(long value) {
- this.value = value;
- }
+ int len = -112;
+ if (i < 0) {
+ i ^= -1L; // take one's complement
+ len = -120;
+ }
- /** Return the value of this LongWritable. */
- public long get() {
- return value;
- }
+ long tmp = i;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
- public void readFields(DataInput in) throws IOException {
- value = SerDeUtils.readVLong(in);
- }
-
- public void write(DataOutput out) throws IOException {
- SerDeUtils.writeVLong(out, value);
- }
-
- /** Returns true iff <code>o</code> is a VLongWritable with the same value. */
- public boolean equals(Object o) {
- if (!(o instanceof VLongWritable))
- return false;
- VLongWritable other = (VLongWritable) o;
- return this.value == other.value;
- }
-
- public int hashCode() {
- return (int) value;
- }
-
- /** Compares two VLongWritables. */
- public int compareTo(Object o) {
- long thisValue = this.value;
- long thatValue = ((VLongWritable) o).value;
- return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
- }
-
- public String toString() {
- return Long.toString(value);
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+ return len + 1;
}
/** A Comparator optimized for LongWritable. */
public static class Comparator extends WritableComparator {
+ private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+ private DataInput dis = new DataInputStream(bis);
+
public Comparator() {
super(VLongWritable.class);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- long thisValue = SerDeUtils.readVLong(b1, s1, l1);
- long thatValue = SerDeUtils.readVLong(b2, s2, l2);
- return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+ try {
+ bis.setByteArray(b1, s1);
+ long thisValue = WritableUtils.readVLong(dis);
+ bis.setByteArray(b2, s2);
+ long thatValue = WritableUtils.readVLong(dis);
+ return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
}
}
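For reference, the new sizeInBytes() transcribes the length rule of Hadoop's vlong encoding: values in [-112, 127] occupy one byte; anything else takes a one-byte length marker plus one byte per non-zero octet of the (one's-complemented, if negative) value. A quick standalone cross-check against Hadoop's own size helper, which should agree with sizeInBytes() for every input (a sketch, not part of the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.io.WritableUtils;

    public class VLongSizeCheck {
        public static void main(String[] args) throws Exception {
            long[] samples = { 0L, 127L, 128L, -112L, -113L, Long.MAX_VALUE, Long.MIN_VALUE };
            for (long v : samples) {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                WritableUtils.writeVLong(new DataOutputStream(bos), v);
                // getVIntSize(long) is Hadoop's size predictor for vlongs;
                // both numbers below should match for every sample.
                System.out.println(v + ": encoded=" + bos.size()
                        + ", predicted=" + WritableUtils.getVIntSize(v));
            }
        }
    }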
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 13cec61..cb7fd6d 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -126,6 +126,7 @@
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setDynamicVertexValueSize(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -140,6 +141,7 @@
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+ job.setDynamicVertexValueSize(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
index 196b114..3fe13b1 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
@@ -41,10 +41,13 @@
public void testVLong() throws Exception {
Random rand = new Random(System.currentTimeMillis());
MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ msgList.add(new VLongWritable(Long.MAX_VALUE));
+ msgList.add(new VLongWritable(Long.MIN_VALUE));
+ msgList.add(new VLongWritable(-1));
for (int i = 0; i < 1000000; i++) {
msgList.add(new VLongWritable(Math.abs(rand.nextLong())));
}
- verifySizeEstimation(msgList);
+ verifyExactSizeEstimation(msgList);
}
@Test
@@ -96,7 +99,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testNull() throws Exception {
MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
@@ -105,7 +108,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testVInt() throws Exception {
Random rand = new Random(System.currentTimeMillis());
@@ -115,7 +118,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testInt() throws Exception {
Random rand = new Random(System.currentTimeMillis());
@@ -148,4 +151,26 @@
}
}
-}
+ private void verifyExactSizeEstimation(MsgList<WritableSizable> msgList) throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutput dos = new DataOutputStream(bos);
+ int accumulatedSize = 5; // list header: 1 flag byte + 4-byte message count
+ for (int i = 0; i < msgList.size(); i++) {
+ bos.reset();
+ WritableSizable value = msgList.get(i);
+ value.write(dos);
+ if (value.sizeInBytes() != bos.size()) {
+ throw new Exception(value + " estimated size (" + value.sizeInBytes()
+ + ") does not match the actual size (" + bos.size() + ")");
+ }
+ accumulatedSize += value.sizeInBytes();
+ }
+ bos.reset();
+ msgList.write(dos);
+ if (accumulatedSize < bos.size()) {
+ throw new Exception("Estimated list size (" + accumulatedSize + ") is smaller than the actual size"
+ + bos.size());
+ }
+ }
+
+}
\ No newline at end of file
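The three explicit edge cases added to testVLong() matter because Math.abs(rand.nextLong()) essentially never exercises the negative encoding path; worse, Java's Math.abs is a no-op on Long.MIN_VALUE, so that value has to be pinned explicitly:

    // JDK quirk worth remembering when reading the test above:
    long v = Math.abs(Long.MIN_VALUE); // still Long.MIN_VALUE, i.e. negative
    System.out.println(v < 0);         // prints true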
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
new file mode 100644
index 0000000..aa0dfdd
--- /dev/null
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.test;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
+import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.FailureVertex;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+
+/**
+ * This test case tests the error message propagation.
+ *
+ * @author yingyib
+ */
+public class FailureVertexTest {
+
+ private static String HDFS_INPUTPATH2 = "data/webmapcomplex";
+ private static String HDFS_OUTPUTPAH2 = "actual/resultcomplex";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+ try {
+ PregelixJob job = new PregelixJob(FailureVertex.class.getSimpleName());
+ job.setVertexClass(FailureVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+ job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setDynamicVertexValueSize(true);
+
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+
+ Driver driver = new Driver(FailureVertex.class);
+ testCluster.setUp();
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+ } catch (Exception e) {
+ Assert.assertTrue(e.toString().contains("This job is going to fail"));
+ } finally {
+ testCluster.tearDown();
+ }
+ }
+
+}
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
new file mode 100644
index 0000000..0d6f863
--- /dev/null
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class JobConcatenationTest {
+
+ private static String INPUTPATH = "data/webmap";
+ private static String OUTPUTPAH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+
+ try {
+ List<PregelixJob> jobs = new ArrayList<PregelixJob>();
+ PregelixJob job1 = new PregelixJob(PageRankVertex.class.getName());
+ job1.setVertexClass(PageRankVertex.class);
+ job1.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job1.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job1.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileInputFormat.setInputPaths(job1, INPUTPATH);
+ job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+
+ PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
+ job2.setVertexClass(PageRankVertex.class);
+ job2.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job2.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job2.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
+ job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+
+ jobs.add(job1);
+ jobs.add(job2);
+
+ testCluster.setUp();
+ Driver driver = new Driver(PageRankVertex.class);
+ driver.runJobs(jobs, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ testCluster.tearDown();
+ }
+ }
+
+}
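A note on the shape of this test: only job1 receives an input path and only job2 an output path, which suggests (an inference from this test alone, not from Driver's documented contract) that runJobs() chains the jobs and feeds each job's output to its successor.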
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
new file mode 100644
index 0000000..d0cf654
--- /dev/null
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.test;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+@SuppressWarnings("deprecation")
+public class TestCluster {
+ private static final Logger LOGGER = Logger.getLogger(TestCluster.class.getName());
+
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+ private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+
+ private static final String DATA_PATH = "data/webmap/webmap_link.txt";
+ private static final String HDFS_PATH = "/webmap/";
+
+ private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
+ private static final String HDFS_PATH2 = "/webmapcomplex/";
+
+ private static final String DATA_PATH3 = "data/clique/clique.txt";
+ private static final String HDFS_PATH3 = "/clique/";
+
+ private static final String DATA_PATH4 = "data/clique2/clique.txt";
+ private static final String HDFS_PATH4 = "/clique2/";
+
+ private static final String DATA_PATH5 = "data/clique3/clique.txt";
+ private static final String HDFS_PATH5 = "/clique3/";
+
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
+
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+
+ public void setUp() throws Exception {
+ ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+ ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+ cleanupStores();
+ PregelixHyracksIntegrationUtil.init();
+ LOGGER.info("Hyracks mini-cluster started");
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+ }
+
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(DATA_PATH);
+ Path dest = new Path(HDFS_PATH);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ src = new Path(DATA_PATH2);
+ dest = new Path(HDFS_PATH2);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ src = new Path(DATA_PATH3);
+ dest = new Path(HDFS_PATH3);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ src = new Path(DATA_PATH4);
+ dest = new Path(HDFS_PATH4);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ src = new Path(DATA_PATH5);
+ dest = new Path(HDFS_PATH5);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
+
+ /**
+ * cleanup hdfs cluster
+ */
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ public void tearDown() throws Exception {
+ PregelixHyracksIntegrationUtil.deinit();
+ LOGGER.info("Hyracks mini-cluster shut down");
+ cleanupHDFS();
+ }
+
+ protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+ String s = null;
+ List<String> ignores = new ArrayList<String>();
+ while ((s = reader.readLine()) != null) {
+ ignores.add(s);
+ }
+ reader.close();
+ return ignores;
+ }
+
+}
diff --git a/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index decbde8..df72d9b 100644
--- a/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -127,6 +127,7 @@
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index cca66bb..b0bf024 100644
--- a/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -128,6 +128,7 @@
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index d46457c..ab564fa 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -205,7 +205,11 @@
*/
if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
terminate = false;
- aggregator.step(vertex);
+
+ if (msgContentList.segmentEnd()) {
+ /** the segmentEnd() check ensures the aggregator steps exactly once per vertex */
+ aggregator.step(vertex);
+ }
}
@Override
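The guard above addresses message lists that span multiple frame segments: the update function is driven once per segment, so stepping the global aggregator unconditionally would count a multi-segment vertex more than once. A toy model of the invariant (sketch only; names mirror the patch, not the runtime's actual control flow):

    import java.util.Arrays;
    import java.util.List;

    public class SegmentEndDemo {
        static class Segment {
            final boolean end;
            Segment(boolean end) { this.end = end; }
            boolean segmentEnd() { return end; }
        }

        public static void main(String[] args) {
            List<Segment> segmentsOfOneVertex =
                    Arrays.asList(new Segment(false), new Segment(false), new Segment(true));
            int steps = 0;
            for (Segment s : segmentsOfOneVertex) {
                // vertex.compute(...) would run here, once per segment
                if (s.segmentEnd()) {
                    steps++; // aggregator.step(vertex) in the real code
                }
            }
            System.out.println("aggregator.step calls: " + steps); // 1
        }
    }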
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index eba75c9..b4e1dd8 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -179,10 +179,7 @@
vertex.setOutputWriters(writers);
vertex.setOutputAppenders(appenders);
vertex.setOutputTupleBuilders(tbs);
-
- if (!msgIterator.hasNext() && vertex.isHalted()) {
- return;
- }
+
if (vertex.isHalted()) {
vertex.activate();
}
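The removed early-return changes start-superstep semantics: previously a halted vertex with no incoming messages was skipped outright; after this change it is unconditionally reactivated, which, as far as this diff shows, is what lets every vertex participate in the first superstep of a freshly started or concatenated job.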
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 3d52a45..acd766e 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -180,7 +180,7 @@
throw new HyracksDataException(e);
}
}
- return size;
+ return size * 2;
}
private void emitResultTuple(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
@@ -224,4 +224,4 @@
};
}
-}
\ No newline at end of file
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
index 4eaa21c..b7689de 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
@@ -32,6 +32,7 @@
@Override
public void writeNull(DataOutput out) throws HyracksDataException {
try {
+ out.writeByte(3); // segment flags: start|end bits both set
out.writeInt(0);
} catch (IOException e) {
throw new HyracksDataException(e);
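This one-byte prefix lines up with the accumulatedSize = 5 seed in SizeEstimationTest: a serialized MsgList evidently carries a flag byte (3 == 0b11, presumably the start-segment and end-segment bits) ahead of its 4-byte count. A sketch of the null encoding after this patch:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    public class NullMsgListEncoding {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeByte(3); // start|end flags
            out.writeInt(0);  // zero messages
            System.out.println(bos.size()); // 5
        }
    }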
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
index 99bcac5..cd2012a 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -26,11 +26,11 @@
public class PreSuperStepRuntimeHookFactory implements IRuntimeHookFactory {
private static final long serialVersionUID = 1L;
private final IConfigurationFactory confFactory;
- private final String giraphJobId;
+ private final String jobId;
- public PreSuperStepRuntimeHookFactory(String giraphJobId, IConfigurationFactory confFactory) {
+ public PreSuperStepRuntimeHookFactory(String jobId, IConfigurationFactory confFactory) {
this.confFactory = confFactory;
- this.giraphJobId = giraphJobId;
+ this.jobId = jobId;
}
@Override
@@ -40,7 +40,7 @@
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
Configuration conf = confFactory.createConfiguration(ctx);
- IterationUtils.setProperties(giraphJobId, ctx, conf);
+ IterationUtils.setProperties(jobId, ctx, conf);
}
};