Avoid the B-tree update code path when the target entry has sufficient space
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
new file mode 100644
index 0000000..503b521
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io.internal;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class InternalVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
+ extends VertexInputFormat<I, V, E, M> {
+ /** Uses the SequenceFileInputFormat to do everything */
+ protected SequenceFileInputFormat sequenceInputFormat = new SequenceFileInputFormat();
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+ return sequenceInputFormat.getSplits(context);
+ }
+
+ @Override
+ public VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
+ return new VertexReader<I, V, E, M>() {
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return false;
+ }
+
+ @Override
+ public Vertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ };
+ }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
new file mode 100644
index 0000000..0581658
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io.internal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+/**
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class InternalVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> extends
+ VertexOutputFormat<I, V, E> {
+
+ @Override
+ public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ return new VertexWriter<I, V, E>() {
+
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException, InterruptedException {
+
+ }
+
+ @Override
+ public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
+
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+
+ }
+
+ };
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return new OutputCommitter() {
+
+ @Override
+ public void abortTask(TaskAttemptContext arg0) throws IOException {
+                // no-op: the internal output format produces no task output, so there is nothing to abort
+
+ }
+
+ @Override
+ public void cleanupJob(JobContext arg0) throws IOException {
+                // no-op: nothing to clean up for the internal output format
+
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext arg0) throws IOException {
+                // no-op: needsTaskCommit() returns false, so there is never anything to commit
+
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setupJob(JobContext arg0) throws IOException {
+
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext arg0) throws IOException {
+
+ }
+
+ };
+ }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index dae7818..d2d5cf0 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -201,4 +201,9 @@
final public void setLSMStorage(boolean variableSizedUpdateHeavyFlag) {
getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
}
+
+ @Override
+ public String toString() {
+ return getJobName();
+ }
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 5c8cb33..1b4f226 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -93,32 +93,48 @@
DeploymentId deploymentId = prepareJobs(ipAddress, port);
LOG.info("job started");
- for (int i = 0; i < jobs.size(); i++) {
- lastJob = currentJob;
- currentJob = jobs.get(i);
+ int lastSnapshotJobIndex = 0;
+ int lastSnapshotSuperstep = 0;
+ boolean failed = false;
+ int retryCount = 0;
+ int maxRetryCount = 3;
- /** add hadoop configurations */
- addHadoopConfiguration(currentJob, ipAddress, port);
+ do {
+ try {
+ for (int i = lastSnapshotJobIndex; i < jobs.size(); i++) {
+ lastJob = currentJob;
+ currentJob = jobs.get(i);
- /** load the data */
- if (i == 0 || compatible(lastJob, currentJob)) {
- if (i != 0) {
- finishJobs(jobGen, deploymentId);
+ /** add hadoop configurations */
+ addHadoopConfiguration(currentJob, ipAddress, port);
+
+ /** load the data */
+ if (i == 0 || compatible(lastJob, currentJob)) {
+ if (i != 0) {
+ finishJobs(jobGen, deploymentId);
+ }
+ jobGen = selectJobGen(planChoice, currentJob);
+ loadData(currentJob, jobGen, deploymentId);
+ } else {
+ jobGen.reset(currentJob);
+ }
+
+ /** run loop-body jobs */
+ runLoopBody(deploymentId, currentJob, jobGen, lastSnapshotSuperstep);
+ runClearState(deploymentId, jobGen);
}
- jobGen = selectJobGen(planChoice, currentJob);
- loadData(currentJob, jobGen, deploymentId);
- } else {
- jobGen.reset(currentJob);
+
+ /** finish the jobs */
+ finishJobs(jobGen, deploymentId);
+ hcc.unDeployBinary(deploymentId);
+ } catch (IOException ioe) {
+ /** disk failures */
+                    // restart from snapshot; NOTE(review): 'failed' is never reset to false before a retry, so one IOException causes the loop to re-run until maxRetryCount even when a retry succeeds — confirm and reset 'failed' at the top of the try block
+ failed = true;
+ retryCount++;
+ ioe.printStackTrace();
}
-
- /** run loop-body jobs */
- runLoopBody(deploymentId, currentJob, jobGen);
- runClearState(deploymentId, jobGen);
- }
-
- /** finish the jobs */
- finishJobs(jobGen, deploymentId);
- hcc.unDeployBinary(deploymentId);
+ } while (failed && retryCount < maxRetryCount);
LOG.info("job finished");
} catch (Exception e) {
throw new HyracksException(e);
@@ -225,8 +241,12 @@
ClusterConfig.loadClusterConfig(ipAddress, port);
}
- private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen) throws Exception {
- int i = 1;
+ private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int snapshotSuperstep)
+ throws Exception {
+ if (snapshotSuperstep > 0) {
+            /** TODO: reload the snapshot — recovery from a prior superstep is not implemented yet */
+ }
+ int i = snapshotSuperstep + 1;
boolean terminate = false;
long start, end, time;
do {
@@ -234,7 +254,7 @@
runLoopBodyIteration(deploymentId, jobGen, i);
end = System.currentTimeMillis();
time = end - start;
- LOG.info("iteration " + i + " finished " + time + "ms");
+ LOG.info(job + ": iteration " + i + " finished " + time + "ms");
terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
|| IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
i++;
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index 8e731ba..392f728 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -33,8 +33,8 @@
int srcStart = fieldEndOffsets[0];
int srcLen = fieldEndOffsets[1] - fieldEndOffsets[0]; // the updated vertex size
int frSize = frameTuple.getFieldLength(1); // the vertex binary size in the leaf page
- if (srcLen == frSize) {
- //doing in-place update if the vertex size is fixed, save the "real update" overhead
+ if (srcLen <= frSize) {
+ //doing in-place update if the vertex size is not larger than the original size, save the "real update" overhead
System.arraycopy(cloneUpdateTb.getByteArray(), srcStart, frameTuple.getFieldData(1),
frameTuple.getFieldStart(1), srcLen);
cloneUpdateTb.reset();
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
index 85f3959..2fa1a4b 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -49,7 +49,7 @@
int availableBefore = bbis.available();
Object instance = recordDescriptor.getFields()[i].deserialize(di);
int availableAfter = bbis.available();
- if (availableBefore - availableAfter != len) {
+ if (availableBefore - availableAfter > len) {
throw new IllegalStateException(ERROR_MSG);
}
@@ -64,11 +64,12 @@
public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right)
throws HyracksDataException {
try {
+ /** skip vertex id field in deserialization */
byte[] data = left.getBuffer().array();
int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
int leftFieldCount = left.getFieldCount();
int fStart = tStart;
- for (int i = 0; i < leftFieldCount; ++i) {
+ for (int i = 1; i < leftFieldCount; ++i) {
/**
* reset the input
*/
@@ -82,13 +83,14 @@
int availableBefore = bbis.available();
Object instance = recordDescriptor.getFields()[i].deserialize(di);
int availableAfter = bbis.available();
- if (availableBefore - availableAfter != fieldLength) {
+ if (availableBefore - availableAfter > fieldLength) {
throw new IllegalStateException(ERROR_MSG);
}
record[i] = instance;
}
- for (int i = leftFieldCount; i < record.length; ++i) {
+ /** skip vertex id field in deserialization */
+ for (int i = leftFieldCount + 1; i < record.length; ++i) {
byte[] rightData = right.getFieldData(i - leftFieldCount);
int rightOffset = right.getFieldStart(i - leftFieldCount);
int len = right.getFieldLength(i - leftFieldCount);
@@ -97,7 +99,7 @@
int availableBefore = bbis.available();
Object instance = recordDescriptor.getFields()[i].deserialize(di);
int availableAfter = bbis.available();
- if (availableBefore - availableAfter != len) {
+ if (availableBefore - availableAfter > len) {
throw new IllegalStateException(ERROR_MSG);
}
record[i] = instance;
@@ -113,12 +115,13 @@
byte[] data = tb.getByteArray();
int[] offset = tb.getFieldEndOffsets();
int start = 0;
- for (int i = 0; i < offset.length; ++i) {
+ /** skip vertex id fields in deserialization */
+ for (int i = 1; i < offset.length; ++i) {
/**
* reset the input
*/
+ start = offset[i - 1];
bbis.setByteArray(data, start);
- start = offset[i];
int fieldLength = i == 0 ? offset[0] : offset[i] - offset[i - 1];
/**
@@ -127,12 +130,13 @@
int availableBefore = bbis.available();
Object instance = recordDescriptor.getFields()[i].deserialize(di);
int availableAfter = bbis.available();
- if (availableBefore - availableAfter != fieldLength) {
+ if (availableBefore - availableAfter > fieldLength) {
throw new IllegalStateException(ERROR_MSG);
}
record[i] = instance;
}
- for (int i = offset.length; i < record.length; ++i) {
+ /** skip vertex id fields in deserialization */
+ for (int i = offset.length + 1; i < record.length; ++i) {
byte[] rightData = right.getFieldData(i - offset.length);
int rightOffset = right.getFieldStart(i - offset.length);
bbis.setByteArray(rightData, rightOffset);
@@ -141,7 +145,7 @@
int availableBefore = bbis.available();
Object instance = recordDescriptor.getFields()[i].deserialize(di);
int availableAfter = bbis.available();
- if (availableBefore - availableAfter != fieldLength) {
+ if (availableBefore - availableAfter > fieldLength) {
throw new IllegalStateException(ERROR_MSG);
}
record[i] = instance;