Merge branch 'master' into yingyi/fullstack_fix
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 2992dfe..ac722e2 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -477,9 +477,14 @@
private void acquireWriteLatch(boolean markDirty) {
latch.writeLock().lock();
if (markDirty) {
- if (dirty.compareAndSet(false, true)) {
- pinCount.incrementAndGet();
- }
+ markDirty();
+ }
+ }
+
+ @Override
+ public void markDirty() {
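+ // the clean-to-dirty transition takes an extra pin, so a dirty page
+ // cannot be chosen as a victim until it has been flushed and unpinned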
+ if (dirty.compareAndSet(false, true)) {
+ pinCount.incrementAndGet();
}
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java
index 22c02d1..b2ba638 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java
@@ -20,4 +20,6 @@
public Object getReplacementStrategyObject();
public boolean pinIfGoodVictim();
+
+ public void markDirty();
}
\ No newline at end of file
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 6549c52..15aa7c4 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -182,6 +182,15 @@
}
/**
+ * Set whether the vertex state length is fixed
+ *
+ * @param fixedSize
+ *            true if the vertex value size never changes across supersteps
+ */
+ final public void setFixedVertexValueSize(boolean fixedSize) {
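+ // a fixed-size vertex value means the state length never grows, hence the inverted flag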
+ getConfiguration().setBoolean(INCREASE_STATE_LENGTH, !fixedSize);
+ }
+
+ /**
* Set the frame size for a job
*
* @param frameSize
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 4ee1deb..6493127 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -515,7 +515,7 @@
- * @return the boolean setting of the parameter, by default it is false
+ * @return the boolean setting of the parameter, by default it is true
*/
public static boolean getDynamicVertexValueSize(Configuration conf) {
- return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
+ return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, true);
}
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index b345e01..b7216c8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -37,7 +37,6 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -98,8 +97,6 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawNormalizedKeyComputerFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -208,12 +205,13 @@
@Override
public JobSpecification generateCreatingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
JobSpecification spec = new JobSpecification();
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
int[] keyFields = new int[1];
keyFields[0] = 0;
@@ -271,7 +269,7 @@
sortFields[0] = 0;
INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
nkmFactory, comparatorFactories, recordDescriptor);
ClusterConfig.setLocationConstraint(spec, sorter);
@@ -330,7 +328,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -529,9 +527,9 @@
*/
int[] sortFields = new int[1];
sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = RawNormalizedKeyComputerFactory.INSTANCE;
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
nkmFactory, comparatorFactories, recordDescriptor);
ClusterConfig.setLocationConstraint(spec, sorter);
@@ -590,7 +588,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -607,8 +605,7 @@
int[] keyFields = new int[] { 0 };
INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getFinalNormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
- sortCmpFactories[0] = JobGenUtil.getFinalBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ sortCmpFactories[0] = JobGenUtil.getFinalBinaryComparatorFactory(vertexIdClass);
sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields, nkmFactory, sortCmpFactories,
recordDescriptor);
ClusterConfig.setLocationConstraint(spec, scanner);
@@ -672,7 +669,7 @@
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
- false);
+ false, jobId, lastSuccessfulIteration + 1);
ClusterConfig.setLocationConstraint(spec, materializeRead);
- String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
+ String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);
@@ -734,8 +731,7 @@
int[] keyFields = new int[] { 0 };
INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
- sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration,
- WritableComparator.get(vertexIdClass).getClass());
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration, vertexIdClass);
ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, sort);
@@ -744,7 +740,7 @@
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec,
- recordDescriptor);
+ recordDescriptor, jobId, lastCheckpointedIteration);
ClusterConfig.setLocationConstraint(spec, materialize);
/** construct runtime hook */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 103c1b6..8c61e6a 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -63,7 +63,6 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -129,7 +128,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -218,7 +217,8 @@
/**
* construct the materializing write operator
*/
- MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+ jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materialize);
/**
@@ -314,7 +314,7 @@
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
@@ -339,7 +339,7 @@
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
- true);
+ true, jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
@@ -422,7 +422,8 @@
/**
* construct the materializing write operator
*/
- MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+ jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materialize);
/** construct runtime hook */
@@ -595,8 +596,7 @@
int[] keyFields = new int[] { 0 };
INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
- sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration,
- WritableComparator.get(vertexIdClass).getClass());
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration, vertexIdClass);
ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, sort);
@@ -665,7 +665,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), msgListClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 308f422..8446379 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -17,7 +17,6 @@
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -43,7 +42,6 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -101,7 +99,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -134,8 +132,7 @@
int[] keyFields = new int[] { 0 };
INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
- sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
- .getClass());
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -163,7 +160,8 @@
/**
* construct the materializing write operator
*/
- MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+ jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materialize);
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
@@ -272,7 +270,7 @@
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
@@ -297,7 +295,7 @@
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
- true);
+ true, jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
@@ -332,8 +330,7 @@
*/
INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
- sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
- .getClass());
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -359,7 +356,8 @@
/**
* construct the materializing write operator
*/
- MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+ jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materialize);
/** construct runtime hook */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 718a271..0259c5c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -42,7 +42,6 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -100,7 +99,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -156,7 +155,8 @@
/**
* construct the materializing write operator
*/
- MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+ jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materialize);
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
@@ -261,7 +261,7 @@
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
@@ -286,7 +286,7 @@
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
- true);
+ true, jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
@@ -339,7 +339,8 @@
/**
* construct the materializing write operator
*/
- MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+ jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materialize);
/** construct runtime hook */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 75635c9..0a38d72 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -42,7 +42,6 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -100,7 +99,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -169,7 +168,8 @@
/**
* construct the materializing write operator
*/
- MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+ jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materialize);
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
@@ -275,7 +275,7 @@
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
@@ -300,7 +300,7 @@
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
- true);
+ true, jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
@@ -369,7 +369,8 @@
/**
* construct the materializing write operator
*/
- MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+ jobId, iteration);
ClusterConfig.setLocationConstraint(spec, materialize);
/** construct runtime hook */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
index 97cea99..9f2a66d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
@@ -16,6 +16,7 @@
package edu.uci.ics.pregelix.core.jobgen;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
@@ -76,8 +77,8 @@
* @return
*/
@SuppressWarnings("unchecked")
- public static IBinaryComparatorFactory getFinalBinaryComparatorFactory(Class keyClass) {
- return new WritableComparingBinaryComparatorFactory(keyClass);
+ public static IBinaryComparatorFactory getFinalBinaryComparatorFactory(Class vertexIdClass) {
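+ // resolve the registered WritableComparator for the vertex id class here, so callers pass the raw id class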
+ return new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass).getClass());
}
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index aabd4ba..70de9ed 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -62,7 +62,7 @@
ccConfig.jobHistorySize = 1;
ccConfig.profileDumpPeriod = -1;
ccConfig.heartbeatPeriod = 50;
- ccConfig.maxHeartbeatLapsePeriods = 15;
+ ccConfig.maxHeartbeatLapsePeriods = 10;
// cluster controller
cc = new ClusterControllerService(ccConfig);
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index d4c0ee6..2b435a1 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -1,28 +1,24 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License"); ! you may
+ not use this file except in compliance with the License. ! you may obtain
+ a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0
+ ! ! Unless required by applicable law or agreed to in writing, software !
+ distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the
+ License for the specific language governing permissions and ! limitations
+ under the License. ! -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow-std-base</artifactId>
<packaging>jar</packaging>
<name>pregelix-dataflow-std-base</name>
<parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix</artifactId>
- <version>0.2.10-SNAPSHOT</version>
- </parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ </parent>
<properties>
@@ -58,7 +54,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.4.1</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
@@ -94,6 +90,13 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
<version>0.2.10-SNAPSHOT</version>
<type>jar</type>
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
index b8ba7bd..081b3bc 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
@@ -18,17 +18,18 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
public interface IUpdateFunction extends IFunction {
- /**
- * update the tuple pointed by tupleRef called after process,
- * one-input-tuple-at-a-time
- *
- * @param tupleRef
- * @throws HyracksDataException
- */
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb)
- throws HyracksDataException;
+ /**
+ * update the tuple pointed to by tupleRef; called after process(),
+ * one input tuple at a time
+ *
+ * @param tupleRef
+ * @param cloneUpdateTb
+ * @param cursor
+ * @throws HyracksDataException
+ */
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException;
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index b22e468..b646661 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -43,6 +44,7 @@
import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopJoinFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
@@ -68,6 +70,7 @@
private ArrayTupleBuilder cloneUpdateTb;
private final UpdateBuffer updateBuffer;
private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+ private final StorageType storageType;
public IndexNestedLoopJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -77,6 +80,11 @@
throws HyracksDataException {
treeIndexOpHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
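+ // a plain tree dataflow helper implies an in-place b-tree; any other helper is treated as LSM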
+ if (treeIndexOpHelper instanceof TreeIndexDataflowHelper) {
+ storageType = StorageType.TreeIndex;
+ } else {
+ storageType = StorageType.LSMIndex;
+ }
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
@@ -170,13 +178,13 @@
/**
* call the update function
*/
- functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
+ functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb, cursor);
/**
* doing copy update
*/
CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
- cursor, rangePred);
+ cursor, rangePred, false, storageType);
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index 0ecfd03..2557a07 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -36,6 +36,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -45,6 +46,7 @@
import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable extends
@@ -79,6 +81,7 @@
private ArrayTupleBuilder cloneUpdateTb;
private final UpdateBuffer updateBuffer;
private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+ private final StorageType storageType;
public IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -88,6 +91,11 @@
inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
treeIndexOpHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
+ if (treeIndexOpHelper instanceof TreeIndexDataflowHelper) {
+ storageType = StorageType.TreeIndex;
+ } else {
+ storageType = StorageType.LSMIndex;
+ }
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
if (lowKeyFields != null && lowKeyFields.length > 0) {
@@ -287,13 +295,13 @@
/**
* function call
*/
- functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
+ functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb, cursor);
/**
* doing clone update
*/
CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
- cursor, rangePred);
+ cursor, rangePred, true, storageType);
}
/** write result for outer case */
@@ -301,11 +309,11 @@
/**
* function call
*/
- functionProxy.functionCall(nullTupleBuilder, frameTuple, cloneUpdateTb);
+ functionProxy.functionCall(nullTupleBuilder, frameTuple, cloneUpdateTb, cursor);
//doing clone update
CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
- rangePred);
+ rangePred, true, storageType);
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index a9c787f..1d9fd70 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -43,6 +44,7 @@
import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
@@ -71,6 +73,7 @@
private ArrayTupleBuilder cloneUpdateTb;
private UpdateBuffer updateBuffer;
private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+ private final StorageType storageType;
public IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -79,6 +82,11 @@
IRecordDescriptorFactory inputRdFactory, int outputArity) throws HyracksDataException {
treeIndexOpHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
+ if (treeIndexOpHelper instanceof TreeIndexDataflowHelper) {
+ storageType = StorageType.TreeIndex;
+ } else {
+ storageType = StorageType.LSMIndex;
+ }
this.isForward = isForward;
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
@@ -238,21 +246,21 @@
/** write the right result */
private void writeRightResults(ITupleReference frameTuple) throws Exception {
- functionProxy.functionCall(frameTuple, cloneUpdateTb);
+ functionProxy.functionCall(frameTuple, cloneUpdateTb, cursor);
//doing clone update
CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
- rangePred);
+ rangePred, true, storageType);
}
/** write the left result */
private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
throws Exception {
- functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+ functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb, cursor);
//doing clone update
CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
- rangePred);
+ rangePred, true, storageType);
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index de87909..1003431 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -44,6 +45,7 @@
import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class TreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
@@ -77,6 +79,7 @@
private ArrayTupleBuilder cloneUpdateTb;
private final UpdateBuffer updateBuffer;
private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+ private final StorageType storageType;
public TreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -86,6 +89,11 @@
throws HyracksDataException {
treeIndexHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
+ if (treeIndexHelper instanceof TreeIndexDataflowHelper) {
+ storageType = StorageType.TreeIndex;
+ } else {
+ storageType = StorageType.LSMIndex;
+ }
this.isForward = isForward;
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
@@ -171,11 +179,11 @@
while (cursor.hasNext()) {
cursor.next();
ITupleReference tuple = cursor.getTuple();
- functionProxy.functionCall(tuple, cloneUpdateTb);
+ functionProxy.functionCall(tuple, cloneUpdateTb, cursor);
//doing clone update
CopyUpdateUtil.copyUpdate(tempTupleReference, tuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
- rangePred);
+ rangePred, true, storageType);
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index 392f728..0ff3f04 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -27,19 +27,9 @@
public static void copyUpdate(SearchKeyTupleReference tempTupleReference, ITupleReference frameTuple,
UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, IIndexAccessor indexAccessor,
- IIndexCursor cursor, RangePredicate rangePred) throws HyracksDataException, IndexException {
+ IIndexCursor cursor, RangePredicate rangePred, boolean scan, StorageType type) throws HyracksDataException,
+ IndexException {
if (cloneUpdateTb.getSize() > 0) {
- int[] fieldEndOffsets = cloneUpdateTb.getFieldEndOffsets();
- int srcStart = fieldEndOffsets[0];
- int srcLen = fieldEndOffsets[1] - fieldEndOffsets[0]; // the updated vertex size
- int frSize = frameTuple.getFieldLength(1); // the vertex binary size in the leaf page
- if (srcLen <= frSize) {
- //doing in-place update if the vertex size is not larger than the original size, save the "real update" overhead
- System.arraycopy(cloneUpdateTb.getByteArray(), srcStart, frameTuple.getFieldData(1),
- frameTuple.getFieldStart(1), srcLen);
- cloneUpdateTb.reset();
- return;
- }
if (!updateBuffer.appendTuple(cloneUpdateTb)) {
tempTupleReference.reset(frameTuple.getFieldData(0), frameTuple.getFieldStart(0),
frameTuple.getFieldLength(0));
@@ -51,11 +41,18 @@
if (!updateBuffer.appendTuple(cloneUpdateTb)) {
throw new HyracksDataException("cannot append tuple builder!");
}
- //search again and recover the cursor
+ //search again and recover the cursor to the exact point as the one before it is closed
cursor.reset();
- rangePred.setLowKey(tempTupleReference, false);
- rangePred.setHighKey(null, true);
+ rangePred.setLowKey(tempTupleReference, true);
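+ // a scan reopens an open-ended range; a point lookup is pinned to the single saved key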
+ if (scan) {
+ rangePred.setHighKey(null, true);
+ } else {
+ rangePred.setHighKey(tempTupleReference, true);
+ }
indexAccessor.search(cursor, rangePred);
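+ // step onto the saved tuple so the caller's loop resumes exactly where the cursor was closed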
+ if (cursor.hasNext()) {
+ cursor.next();
+ }
}
cloneUpdateTb.reset();
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index a1e5b86..e5bdb17 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
@@ -80,10 +81,10 @@
* @throws HyracksDataException
*/
public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right,
- ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor) throws HyracksDataException {
Object[] tuple = tupleDe.deserializeRecord(leftAccessor, leftTupleIndex, right);
function.process(tuple);
- function.update(right, cloneUpdateTb);
+ function.update(right, cloneUpdateTb, cursor);
}
/**
@@ -92,10 +93,11 @@
* @param updateRef
* @throws HyracksDataException
*/
- public void functionCall(ITupleReference updateRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ public void functionCall(ITupleReference updateRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
Object[] tuple = tupleDe.deserializeRecord(updateRef);
function.process(tuple);
- function.update(updateRef, cloneUpdateTb);
+ function.update(updateRef, cloneUpdateTb, cursor);
}
/**
@@ -107,11 +109,11 @@
* update pointer
* @throws HyracksDataException
*/
- public void functionCall(ArrayTupleBuilder tb, ITupleReference inPlaceUpdateRef, ArrayTupleBuilder cloneUpdateTb)
- throws HyracksDataException {
+ public void functionCall(ArrayTupleBuilder tb, ITupleReference inPlaceUpdateRef, ArrayTupleBuilder cloneUpdateTb,
+ IIndexCursor cursor) throws HyracksDataException {
Object[] tuple = tupleDe.deserializeRecord(tb, inPlaceUpdateRef);
function.process(tuple);
- function.update(inPlaceUpdateRef, cloneUpdateTb);
+ function.update(inPlaceUpdateRef, cloneUpdateTb, cursor);
}
/**
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/StorageType.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/StorageType.java
new file mode 100644
index 0000000..fb2d1eb
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/StorageType.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.util;
+
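+/**
+ * The kind of index backing an update operator: a plain in-place tree
+ * index or an LSM index.
+ */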
+public enum StorageType {
+ TreeIndex,
+ LSMIndex
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
index b44b643..a5d2ab7 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
@@ -31,11 +31,15 @@
public class MaterializingReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
private final boolean removeIterationState;
+ private final String jobId;
+ private final int iteration;
public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor,
- boolean removeIterationState) {
+ boolean removeIterationState, String jobId, int iteration) {
super(spec, 1, 1);
this.removeIterationState = removeIterationState;
+ this.jobId = jobId;
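+ // a reader in iteration i consumes the state the writer materialized in iteration i - 1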
+ this.iteration = iteration - 1;
recordDescriptors[0] = recordDescriptor;
}
@@ -55,8 +59,8 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (!complete) {
- MaterializerTaskState state = (MaterializerTaskState) IterationUtils.getIterationState(ctx,
- partition);
+ MaterializerTaskState state = (MaterializerTaskState) IterationUtils.getIterationState(ctx, jobId,
+ partition, iteration);
RunFileReader in = state.getRunFileWriter().createReader();
writer.open();
try {
@@ -85,7 +89,7 @@
* remove last iteration's state
*/
if (removeIterationState) {
- IterationUtils.removeIterationState(ctx, partition);
+ IterationUtils.removeIterationState(ctx, jobId, partition, iteration);
}
writer.close();
complete = true;
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
index 00dcbd1..921dc40 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
@@ -38,9 +38,14 @@
public class MaterializingWriteOperatorDescriptor extends AbstractOperatorDescriptor {
private static final long serialVersionUID = 1L;
private final static int MATERIALIZER_ACTIVITY_ID = 0;
+ private final String jobId;
+ private final int iteration;
- public MaterializingWriteOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+ public MaterializingWriteOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor, String jobId,
+ int iteration) {
super(spec, 1, 1);
+ this.jobId = jobId;
+ this.iteration = iteration;
recordDescriptors[0] = recordDescriptor;
}
@@ -69,13 +74,12 @@
@Override
public void open() throws HyracksDataException {
/** remove last iteration's state */
- IterationUtils.removeIterationState(ctx, partition);
+ IterationUtils.removeIterationState(ctx, jobId, partition, iteration);
state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
partition));
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
- FileReference file = context.createManagedWorkspaceFile(MaterializingWriteOperatorDescriptor.class
- .getSimpleName());
+ FileReference file = context.createManagedWorkspaceFile(jobId);
state.setRunFileWriter(new RunFileWriter(file, ctx.getIOManager()));
state.getRunFileWriter().open();
writer.open();
@@ -92,7 +96,7 @@
/**
* set iteration state
*/
- IterationUtils.setIterationState(ctx, partition, state);
+ IterationUtils.setIterationState(ctx, jobId, partition, iteration, state);
writer.close();
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java
new file mode 100644
index 0000000..15836e6
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java
@@ -0,0 +1,130 @@
+package edu.uci.ics.pregelix.dataflow.context;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
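+/**
+ * Per-job runtime state: materialized iteration files, per-task state
+ * objects, and superstep bookkeeping for a single Pregelix job.
+ */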
+public class PJobContext {
+ private static final Logger LOGGER = Logger.getLogger(PJobContext.class.getName());
+
+ private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+ private final Map<TaskIterationID, IStateObject> appStateMap = new ConcurrentHashMap<TaskIterationID, IStateObject>();
+ private final Map<String, Long> jobIdToSuperStep = new ConcurrentHashMap<String, Long>();
+ private final Map<String, Boolean> jobIdToMove = new ConcurrentHashMap<String, Boolean>();
+
+ public void close() throws HyracksDataException {
+ for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
+ for (FileReference fileRef : entry.getValue())
+ fileRef.delete();
+
+ iterationToFiles.clear();
+ appStateMap.clear();
+ }
+
+ public void clearState(String jobId) throws HyracksDataException {
+ for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
+ for (FileReference fileRef : entry.getValue())
+ fileRef.delete();
+
+ iterationToFiles.clear();
+ // appStateMap is keyed by TaskIterationID, so a removal keyed by job id is a no-op;
+ // this context is per-job, so drop all of its task state at once
+ appStateMap.clear();
+ jobIdToMove.remove(jobId);
+ jobIdToSuperStep.remove(jobId);
+ }
+
+ public Map<TaskIterationID, IStateObject> getAppStateStore() {
+ return appStateMap;
+ }
+
+ public static RuntimeContext get(IHyracksTaskContext ctx) {
+ return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+ }
+
+ public void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration, ClassLoader cl) {
+ Boolean toMove = jobIdToMove.get(jobId);
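+ // advance the vertex properties only on the first task to arrive after
+ // endSuperStep() flagged the job, or on the very first superstep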
+ if (toMove == null || toMove) {
+ if (jobIdToSuperStep.get(jobId) == null) {
+ if (currentIteration <= 0) {
+ jobIdToSuperStep.put(jobId, 0L);
+ } else {
+ jobIdToSuperStep.put(jobId, currentIteration);
+ }
+ }
+
+ long superStep = jobIdToSuperStep.get(jobId);
+ List<FileReference> files = iterationToFiles.remove(superStep - 1);
+ if (files != null) {
+ for (FileReference fileRef : files)
+ fileRef.delete();
+ }
+
+ setProperties(jobId, numVertices, numEdges, currentIteration, superStep, false, cl);
+ }
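+ // hint a collection between supersteps to reclaim the dropped iteration state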
+ System.gc();
+ }
+
+ public void recoverVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration,
+ ClassLoader cl) {
+ if (jobIdToSuperStep.get(jobId) == null) {
+ if (currentIteration <= 0) {
+ jobIdToSuperStep.put(jobId, 0L);
+ } else {
+ jobIdToSuperStep.put(jobId, currentIteration);
+ }
+ }
+
+ long superStep = jobIdToSuperStep.get(jobId);
+ List<FileReference> files = iterationToFiles.remove(superStep - 1);
+ if (files != null) {
+ for (FileReference fileRef : files)
+ fileRef.delete();
+ }
+
+ setProperties(jobId, numVertices, numEdges, currentIteration, superStep, true, cl);
+ }
+
+ public void endSuperStep(String pregelixJobId) {
+ jobIdToMove.put(pregelixJobId, true);
+ LOGGER.info("end iteration " + Vertex.getSuperstep());
+ }
+
+ public Map<Long, List<FileReference>> getIterationToFiles() {
+ return iterationToFiles;
+ }
+
+ private void setProperties(String jobId, long numVertices, long numEdges, long currentIteration, long superStep,
+ boolean toMove, ClassLoader cl) {
+ try {
+ Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
+ Method superStepMethod = vClass.getMethod("setSuperstep", Long.TYPE);
+ Method numVerticesMethod = vClass.getMethod("setNumVertices", Long.TYPE);
+ Method numEdgesMethod = vClass.getMethod("setNumEdges", Long.TYPE);
+
+ if (currentIteration > 0) {
+ //Vertex.setSuperstep(currentIteration);
+ superStepMethod.invoke(null, currentIteration);
+ } else {
+ //Vertex.setSuperstep(++superStep);
+ superStepMethod.invoke(null, ++superStep);
+ }
+ //Vertex.setNumVertices(numVertices);
+ numVerticesMethod.invoke(null, numVertices);
+ //Vertex.setNumEdges(numEdges);
+ numEdgesMethod.invoke(null, numEdges);
+ jobIdToSuperStep.put(jobId, superStep);
+ jobIdToMove.put(jobId, toMove);
+ LOGGER.info("start iteration " + Vertex.getSuperstep());
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
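
Note on the reflection in setProperties: Vertex keeps its superstep and graph-size
counters in static fields, and statics are per-ClassLoader. Since each Pregelix job
runs under its own job ClassLoader, the counters must be set through the job's own
copy of the Vertex class. A self-contained JDK-only sketch of that isolation (all
names below are illustrative, not part of the patch):

    import java.net.URL;
    import java.net.URLClassLoader;

    public class ClassLoaderStaticsDemo {
        public static class Counter {
            private static long value;
            public static void set(long v) { value = v; }
            public static long get() { return value; }
        }

        public static void main(String[] args) throws Exception {
            // getCodeSource() can be null in exotic setups; fine for a demo run
            URL[] cp = { Counter.class.getProtectionDomain().getCodeSource().getLocation() };
            ClassLoader clA = new URLClassLoader(cp, null); // parent = null: no sharing
            ClassLoader clB = new URLClassLoader(cp, null);
            Class<?> a = clA.loadClass("ClassLoaderStaticsDemo$Counter");
            Class<?> b = clB.loadClass("ClassLoaderStaticsDemo$Counter");
            a.getMethod("set", Long.TYPE).invoke(null, 7L);
            b.getMethod("set", Long.TYPE).invoke(null, 42L);
            System.out.println(a.getMethod("get").invoke(null)); // prints 7
            System.out.println(b.getMethod("get").invoke(null)); // prints 42
        }
    }
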
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index f3f7513..5c44d65 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -20,7 +20,6 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -48,7 +47,6 @@
import edu.uci.ics.pregelix.api.graph.Vertex;
public class RuntimeContext implements IWorkspaceFileFactory {
- private static final Logger LOGGER = Logger.getLogger(RuntimeContext.class.getName());
private final IIndexLifecycleManager lcManager;
private final ILocalResourceRepository localResourceRepository;
@@ -57,10 +55,7 @@
private final List<IVirtualBufferCache> vbcs;
private final IFileMapManager fileMapManager;
private final IOManager ioManager;
- private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
- private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
- private final Map<String, Long> jobIdToSuperStep = new ConcurrentHashMap<String, Long>();
- private final Map<String, Boolean> jobIdToMove = new ConcurrentHashMap<String, Boolean>();
+ private final Map<String, PJobContext> activeJobs = new ConcurrentHashMap<String, PJobContext>();
private final ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r) {
@@ -92,27 +87,11 @@
}
public void close() throws HyracksDataException {
- for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
- for (FileReference fileRef : entry.getValue())
- fileRef.delete();
-
- iterationToFiles.clear();
bufferCache.close();
- appStateMap.clear();
-
- System.gc();
- }
-
- public void clearState(String jobId) throws HyracksDataException {
- for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
- for (FileReference fileRef : entry.getValue())
- fileRef.delete();
-
- iterationToFiles.clear();
- appStateMap.clear();
- jobIdToMove.remove(jobId);
- jobIdToSuperStep.remove(jobId);
- System.gc();
+ for (Entry<String, PJobContext> entry : activeJobs.entrySet()) {
+ entry.getValue().close();
+ }
+ activeJobs.clear();
}
public ILocalResourceRepository getLocalResourceRepository() {
@@ -139,87 +118,55 @@
return fileMapManager;
}
- public Map<StateKey, IStateObject> getAppStateStore() {
- return appStateMap;
+ public Map<TaskIterationID, IStateObject> getAppStateStore(String jobId) {
+ PJobContext activeJob = getActiveJob(jobId);
+ return activeJob.getAppStateStore();
}
public static RuntimeContext get(IHyracksTaskContext ctx) {
return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
}
- public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration) {
- Boolean toMove = jobIdToMove.get(jobId);
- if (toMove == null || toMove == true) {
- if (jobIdToSuperStep.get(jobId) == null) {
- if (currentIteration <= 0) {
- jobIdToSuperStep.put(jobId, 0L);
- } else {
- jobIdToSuperStep.put(jobId, currentIteration);
- }
- }
-
- long superStep = jobIdToSuperStep.get(jobId);
- List<FileReference> files = iterationToFiles.remove(superStep - 1);
- if (files != null) {
- for (FileReference fileRef : files)
- fileRef.delete();
- }
-
- if (currentIteration > 0) {
- Vertex.setSuperstep(currentIteration);
- } else {
- Vertex.setSuperstep(++superStep);
- }
- Vertex.setNumVertices(numVertices);
- Vertex.setNumEdges(numEdges);
- jobIdToSuperStep.put(jobId, superStep);
- jobIdToMove.put(jobId, false);
- LOGGER.info("start iteration " + Vertex.getSuperstep());
- }
- System.gc();
+ public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration,
+ ClassLoader cl) {
+ PJobContext activeJob = getActiveJob(jobId);
+ activeJob.setVertexProperties(jobId, numVertices, numEdges, currentIteration, cl);
}
public synchronized void recoverVertexProperties(String jobId, long numVertices, long numEdges,
- long currentIteration) {
- if (jobIdToSuperStep.get(jobId) == null) {
- if (currentIteration <= 0) {
- jobIdToSuperStep.put(jobId, 0L);
- } else {
- jobIdToSuperStep.put(jobId, currentIteration);
- }
- }
-
- long superStep = jobIdToSuperStep.get(jobId);
- List<FileReference> files = iterationToFiles.remove(superStep - 1);
- if (files != null) {
- for (FileReference fileRef : files)
- fileRef.delete();
- }
-
- if (currentIteration > 0) {
- Vertex.setSuperstep(currentIteration);
- } else {
- Vertex.setSuperstep(++superStep);
- }
- Vertex.setNumVertices(numVertices);
- Vertex.setNumEdges(numEdges);
- jobIdToSuperStep.put(jobId, superStep);
- jobIdToMove.put(jobId, true);
- LOGGER.info("recovered iteration " + Vertex.getSuperstep());
+ long currentIteration, ClassLoader cl) {
+ PJobContext activeJob = getActiveJob(jobId);
+ activeJob.recoverVertexProperties(jobId, numVertices, numEdges, currentIteration, cl);
}
- public synchronized void endSuperStep(String pregelixJobId) {
- jobIdToMove.put(pregelixJobId, true);
- LOGGER.info("end iteration " + Vertex.getSuperstep());
+ public synchronized void endSuperStep(String jobId) {
+ PJobContext activeJob = getActiveJob(jobId);
+ activeJob.endSuperStep(jobId);
+ }
+
+ public void clearState(String jobId) throws HyracksDataException {
+ PJobContext activeJob = getActiveJob(jobId);
+ activeJob.clearState(jobId);
+ activeJobs.remove(jobId);
+ }
+
+ private PJobContext getActiveJob(String jobId) {
+ PJobContext activeJob = activeJobs.get(jobId);
+ if (activeJob == null) {
+ activeJob = new PJobContext();
+ activeJobs.put(jobId, activeJob);
+ }
+ return activeJob;
}
@Override
- public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
- final FileReference fRef = ioManager.createWorkspaceFile(prefix);
- List<FileReference> files = iterationToFiles.get(Vertex.getSuperstep());
+ public FileReference createManagedWorkspaceFile(String jobId) throws HyracksDataException {
+ final FileReference fRef = ioManager.createWorkspaceFile(jobId);
+ PJobContext activeJob = getActiveJob(jobId);
+ List<FileReference> files = activeJob.getIterationToFiles().get(Vertex.getSuperstep());
if (files == null) {
files = new ArrayList<FileReference>();
- iterationToFiles.put(Vertex.getSuperstep(), files);
+ activeJob.getIterationToFiles().put(Vertex.getSuperstep(), files);
}
files.add(fRef);
return fRef;
@@ -229,4 +176,5 @@
public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
return ioManager.createWorkspaceFile(prefix);
}
+
}
\ No newline at end of file
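
A caveat on getActiveJob above: it does an unsynchronized get-then-put, and it is
reachable from unsynchronized paths (clearState, createManagedWorkspaceFile), so two
tasks of the same job could race and observe different PJobContext instances. A
minimal thread-safe sketch using only ConcurrentMap.putIfAbsent (a suggestion, not
what this patch does):

    private PJobContext getActiveJob(String jobId) {
        PJobContext existing = activeJobs.get(jobId);
        if (existing == null) {
            PJobContext fresh = new PJobContext();
            // atomic on ConcurrentHashMap: keep whichever instance won the race
            existing = activeJobs.putIfAbsent(jobId, fresh);
            if (existing == null) {
                existing = fresh;
            }
        }
        return existing;
    }
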
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskID.java
similarity index 64%
rename from pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java
rename to pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskID.java
index cbd90b9..f219ed2 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskID.java
@@ -12,34 +12,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package edu.uci.ics.pregelix.dataflow.context;
-import edu.uci.ics.hyracks.api.job.JobId;
+public class TaskID {
-public class StateKey {
- private final JobId jobId;
- private final int partition;
+ private final String jobId;
+ private final int partition;
- public StateKey(JobId jobId, int partition) {
+ public TaskID(String jobId, int partition) {
this.jobId = jobId;
this.partition = partition;
}
+ public String getJobId() {
+ return jobId;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
@Override
public int hashCode() {
- return jobId.hashCode() * partition;
+ return jobId.hashCode() + partition;
}
@Override
public boolean equals(Object o) {
- if (!(o instanceof StateKey))
+ if (!(o instanceof TaskID)) {
return false;
- StateKey key = (StateKey) o;
- return key.jobId.equals(jobId) && key.partition == partition;
+ }
+ TaskID tid = (TaskID) o;
+ return jobId.equals(tid.getJobId()) && partition == tid.getPartition();
}
@Override
public String toString() {
- return jobId.toString() + ":" + partition;
+ return "job:" + jobId + " partition:" + partition;
}
+
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskIterationID.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskIterationID.java
new file mode 100644
index 0000000..53c6a0c
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskIterationID.java
@@ -0,0 +1,52 @@
+package edu.uci.ics.pregelix.dataflow.context;
+
+public class TaskIterationID {
+
+ private final TaskID tid;
+ private final int iteration;
+
+ public TaskIterationID(TaskID tid, int iteration) {
+ this.tid = tid;
+ this.iteration = iteration;
+ }
+
+ public TaskIterationID(String jobId, int partition, int iteration) {
+ this.tid = new TaskID(jobId, partition);
+ this.iteration = iteration;
+ }
+
+ public TaskID getTaskID() {
+ return tid;
+ }
+
+ public int getIteration() {
+ return iteration;
+ }
+
+ public TaskIterationID getNextTaskIterationID() {
+ return new TaskIterationID(tid, iteration + 1);
+ }
+
+ public TaskIterationID getPreviousTaskIterationID() {
+ return new TaskIterationID(tid, iteration - 1);
+ }
+
+ @Override
+ public int hashCode() {
+ return tid.hashCode() + iteration;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TaskIterationID)) {
+ return false;
+ }
+ TaskIterationID tiid = (TaskIterationID) o;
+ return tid.equals(tiid.getTaskID()) && iteration == tiid.getIteration();
+ }
+
+ @Override
+ public String toString() {
+ return tid.toString() + " iteration:" + iteration;
+ }
+}
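
TaskIterationID is the key of the per-job state store, so two independently built
instances with equal (jobId, partition, iteration) must hash and compare as one key.
A quick standalone check of that contract (test scaffolding only; run with -ea):

    package edu.uci.ics.pregelix.dataflow.context;

    import java.util.HashMap;
    import java.util.Map;

    public class TaskIterationIDContractCheck {
        public static void main(String[] args) {
            Map<TaskIterationID, String> store = new HashMap<TaskIterationID, String>();
            store.put(new TaskIterationID("job-1", 0, 3), "state@3");
            // a fresh key with equal components must find the stored entry
            assert "state@3".equals(store.get(new TaskIterationID("job-1", 0, 3)));
            // the iteration navigation helpers must stay consistent with equals
            TaskIterationID k = new TaskIterationID("job-1", 0, 3);
            assert k.getNextTaskIterationID().getPreviousTaskIterationID().equals(k);
            System.out.println("map-key contract holds");
        }
    }
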
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 02097bf..8052b7c 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -28,73 +28,66 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.JobStateUtils;
import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
-import edu.uci.ics.pregelix.dataflow.context.StateKey;
+import edu.uci.ics.pregelix.dataflow.context.TaskIterationID;
public class IterationUtils {
public static final String TMP_DIR = "/tmp/";
- public static void setIterationState(IHyracksTaskContext ctx, int partition, IStateObject state) {
+ public static void setIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition, int iteration,
+ IStateObject state) {
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
- Map<StateKey, IStateObject> map = context.getAppStateStore();
- map.put(new StateKey(ctx.getJobletContext().getJobId(), partition), state);
+ Map<TaskIterationID, IStateObject> map = context.getAppStateStore(pregelixJobId);
+ map.put(new TaskIterationID(pregelixJobId, partition, iteration), state);
}
- public static IStateObject getIterationState(IHyracksTaskContext ctx, int partition) {
- JobId currentId = ctx.getJobletContext().getJobId();
- JobId lastId = new JobId(currentId.getId() - 1);
+ public static IStateObject getIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition,
+ int iteration) {
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
- Map<StateKey, IStateObject> map = context.getAppStateStore();
- IStateObject state = map.get(new StateKey(lastId, partition));
- while (state == null) {
- /** in case the last job is a checkpointing job */
- lastId = new JobId(lastId.getId() - 1);
- state = map.get(new StateKey(lastId, partition));
- }
+ Map<TaskIterationID, IStateObject> map = context.getAppStateStore(pregelixJobId);
+ IStateObject state = map.get(new TaskIterationID(pregelixJobId, partition, iteration));
return state;
}
- public static void removeIterationState(IHyracksTaskContext ctx, int partition) {
- JobId currentId = ctx.getJobletContext().getJobId();
- JobId lastId = new JobId(currentId.getId() - 1);
+ public static void removeIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition, int iteration) {
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
- Map<StateKey, IStateObject> map = context.getAppStateStore();
- map.remove(new StateKey(lastId, partition));
+ Map<TaskIterationID, IStateObject> map = context.getAppStateStore(pregelixJobId);
+ map.remove(new TaskIterationID(pregelixJobId, partition, iteration));
}
- public static void endSuperStep(String giraphJobId, IHyracksTaskContext ctx) {
+ public static void endSuperStep(String pregelixJobId, IHyracksTaskContext ctx) {
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
- context.endSuperStep(giraphJobId);
+ context.endSuperStep(pregelixJobId);
}
- public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, long currentIteration) {
- INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
- RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
- context.setVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
- conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
- }
-
- public static void recoverProperties(String jobId, IHyracksTaskContext ctx, Configuration conf,
+ public static void setProperties(String pregelixJobId, IHyracksTaskContext ctx, Configuration conf,
long currentIteration) {
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
- context.recoverVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
- conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
+ context.setVertexProperties(pregelixJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+ conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration, ctx.getJobletContext().getClassLoader());
}
- public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
+ public static void recoverProperties(String pregelixJobId, IHyracksTaskContext ctx, Configuration conf,
+ long currentIteration) {
+ INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+ RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+ context.recoverVertexProperties(pregelixJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+ conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration, ctx.getJobletContext().getClassLoader());
+ }
+
+ public static void writeTerminationState(Configuration conf, String pregelixJobId, boolean terminate)
throws HyracksDataException {
try {
FileSystem dfs = FileSystem.get(conf);
- String pathStr = IterationUtils.TMP_DIR + jobId;
+ String pathStr = IterationUtils.TMP_DIR + pregelixJobId;
Path path = new Path(pathStr);
FSDataOutputStream output = dfs.create(path, true);
output.writeBoolean(terminate);
@@ -105,11 +98,11 @@
}
}
- public static void writeGlobalAggregateValue(Configuration conf, String jobId, Writable agg)
+ public static void writeGlobalAggregateValue(Configuration conf, String pregelixJobId, Writable agg)
throws HyracksDataException {
try {
FileSystem dfs = FileSystem.get(conf);
- String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
+ String pathStr = IterationUtils.TMP_DIR + pregelixJobId + "agg";
Path path = new Path(pathStr);
FSDataOutputStream output = dfs.create(path, true);
agg.write(output);
@@ -120,10 +113,10 @@
}
}
- public static boolean readTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+ public static boolean readTerminationState(Configuration conf, String pregelixJobId) throws HyracksDataException {
try {
FileSystem dfs = FileSystem.get(conf);
- String pathStr = IterationUtils.TMP_DIR + jobId;
+ String pathStr = IterationUtils.TMP_DIR + pregelixJobId;
Path path = new Path(pathStr);
FSDataInputStream input = dfs.open(path);
boolean terminate = input.readBoolean();
@@ -134,8 +127,8 @@
}
}
- public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
- JobStateUtils.writeForceTerminationState(conf, jobId);
+ public static void writeForceTerminationState(Configuration conf, String pregelixJobId) throws HyracksDataException {
+ JobStateUtils.writeForceTerminationState(conf, pregelixJobId);
}
public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
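
The state handoff is now explicit: the producer stores its IStateObject under
(pregelixJobId, partition, iteration) and the consumer asks for exactly that key,
instead of probing backwards through Hyracks JobIds. A compile-only sketch of the
pairing, assuming the caller knows which iteration produced the state (the operator
wiring that supplies the iteration number is outside this hunk):

    package edu.uci.ics.pregelix.dataflow.util;

    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;

    public class IterationStateHandoffSketch {
        // producer side, at the end of iteration i
        static void produce(IHyracksTaskContext ctx, String jobId, int partition, int i, IStateObject s) {
            IterationUtils.setIterationState(ctx, jobId, partition, i, s);
        }

        // consumer side, during iteration i + 1: read what iteration i wrote
        static IStateObject consume(IHyracksTaskContext ctx, String jobId, int partition, int i) {
            return IterationUtils.getIterationState(ctx, jobId, partition, i);
        }
    }
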
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index a866c1c..2508a1e 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -218,6 +218,7 @@
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setFixedVertexValueSize(true);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
index ff1e29f..421f2f5 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
@@ -56,6 +56,7 @@
FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.setCheckpointHook(ConservativeCheckpointHook.class);
+ job.setFixedVertexValueSize(true);
testCluster.setUp();
Driver driver = new Driver(PageRankVertex.class);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
index 3fdaf15..b3ad112 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -55,6 +55,7 @@
FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.setCheckpointHook(ConservativeCheckpointHook.class);
+ job.setFixedVertexValueSize(true);
testCluster.setUp();
Driver driver = new Driver(PageRankVertex.class);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
index e006ccd..9a2ef2c 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
@@ -53,6 +53,7 @@
FileInputFormat.setInputPaths(job, INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setFixedVertexValueSize(true);
testCluster.setUp();
Driver driver = new Driver(PageRankVertex.class);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobTest.java
new file mode 100644
index 0000000..3f42c0d
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobTest.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * This test case exercises a multi-user workload: two Pregelix jobs running concurrently against the same test cluster.
+ *
+ * @author yingyib
+ */
+public class MultiJobTest {
+ private static String INPUTPATH = "data/webmapcomplex";
+ private static String OUTPUTPAH = "actual/result";
+ private static String OUTPUTPAH2 = "actual/result2";
+ private static String EXPECTEDPATH = "src/test/resources/expected/ConnectedComponentsRealComplex";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+ try {
+ PregelixJob job = new PregelixJob(ConnectedComponentsVertex.class.getName());
+ job.setVertexClass(ConnectedComponentsVertex.class);
+ job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+ job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+ job.setDynamicVertexValueSize(true);
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+ testCluster.setUp();
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Driver driver = new Driver(PageRankVertex.class);
+ PregelixJob job2 = new PregelixJob(ConnectedComponentsVertex.class.getName());
+ job2.setVertexClass(ConnectedComponentsVertex.class);
+ job2.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
+ job2.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+ job2.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job2.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+ job2.setDynamicVertexValueSize(true);
+ FileInputFormat.setInputPaths(job2, INPUTPATH);
+ FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH2));
+ job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+ job2.setCheckpointHook(ConservativeCheckpointHook.class);
+ driver.runJob(job2, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH2));
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+ thread.start();
+ Driver driver = new Driver(PageRankVertex.class);
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+ thread.join();
+ } catch (Exception e) {
+ PregelixHyracksIntegrationUtil.deinit();
+ testCluster.cleanupHDFS();
+ throw e;
+ }
+ }
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
index 9629b5b..1e6359d 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
@@ -15,6 +15,7 @@
package edu.uci.ics.pregelix.example;
import java.util.Iterator;
+import java.util.Random;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
@@ -26,29 +27,80 @@
* @author yingyib
*/
public class UpdateVertex extends Vertex<VLongWritable, Text, FloatWritable, VLongWritable> {
+ private final int MAX_VALUE_SIZE = 32768 / 2;
+ private VLongWritable msg = new VLongWritable();
+ private Text tempValue = new Text();
+ private Random rand = new Random();
@Override
public void compute(Iterator<VLongWritable> msgIterator) throws Exception {
- if (getSuperstep() < 14) {
- Text value = getVertexValue();
- String newValue = value.toString() + value.toString();
- value.set(newValue);
- activate();
- } else if (getSuperstep() >= 14 && getSuperstep() < 28) {
- Text value = getVertexValue();
- String valueStr = value.toString();
- char[] valueChars = valueStr.toCharArray();
- if (valueChars.length <= 1) {
- throw new IllegalStateException("illegal value length: " + valueChars.length);
- }
- char[] newValueChars = new char[valueChars.length / 2];
- for (int i = 0; i < newValueChars.length; i++) {
- newValueChars[i] = valueChars[i];
- }
- value.set(new String(newValueChars));
- activate();
+ if (getSuperstep() == 1) {
+ updateAndSendMsg();
+ } else if (getSuperstep() > 1 && getSuperstep() < 30) { // upper bound assumed; "< 2" made this branch unreachable
+ verifyVertexSize(msgIterator);
+ updateAndSendMsg();
} else {
voteToHalt();
}
}
+
+ private void verifyVertexSize(Iterator<VLongWritable> msgIterator) {
+ if (!msgIterator.hasNext()) {
+ throw new IllegalStateException("no message for vertex " + " " + getVertexId() + " " + getVertexValue());
+ }
+ /**
+ * verify the size
+ */
+ int valueSize = getVertexValue().toString().toCharArray().length;
+ long expectedValueSize = msgIterator.next().get();
+ if (valueSize != expectedValueSize) {
+ if (valueSize == -expectedValueSize) {
+ //verify fixed size update
+ char[] valueCharArray = getVertexValue().toString().toCharArray();
+ for (int i = 0; i < valueCharArray.length; i++) {
+ if (valueCharArray[i] != 'b') {
+ throw new IllegalStateException("vertex id: " + getVertexId()
+ + " has a un-propagated update in the last iteration");
+ }
+ }
+ } else {
+ throw new IllegalStateException("vertex id: " + getVertexId() + " vertex value size:" + valueSize
+ + ", expected value size:" + expectedValueSize);
+ }
+ }
+ if (msgIterator.hasNext()) {
+ throw new IllegalStateException("more than one message for vertex " + " " + getVertexId() + " "
+ + getVertexValue());
+ }
+ }
+
+ private void updateAndSendMsg() {
+ int newValueSize = rand.nextInt(MAX_VALUE_SIZE);
+ char[] charArray = new char[newValueSize];
+ for (int i = 0; i < charArray.length; i++) {
+ charArray[i] = 'a';
+ }
+ /**
+ * set a self-message
+ */
+ msg.set(newValueSize);
+ boolean fixedSize = getVertexId().get() < 2000;
+ if (fixedSize) {
+ int oldSize = getVertexValue().toString().toCharArray().length;
+ charArray = new char[oldSize];
+ for (int i = 0; i < oldSize; i++) {
+ charArray[i] = 'b';
+ }
+ msg.set(-oldSize);
+ }
+
+ /**
+ * set the vertex value
+ */
+ tempValue.set(new String(charArray));
+ setVertexValue(tempValue);
+
+ sendMsg(getVertexId(), msg);
+ activate();
+ }
}
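
The self-message doubles as a small protocol: a positive payload announces the next
(randomized) value length, while a negative payload means "the old value was rewritten
in place with 'b's, expect the same length", which verifyVertexSize decodes via its
valueSize == -expectedValueSize branch. A standalone sketch of that convention
(illustrative names, not part of the patch):

    public class UpdateMsgConventionDemo {
        static String describe(long msgPayload) {
            if (msgPayload >= 0) {
                return "dynamic update, expected value size " + msgPayload;
            }
            return "fixed-size 'b' update, expected value size " + (-msgPayload);
        }

        public static void main(String[] args) {
            System.out.println(describe(1024)); // dynamic update, expected value size 1024
            System.out.println(describe(-512)); // fixed-size 'b' update, expected value size 512
        }
    }
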
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java
index e28c9e2..076337a 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexInputFormat.java
@@ -15,45 +15,60 @@
package edu.uci.ics.pregelix.example;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
-import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.example.io.DoubleWritable;
import edu.uci.ics.pregelix.example.io.VLongWritable;
-public class UpdateVertexInputFormat extends TextVertexInputFormat<VLongWritable, Text, FloatWritable, DoubleWritable> {
+public class UpdateVertexInputFormat extends VertexInputFormat<VLongWritable, Text, FloatWritable, DoubleWritable> {
@Override
public VertexReader<VLongWritable, Text, FloatWritable, DoubleWritable> createVertexReader(InputSplit split,
TaskAttemptContext context) throws IOException {
- return new UpdateVertexReader(textInputFormat.createRecordReader(split, context), context);
+ return new UpdateVertexReader(context);
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+ InputSplit split = new FileSplit(new Path("testdata"), 0, 0, new String[0]);
+ return Collections.singletonList(split);
}
}
@SuppressWarnings("rawtypes")
-class UpdateVertexReader extends TextVertexReader<VLongWritable, Text, FloatWritable, DoubleWritable> {
+class UpdateVertexReader implements VertexReader<VLongWritable, Text, FloatWritable, DoubleWritable> {
- private final static String separator = " ";
+ private final static int MAX_ID = 65536;
+ private final static int MIN_ID = -65536;
private Vertex vertex;
private VLongWritable vertexId = new VLongWritable();
+ private int currentId = MIN_ID;
+ private TaskAttemptContext context;
- public UpdateVertexReader(RecordReader<LongWritable, Text> lineRecordReader, TaskAttemptContext context) {
- super(lineRecordReader);
+ public UpdateVertexReader(TaskAttemptContext context) {
+ this.context = context;
+ }
+
+ private TaskAttemptContext getContext() {
+ return context;
}
@Override
public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
+ return currentId < MAX_ID;
}
@SuppressWarnings("unchecked")
@@ -66,22 +81,32 @@
vertex.getEdges().clear();
vertex.reset();
- Text line = getRecordReader().getCurrentValue();
- String[] fields = line.toString().split(separator);
- if (fields.length > 0) {
- /**
- * set the src vertex id
- */
- long src = Long.parseLong(fields[0]);
- vertexId.set(src);
- vertex.setVertexId(vertexId);
+ /**
+ * set the src vertex id
+ */
+ vertexId.set(currentId++);
+ vertex.setVertexId(vertexId);
- /**
- * set the vertex value
- */
- vertex.setVertexValue(new Text("aaa"));
- }
+ /**
+ * set the vertex value
+ */
+ vertex.setVertexValue(new Text("aaa"));
return vertex;
}
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
}
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexOutputFormat.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexOutputFormat.java
index 5c16302..493a33e 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexOutputFormat.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexOutputFormat.java
@@ -47,9 +47,6 @@
public void writeVertex(Vertex<VLongWritable, Text, FloatWritable, ?> vertex) throws IOException,
InterruptedException {
int len = vertex.getVertexValue().toString().toCharArray().length;
- if (len != 1) {
- throw new IllegalStateException("invalid value length: " + len);
- }
getRecordWriter().write(new Text(vertex.getVertexId().toString()), new Text(Integer.toString(len)));
}
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java
index fe5f2d1..6adc5bb 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertexTest.java
@@ -14,14 +14,13 @@
*/
package edu.uci.ics.pregelix.example;
-import junit.framework.Assert;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.example.util.TestCluster;
@@ -33,7 +32,7 @@
*/
public class UpdateVertexTest {
- private static String INPUT_PATH = "data/update";
+ private static String INPUT_PATH = "/data/webmap/";
private static String OUTPUT_PATH = "actual/resultcomplex";
@Test
@@ -52,9 +51,13 @@
Driver driver = new Driver(UpdateVertex.class);
testCluster.setUp();
- driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+ Plan[] plans = new Plan[] { Plan.INNER_JOIN, Plan.OUTER_JOIN };
+ for (Plan plan : plans) {
+ driver.runJob(job, plan, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
+ }
} catch (Exception e) {
- Assert.assertTrue(e.toString().contains("This job is going to fail"));
+ throw new IllegalStateException(e);
} finally {
testCluster.tearDown();
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index f6857fe..2dfd071 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -74,6 +74,7 @@
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setFixedVertexValueSize(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -89,6 +90,7 @@
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+ job.setFixedVertexValueSize(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -116,6 +118,7 @@
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setFixedVertexValueSize(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -190,6 +193,7 @@
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setFixedVertexValueSize(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
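
These jobgen changes are what regenerate the XML fixtures below: setFixedVertexValueSize(true)
is expected to surface in the serialized job conf as pregelix.incStateLength=false. A quick
round-trip check of that assumption:

    import edu.uci.ics.pregelix.api.job.PregelixJob;

    public class FixedSizeConfRoundTrip {
        public static void main(String[] args) throws Exception {
            PregelixJob job = new PregelixJob("conf-round-trip");
            job.setFixedVertexValueSize(true);
            // default is deliberately true so a missing key would show up as a failure
            boolean inc = job.getConfiguration().getBoolean("pregelix.incStateLength", true);
            System.out.println("pregelix.incStateLength = " + inc); // expected: false
        }
    }
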
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index 65e0b30..d29b2da 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -127,6 +127,7 @@
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index b50b02a..69f8154 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -128,6 +128,7 @@
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 217fbba..04fc686 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -1,145 +1,146 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>PageRank</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.job.name</name><value>PageRank</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>pregelix.incStateLength</name><value>false</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
<property><name>mapred.acls.enabled</name><value>false</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
</configuration>
\ No newline at end of file
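
Note on the regenerated fixture above: these XML dumps are serialized Hadoop Configurations, so each pregelix.* entry corresponds one-to-one to a plain Configuration call. A minimal sketch, assuming nothing beyond the stock org.apache.hadoop.conf.Configuration API (the helper class itself is hypothetical, the property names and values are the ones in the fixture):

```java
import org.apache.hadoop.conf.Configuration;

// Hypothetical helper: rebuilds the pregelix.* entries that appear in
// the regenerated fixture above on a plain Hadoop Configuration.
public final class PageRankJobConf {
    public static Configuration build() {
        Configuration conf = new Configuration();
        conf.set("pregelix.vertexClass",
                "edu.uci.ics.pregelix.example.PageRankVertex");
        conf.set("pregelix.vertexInputFormatClass",
                "edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat");
        conf.set("pregelix.vertexOutputFormatClass",
                "edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat");
        conf.setLong("pregelix.numVertices", 23);
        // false == fixed-size vertex values, which keeps the in-place
        // update path (added later in this patch) eligible for this job.
        conf.setBoolean("pregelix.incStateLength", false);
        return conf;
    }
}
```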
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index 636b055..8969fd7 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -127,6 +127,7 @@
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
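
The runtime consumes this flag together with the index type. A minimal sketch of the effective check, mirroring the open() change in ComputeUpdateFunctionFactory below (the helper class and the assumed default are illustrative, not part of the patch):

```java
import org.apache.hadoop.conf.Configuration;

// Illustrative helper: dynamic state length is forced whenever an LSM
// index is in use, because LSM indexes cannot update pages in place.
public final class StateLengthPolicy {
    private StateLengthPolicy() {
    }

    public static boolean isDynamic(Configuration conf, boolean useLSM) {
        // The default value here is an assumption for the sketch.
        return conf.getBoolean("pregelix.incStateLength", true) || useLSM;
    }
}
```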
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 6564eb0..245c0f5 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -141,6 +141,13 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-common</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-btree</artifactId>
<version>0.2.10-SNAPSHOT</version>
<type>jar</type>
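
The added module is what lets pregelix-runtime compile against the buffer-cache internals used in the next hunk. A quick illustrative compile probe (the class is hypothetical, the import is real):

```java
// This import resolves only with hyracks-storage-common on the compile
// classpath, which is exactly what the pom change above provides.
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPageInternal;

final class StorageCommonOnClasspath {
    // Touching the type is enough to verify the dependency wiring.
    static final Class<?> PROBE = ICachedPageInternal.class;
}
```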
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index f3a0bb4..b1d1043 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -31,6 +31,9 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPageInternal;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -110,7 +113,8 @@
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
this.conf = confFactory.createConfiguration(ctx);
- this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+ // LSM indexes do not support in-place updates, so force the copy-based path
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
@@ -262,7 +266,8 @@
}
@Override
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
if (!dynamicStateLength) {
@@ -271,12 +276,16 @@
int offset = tupleRef.getFieldStart(1);
bbos.setByteArray(data, offset);
vertex.write(output);
+
+ BTreeRangeSearchCursor btreeCursor = (BTreeRangeSearchCursor) cursor;
+ ICachedPageInternal page = (ICachedPageInternal) btreeCursor.getPage();
+ // IMPORTANT: mark the page dirty so the buffer cache flushes the in-place write
+ page.markDirty();
} else {
// write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();
vertex.getVertexId().write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
-
// write the vertex value
vertex.write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
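
Putting the hunk above together: a hedged sketch of the full update() flow it produces. Field names such as bbos, output, and dynamicStateLength are the factory's own members from the surrounding, unchanged context, and the catch clause is assumed to match the existing one; this is an illustration of the calls shown in the hunk, not a drop-in replacement.

```java
@Override
public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
        throws HyracksDataException {
    try {
        if (vertex != null && vertex.hasUpdate()) {
            if (!dynamicStateLength) {
                // Fixed-size value: serialize straight over the old bytes
                // inside the B-tree page buffer.
                byte[] data = tupleRef.getFieldData(1);
                int offset = tupleRef.getFieldStart(1);
                bbos.setByteArray(data, offset);
                vertex.write(output);
                // The bytes were written behind the buffer cache's back,
                // so mark the page dirty by hand; otherwise the update is
                // lost when the page is evicted without being flushed.
                BTreeRangeSearchCursor btreeCursor = (BTreeRangeSearchCursor) cursor;
                ICachedPageInternal page = (ICachedPageInternal) btreeCursor.getPage();
                page.markDirty();
            } else {
                // Variable-size value: rebuild the tuple so the index can
                // do a copy-based update instead of an in-place write.
                DataOutput tbOutput = cloneUpdateTb.getDataOutput();
                vertex.getVertexId().write(tbOutput);
                cloneUpdateTb.addFieldEndOffset();
                vertex.write(tbOutput);
                cloneUpdateTb.addFieldEndOffset();
            }
        }
    } catch (IOException e) {
        throw new HyracksDataException(e);
    }
}
```

The cast to BTreeRangeSearchCursor ties the in-place path to the plain B-tree, which is consistent with forcing dynamicStateLength whenever BspUtils.useLSM(conf) is set.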
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
index 88577c2..8947c01 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -57,7 +58,8 @@
}
@Override
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index ca8ec01..05b4e87 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -31,6 +31,9 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPageInternal;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -113,7 +116,8 @@
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
this.conf = confFactory.createConfiguration(ctx);
- this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+ // LSM indexes do not support in-place updates, so force the copy-based path
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
@@ -255,7 +259,8 @@
}
@Override
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
if (!dynamicStateLength) {
@@ -264,6 +269,11 @@
int offset = tupleRef.getFieldStart(1);
bbos.setByteArray(data, offset);
vertex.write(output);
+
+ BTreeRangeSearchCursor btreeCursor = (BTreeRangeSearchCursor) cursor;
+ ICachedPageInternal page = (ICachedPageInternal) btreeCursor.getPage();
+ // IMPORTANT: mark the page dirty so the buffer cache flushes the in-place write
+ page.markDirty();
} else {
// write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();