support heterogeneous clusters
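
This patch threads an IOptimizer through the job generators so that location
constraints and file split providers can be tuned per node, and makes dynamic
optimization opt-in (BspUtils.getEnableDynamicOptimization now defaults to
false). A minimal sketch of enabling it from client code, assuming an existing
PregelixJob instance (the variable name job below is illustrative, not part of
this patch):

    // Sketch only: turn on the counter-driven DynamicOptimizer for a
    // heterogeneous cluster. With the flag left at its new default (false),
    // the NoOpOptimizer keeps the previous uniform location constraints.
    org.apache.hadoop.conf.Configuration conf = job.getConfiguration();
    conf.setBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, true);
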
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
index a527f87..c39cba7 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
@@ -149,7 +149,7 @@
for (String counterName : RESET_COUNTERS) {
updateCounter(slave, jo, counterName);
}
-
+
for (String counterName : AGG_COUNTERS) {
updateCounter(slave, jo, counterName);
}
@@ -168,7 +168,9 @@
private long extractCounterValue(Object counterObject) {
long counterValue = 0;
- if (counterObject instanceof JSONArray) {
+ if (counterObject == null) {
+ return counterValue;
+ } else if (counterObject instanceof JSONArray) {
JSONArray jArray = (JSONArray) counterObject;
Object[] values = jArray.toArray();
for (Object value : values) {
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 2fcaf91..bef9aa9 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -725,15 +725,16 @@
public static int getRecoveryCount(Configuration conf) {
return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
}
-
+
/***
* Get enable dynamic optimization
*
- * @param conf Configuration
+ * @param conf
+ * Configuration
* @return true if enabled; otherwise false
*/
- public static boolean getEnableDynamicOptimization(Configuration conf){
- return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, true);
+ public static boolean getEnableDynamicOptimization(Configuration conf) {
+ return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, false);
}
/***
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
index 1881429..7b004d8 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
@@ -14,6 +14,9 @@
*/
package edu.uci.ics.pregelix.api.util;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -24,9 +27,11 @@
*/
public class DefaultIterationCompleteReporterHook implements IIterationCompleteReporterHook {
+ private static final Log LOG = LogFactory.getLog(DefaultIterationCompleteReporterHook.class);
+
@Override
public void completeIteration(int superstep, PregelixJob job) {
- System.out.println("iteration complete reporter for " + superstep + " job " + job);
+ LOG.debug("iteration complete reporter for " + superstep + " job " + job.getJobName());
}
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index fdb4136..7bd5fb0 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -55,6 +55,7 @@
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.optimizer.DynamicOptimizer;
import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.NoOpOptimizer;
import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@@ -111,8 +112,11 @@
boolean failed = false;
int retryCount = 0;
int maxRetryCount = 3;
- jobGen = selectJobGen(planChoice, currentJob);
- IOptimizer dynamicOptimzier = new DynamicOptimizer();
+
+ IOptimizer dynamicOptimizer = !BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration()) ? new NoOpOptimizer()
+ : new DynamicOptimizer(counterContext);
+ jobGen = selectJobGen(planChoice, currentJob, dynamicOptimizer);
+ jobGen = dynamicOptimizer.optimize(jobGen, 0);
do {
try {
@@ -140,9 +144,7 @@
}
/** run loop-body jobs with dynamic optimizer if it is enabled */
- if (BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration())) {
- jobGen = dynamicOptimzier.optimize(counterContext, jobGen, i);
- }
+ jobGen = dynamicOptimizer.optimize(jobGen, i);
runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
ckpHook, failed);
runClearState(deploymentId, jobGen);
@@ -197,8 +199,8 @@
.equals(currentInputPaths[0])));
}
- private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
- return JobGenFactory.createJobGen(planChoice, currentJob);
+ private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob, IOptimizer optimizer) {
+ return JobGenFactory.createJobGen(planChoice, currentJob, optimizer);
}
private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index c1f6aae..163e476 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -100,6 +101,7 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -140,6 +142,7 @@
protected String jobId = UUID.randomUUID().toString();;
protected int frameSize = ClusterConfig.getFrameSize();
protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ protected IOptimizer optimizer;
private static final Map<String, String> MERGE_POLICY_PROPERTIES;
static {
@@ -150,18 +153,19 @@
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
- public JobGen(PregelixJob job) {
- init(job);
- }
-
- public JobGen(PregelixJob job, String jobId) {
- if(jobId!=null){
- this.jobId = jobId;
- }
- init(job);
+ public JobGen(PregelixJob job, IOptimizer optimizer) {
+ init(job, optimizer);
}
- private void init(PregelixJob job) {
+ public JobGen(PregelixJob job, String jobId, IOptimizer optimizer) {
+ if (jobId != null) {
+ this.jobId = jobId;
+ }
+ init(job, optimizer);
+ }
+
+ private void init(PregelixJob job, IOptimizer optimizer) {
+ this.optimizer = optimizer;
conf = job.getConfiguration();
pregelixJob = job;
initJobConfiguration();
@@ -178,7 +182,7 @@
}
public void reset(PregelixJob job) {
- init(job);
+ init(job, this.optimizer);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -232,12 +236,12 @@
int[] keyFields = new int[1];
keyFields[0] = 0;
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
keyFields, getIndexDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeCreate);
+ setLocationConstraint(spec, btreeCreate);
spec.setFrameSize(frameSize);
return spec;
}
@@ -258,7 +262,7 @@
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
/**
* the graph file scan operator and use count constraint first, will use
@@ -277,7 +281,7 @@
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
readSchedule, confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct sort operator
@@ -289,7 +293,7 @@
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
nkmFactory, comparatorFactories, recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, sorter);
+ setLocationConstraint(spec, sorter);
/**
* construct write file operator
@@ -336,7 +340,7 @@
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct btree search operator
@@ -346,14 +350,14 @@
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct write file operator
@@ -463,7 +467,7 @@
public JobSpecification generateClearState() throws HyracksException {
JobSpecification spec = new JobSpecification();
ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
- ClusterConfig.setLocationConstraint(spec, clearState);
+ setLocationConstraint(spec, clearState);
spec.addRoot(clearState);
return spec;
}
@@ -477,11 +481,11 @@
protected JobSpecification dropIndex(String indexName) throws HyracksException {
JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, indexName);
IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
- ClusterConfig.setLocationConstraint(spec, drop);
+ setLocationConstraint(spec, drop);
spec.addRoot(drop);
spec.setFrameSize(frameSize);
return spec;
@@ -515,7 +519,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
/**
* the graph file scan operator and use count constraint first, will use
@@ -537,7 +541,7 @@
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
readSchedule, confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct sort operator
@@ -549,7 +553,7 @@
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
nkmFactory, comparatorFactories, recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, sorter);
+ setLocationConstraint(spec, sorter);
/**
* construct tree bulk load operator
@@ -564,7 +568,7 @@
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ setLocationConstraint(spec, btreeBulkLoad);
/**
* connect operator descriptors
@@ -596,7 +600,7 @@
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct btree search operator
@@ -606,7 +610,7 @@
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -615,7 +619,7 @@
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
ExternalSortOperatorDescriptor sort = null;
if (!ckpointing) {
@@ -625,7 +629,7 @@
sortCmpFactories[0] = JobGenUtil.getFinalBinaryComparatorFactory(vertexIdClass);
sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields, nkmFactory, sortCmpFactories,
recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
}
/**
@@ -636,7 +640,7 @@
conf, vertexIdClass.getName(), vertexClass.getName());
VertexFileWriteOperatorDescriptor writer = new VertexFileWriteOperatorDescriptor(spec, confFactory,
inputRdFactory, preHookFactory);
- ClusterConfig.setLocationConstraint(spec, writer);
+ setLocationConstraint(spec, writer);
/**
* connect operator descriptors
@@ -681,14 +685,14 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
false, jobId, lastSuccessfulIteration + 1);
- ClusterConfig.setLocationConstraint(spec, materializeRead);
+ setLocationConstraint(spec, materializeRead);
String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
@@ -700,7 +704,7 @@
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
conf, vertexIdClass.getName(), MsgList.class.getName());
HDFSFileWriteOperatorDescriptor hdfsWriter = new HDFSFileWriteOperatorDescriptor(spec, tmpJob, inputRdFactory);
- ClusterConfig.setLocationConstraint(spec, hdfsWriter);
+ setLocationConstraint(spec, hdfsWriter);
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, hdfsWriter, 0);
@@ -743,7 +747,7 @@
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
readSchedule, new KeyValueParserFactory());
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/** construct the sort operator to sort message states */
int[] keyFields = new int[] { 0 };
@@ -752,24 +756,24 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration, vertexIdClass);
ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, sort);
+ setLocationConstraint(spec, sort);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec,
recordDescriptor, jobId, lastCheckpointedIteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration, new ConfigurationFactory(
pregelixJob.getConfiguration())));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* connect operator descriptors
@@ -807,7 +811,7 @@
*/
List<JobSpecification> list = new ArrayList<JobSpecification>();
list.add(bulkLoadLiveVertexBTree(iteration));
- JobGen jobGen = new JobGenInnerJoin(pregelixJob, jobId);
+ JobGen jobGen = new JobGenInnerJoin(pregelixJob, jobId, optimizer);
return Pair.of(list, jobGen);
}
@@ -827,12 +831,12 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct btree search and function call update operator
*/
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
@@ -852,12 +856,12 @@
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 1, new ExtractLiveVertexIdFunctionFactory(),
preHookFactory, null, rdFinal);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct bulk-load index operator
*/
- IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+ IFileSplitProvider secondaryFileSplitProvider = getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
int[] fieldPermutation = new int[] { 0, 1 };
int[] keyFields = new int[] { 0 };
IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
@@ -866,7 +870,7 @@
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ setLocationConstraint(spec, btreeBulkLoad);
/** connect job spec */
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
@@ -876,4 +880,25 @@
return spec;
}
+ /**
+ * Set the location constraint for an operator, delegating to the optimizer
+ *
+ * @param spec the job specification
+ * @param operator the operator whose location constraint is set
+ */
+ public void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator) {
+ optimizer.setOptimizedLocationConstraints(spec, operator);
+ }
+
+ /**
+ * Get the file split provider for an index, delegating to the optimizer
+ *
+ * @param jobId the job id
+ * @param indexName the index name
+ * @return the IFileSplitProvider instance
+ */
+ public IFileSplitProvider getFileSplitProvider(String jobId, String indexName) {
+ return optimizer.getOptimizedFileSplitProvider(jobId, indexName);
+ }
+
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
index ed580de..cbc9c81 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
@@ -17,26 +17,27 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
public class JobGenFactory {
- public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob) {
+ public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob, IOptimizer optimizer) {
JobGen jobGen = null;
switch (planChoice) {
case INNER_JOIN:
- jobGen = new JobGenInnerJoin(currentJob);
+ jobGen = new JobGenInnerJoin(currentJob, optimizer);
break;
case OUTER_JOIN:
- jobGen = new JobGenOuterJoin(currentJob);
+ jobGen = new JobGenOuterJoin(currentJob, optimizer);
break;
case OUTER_JOIN_SORT:
- jobGen = new JobGenOuterJoinSort(currentJob);
+ jobGen = new JobGenOuterJoinSort(currentJob, optimizer);
break;
case OUTER_JOIN_SINGLE_SORT:
- jobGen = new JobGenOuterJoinSingleSort(currentJob);
+ jobGen = new JobGenOuterJoinSingleSort(currentJob, optimizer);
break;
default:
- jobGen = new JobGenInnerJoin(currentJob);
+ jobGen = new JobGenInnerJoin(currentJob, optimizer);
}
return jobGen;
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 7bdb069..f838fb8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -63,6 +63,7 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -93,12 +94,12 @@
public class JobGenInnerJoin extends JobGen {
private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
- public JobGenInnerJoin(PregelixJob job) {
- super(job);
+ public JobGenInnerJoin(PregelixJob job, IOptimizer optimizer) {
+ super(job, optimizer);
}
- public JobGenInnerJoin(PregelixJob job, String jobId) {
- super(job, jobId);
+ public JobGenInnerJoin(PregelixJob job, String jobId, IOptimizer optimizer) {
+ super(job, jobId, optimizer);
}
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
@@ -113,18 +114,18 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/** construct runtime hook */
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct drop index operator
*/
- IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider secondaryFileSplitProvider = getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
/**
* construct btree search and function call update operator
@@ -159,7 +160,7 @@
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 6, new StartComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* termination state write operator
@@ -188,7 +189,7 @@
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ setLocationConstraint(spec, btreeBulkLoad);
/**
* construct local sort operator
@@ -199,7 +200,7 @@
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -208,7 +209,7 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global group-by operator
@@ -217,25 +218,25 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/**
* do pre- & post- super step
*/
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* add the insert operator to insert vertexes
@@ -244,7 +245,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -254,15 +255,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
@@ -330,7 +331,7 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct pre-superstep
@@ -338,20 +339,20 @@
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
true, jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materializeRead);
+ setLocationConstraint(spec, materializeRead);
/**
* construct the index-set-union operator
*/
String readFile = iteration % 2 == 0 ? SECONDARY_INDEX_ODD : SECONDARY_INDEX_EVEN;
- IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+ IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -359,12 +360,12 @@
IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
comparatorFactories, true, keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true);
- ClusterConfig.setLocationConstraint(spec, setUnion);
+ setLocationConstraint(spec, setUnion);
/**
* construct index-join-function-update operator
*/
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
VLongWritable.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
@@ -379,7 +380,7 @@
JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
- ClusterConfig.setLocationConstraint(spec, join);
+ setLocationConstraint(spec, join);
/**
* construct bulk-load index operator
@@ -389,12 +390,12 @@
indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
WritableComparator.get(vertexIdClass).getClass());
String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
- IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+ IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
indexCmpFactories, fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR,
getIndexDataflowHelperFactory());
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ setLocationConstraint(spec, btreeBulkLoad);
/**
* construct local sort operator
@@ -405,7 +406,7 @@
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -414,7 +415,7 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global group-by operator
@@ -423,23 +424,23 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* termination state write operator
@@ -464,7 +465,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -474,15 +475,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
@@ -596,7 +597,7 @@
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
readSchedule, new KeyValueParserFactory());
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/** construct the sort operator to sort message states */
int[] keyFields = new int[] { 0 };
@@ -605,7 +606,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration, vertexIdClass);
ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, sort);
+ setLocationConstraint(spec, sort);
/**
* construct bulk-load index operator
@@ -618,12 +619,12 @@
indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration + 1, WritableComparator
.get(vertexIdClass).getClass());
String writeFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
- IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+ IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
indexCmpFactories, fieldPermutation, new int[] { 0 }, DEFAULT_BTREE_FILL_FACTOR,
getIndexDataflowHelperFactory());
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ setLocationConstraint(spec, btreeBulkLoad);
/**
* connect operator descriptors
@@ -648,7 +649,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
Class<? extends Writable> msgListClass = MsgList.class;
String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
- IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+ IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
JobSpecification spec = new JobSpecification();
/**
* construct empty tuple operator
@@ -663,7 +664,7 @@
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct btree search operator
@@ -681,7 +682,7 @@
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
comparatorFactories, null, null, null, true, true, getIndexDataflowHelperFactory(), false,
NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct write file operator
@@ -689,7 +690,7 @@
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
conf, vertexIdClass.getName(), MsgList.class.getName());
HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
- ClusterConfig.setLocationConstraint(spec, writer);
+ setLocationConstraint(spec, writer);
/**
* connect operator descriptors
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 68e6706..39a56bf 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -41,7 +41,7 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,12 +69,12 @@
public class JobGenOuterJoin extends JobGen {
- public JobGenOuterJoin(PregelixJob job) {
- super(job);
+ public JobGenOuterJoin(PregelixJob job, IOptimizer optimizer) {
+ super(job, optimizer);
}
-
- public JobGenOuterJoin(PregelixJob job, String jobId) {
- super(job, jobId);
+
+ public JobGenOuterJoin(PregelixJob job, String jobId, IOptimizer optimizer) {
+ super(job, jobId, optimizer);
}
@Override
@@ -90,12 +90,12 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/** construct runtime hook */
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct btree search function update operator
@@ -104,7 +104,7 @@
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);;
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -129,7 +129,7 @@
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct local sort operator
@@ -140,7 +140,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -149,7 +149,7 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global group-by operator
@@ -160,22 +160,22 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink2);
+ setLocationConstraint(spec, emptySink2);
/**
* termination state write operator
@@ -202,7 +202,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -212,14 +212,14 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
@@ -286,7 +286,7 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct pre-superstep hook
@@ -294,19 +294,19 @@
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
true, jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materializeRead);
+ setLocationConstraint(spec, materializeRead);
/**
* construct index join function update operator
*/
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
@@ -329,7 +329,7 @@
getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, join);
+ setLocationConstraint(spec, join);
/**
* construct local sort operator
@@ -339,7 +339,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -348,7 +348,7 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global group-by operator
@@ -357,23 +357,23 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* termination state write operator
@@ -399,7 +399,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -409,15 +409,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 3e4b213..c10e6c2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -41,7 +41,7 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,12 +69,12 @@
public class JobGenOuterJoinSingleSort extends JobGen {
- public JobGenOuterJoinSingleSort(PregelixJob job) {
- super(job);
+ public JobGenOuterJoinSingleSort(PregelixJob job, IOptimizer optimizer) {
+ super(job, optimizer);
}
-
- public JobGenOuterJoinSingleSort(PregelixJob job, String jobId) {
- super(job, jobId);
+
+ public JobGenOuterJoinSingleSort(PregelixJob job, String jobId, IOptimizer optimizer) {
+ super(job, jobId, optimizer);
}
@Override
@@ -90,12 +90,12 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/** construct runtime hook */
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct btree search operator
@@ -104,7 +104,7 @@
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -132,7 +132,7 @@
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct global sort operator
@@ -144,7 +144,7 @@
.getClass());
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, globalSort);
+ setLocationConstraint(spec, globalSort);
/**
* construct global group-by operator
@@ -155,22 +155,22 @@
conf, true, false);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink2);
+ setLocationConstraint(spec, emptySink2);
/**
* termination state write operator
@@ -196,7 +196,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -206,15 +206,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
@@ -277,7 +277,7 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct pre-superstep hook
@@ -285,19 +285,19 @@
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
true, jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materializeRead);
+ setLocationConstraint(spec, materializeRead);
/**
* construct index join function update operator
*/
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
@@ -320,7 +320,7 @@
getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, join);
+ setLocationConstraint(spec, join);
/**
* construct global sort operator
@@ -331,7 +331,7 @@
.getClass());
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, globalSort);
+ setLocationConstraint(spec, globalSort);
/**
* construct global group-by operator
@@ -340,23 +340,23 @@
conf, true, false);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* termination state write operator
@@ -379,7 +379,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -389,15 +389,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 7ca771c..953d82c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -41,7 +41,7 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,8 +69,8 @@
public class JobGenOuterJoinSort extends JobGen {
- public JobGenOuterJoinSort(PregelixJob job) {
- super(job);
+ public JobGenOuterJoinSort(PregelixJob job, IOptimizer optimizer) {
+ super(job, optimizer);
}
@Override
@@ -86,12 +86,12 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/** construct runtime hook */
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct btree search function update operator
@@ -100,7 +100,7 @@
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -125,7 +125,7 @@
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct local sort operator
@@ -137,7 +137,7 @@
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -146,14 +146,14 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global sort operator
*/
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, globalSort);
+ setLocationConstraint(spec, globalSort);
/**
* construct global group-by operator
@@ -164,22 +164,22 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink2);
+ setLocationConstraint(spec, emptySink2);
/**
* termination state write operator
@@ -206,7 +206,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -216,15 +216,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
@@ -287,7 +287,7 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct pre-superstep hook
@@ -295,19 +295,19 @@
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
true, jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materializeRead);
+ setLocationConstraint(spec, materializeRead);
/**
* construct index join function update operator
*/
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
@@ -330,7 +330,7 @@
getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, join);
+ setLocationConstraint(spec, join);
/**
* construct local sort operator
@@ -341,7 +341,7 @@
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -350,14 +350,14 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global sort operator
*/
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, globalSort);
+ setLocationConstraint(spec, globalSort);
/**
* construct global group-by operator
@@ -366,23 +366,23 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* termination state write operator
@@ -408,7 +408,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -418,15 +418,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 5c1a4b8..244d624 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -84,7 +84,7 @@
}
/**
- * get file split provider
+ * get file split provider, for test only
*
* @param jobId
* @return
@@ -175,26 +175,6 @@
* @param operator
* @throws HyracksDataException
*/
- public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
- throws HyracksException {
- int count = 0;
- String[] locations = new String[NCs.length * stores.length];
- for (String nc : NCs) {
- for (int i = 0; i < stores.length; i++) {
- locations[count] = nc;
- count++;
- }
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
- }
-
- /**
- * set location constraint
- *
- * @param spec
- * @param operator
- * @throws HyracksDataException
- */
public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {
int count = NCs.length * stores.length;
PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
@@ -255,11 +235,35 @@
}
return locations;
}
+
+ /**
+ * set the default location constraint
+ *
+ * @param spec
+ * @param operator
+ * @throws HyracksDataException
+ */
+ public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
+ throws HyracksException {
+ int count = 0;
+ String[] locations = new String[NCs.length * stores.length];
+ for (String nc : NCs) {
+ for (int i = 0; i < stores.length; i++) {
+ locations[count] = nc;
+ count++;
+ }
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+ }
public static String[] getNCNames() {
return NCs;
}
+ public static String[] getStores() {
+ return stores;
+ }
+
public static void addToBlackListNodes(Collection<String> nodes) {
blackListNodes.addAll(nodes);
}
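
For readers skimming the patch: the relocated ClusterConfig.setLocationConstraint keeps the old default behavior, one operator partition per store directory on every node controller, each pinned to its NC. A minimal standalone sketch of that expansion (the NC names and store paths below are made-up values, and the PartitionConstraintHelper call is left out):

public class DefaultConstraintSketch {
    public static void main(String[] args) {
        String[] ncs = { "nc1", "nc2" };          // node controller names (assumed)
        String[] stores = { "/data1", "/data2" }; // store directories per machine (assumed)
        String[] locations = new String[ncs.length * stores.length];
        int count = 0;
        for (String nc : ncs) {
            for (int i = 0; i < stores.length; i++) {
                locations[count++] = nc; // one partition per store, pinned to that NC
            }
        }
        // locations == [nc1, nc1, nc2, nc2]; each entry becomes the absolute
        // location constraint of one operator partition.
        System.out.println(java.util.Arrays.toString(locations));
    }
}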
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
index 01fc81b..064ca42 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
@@ -15,14 +15,105 @@
package edu.uci.ics.pregelix.core.optimizer;
-import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.IntWritable;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.stats.Counters;
+import edu.uci.ics.hyracks.client.stats.IClusterCounterContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
public class DynamicOptimizer implements IOptimizer {
+ private IClusterCounterContext counterContext;
+ private Map<String, IntWritable> machineToDegreeOfParallelism = new HashMap<String, IntWritable>();
+ private int dop = 0;
+
+ public DynamicOptimizer(IClusterCounterContext counterContext) {
+ this.counterContext = counterContext;
+ }
+
@Override
- public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration) {
- return jobGen;
+ public JobGen optimize(JobGen jobGen, int iteration) {
+ try {
+ initializeLoadPerMachine();
+ return jobGen;
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator) {
+ try {
+ String[] constraints = new String[dop];
+ int index = 0;
+ for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+ String loc = entry.getKey();
+ IntWritable count = machineToDegreeOfParallelism.get(loc);
+ for (int j = 0; j < count.get(); j++) {
+ constraints[index++] = loc;
+ }
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, constraints);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName) {
+ FileSplit[] fileSplits = new FileSplit[dop];
+ String[] stores = ClusterConfig.getStores();
+ int splitIndex = 0;
+ for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+ String ncName = entry.getKey();
+ IntWritable count = machineToDegreeOfParallelism.get(ncName);
+ for (int j = 0; j < count.get(); j++) {
+ // cycle through the stores; each machine is assumed to have as many store directories as cores
+ int storeCursor = j % stores.length;
+ String st = stores[storeCursor];
+ FileSplit split = new FileSplit(ncName, st + File.separator + ncName + "-data" + File.separator + jobId
+ + File.separator + indexName + (j / stores.length));
+ fileSplits[splitIndex++] = split;
+ }
+ }
+ return new ConstantFileSplitProvider(fileSplits);
+ }
+
+ /**
+ * initialize the load-per-machine map
+ *
+ * @return the degree of parallelism
+ * @throws HyracksException
+ */
+ private int initializeLoadPerMachine() throws HyracksException {
+ machineToDegreeOfParallelism.clear();
+ String[] locationConstraints = ClusterConfig.getLocationConstraint();
+ for (String loc : locationConstraints) {
+ machineToDegreeOfParallelism.put(loc, new IntWritable(0));
+ }
+ dop = 0;
+ for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+ String loc = entry.getKey();
+ //reserve one core for heartbeat
+ int load = (int) counterContext.getCounter(Counters.NUM_PROCESSOR, false).get() - 1;
+ IntWritable count = machineToDegreeOfParallelism.get(loc);
+ count.set(load);
+ dop += load;
+ }
+ return dop;
}
}
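
The dynamic optimizer differs from the default only in where the per-machine partition counts come from: a processor counter gathered at runtime, minus the core reserved for the heartbeat. A self-contained sketch of how those counts become per-partition location constraints, with invented counter values standing in for IClusterCounterContext:

import java.util.LinkedHashMap;
import java.util.Map;

public class DynamicConstraintSketch {
    public static void main(String[] args) {
        // assumed counter readings: nc1 reports 8 cores, nc2 reports 4 (heterogeneous cluster)
        Map<String, Integer> machineToDop = new LinkedHashMap<String, Integer>();
        machineToDop.put("nc1", 8 - 1); // reserve one core for the heartbeat
        machineToDop.put("nc2", 4 - 1);

        int dop = 0;
        for (int load : machineToDop.values()) {
            dop += load; // total degree of parallelism across the cluster
        }

        String[] constraints = new String[dop];
        int index = 0;
        for (Map.Entry<String, Integer> entry : machineToDop.entrySet()) {
            for (int j = 0; j < entry.getValue(); j++) {
                constraints[index++] = entry.getKey(); // one partition per usable core
            }
        }
        System.out.println(java.util.Arrays.toString(constraints)); // 7 x nc1, 3 x nc2
    }
}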
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
index b5913c4..c8856aa 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
@@ -15,11 +15,17 @@
package edu.uci.ics.pregelix.core.optimizer;
-import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.pregelix.core.jobgen.JobGen;
public interface IOptimizer {
- public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration);
-
+ public JobGen optimize(JobGen jobGen, int iteration);
+
+ public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator);
+
+ public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName);
+
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java
new file mode 100644
index 0000000..cd0ca37
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+
+public class NoOpOptimizer implements IOptimizer {
+
+ @Override
+ public JobGen optimize(JobGen jobGen, int iteration) {
+ return jobGen;
+ }
+
+ @Override
+ public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator) {
+ try {
+ ClusterConfig.setLocationConstraint(spec, operator);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName) {
+ try {
+ return ClusterConfig.getFileSplitProvider(jobId, indexName);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+}
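
A hypothetical piece of wiring (not taken from this patch) to show how the two IOptimizer implementations are meant to be used: dynamic optimization is opt-in per job, and the no-op optimizer preserves the old static ClusterConfig behavior. The nested stand-in types below replace the real Pregelix classes:

public class OptimizerSelectionSketch {
    interface IOptimizer {}                                // stand-in for the real interface
    static class NoOpOptimizer implements IOptimizer {}    // static ClusterConfig behavior
    static class DynamicOptimizer implements IOptimizer {} // counter-driven behavior

    static IOptimizer chooseOptimizer(boolean dynamicOptEnabled) {
        return dynamicOptEnabled ? new DynamicOptimizer() : new NoOpOptimizer();
    }

    public static void main(String[] args) {
        System.out.println(chooseOptimizer(false).getClass().getSimpleName()); // NoOpOptimizer
        System.out.println(chooseOptimizer(true).getClass().getSimpleName());  // DynamicOptimizer
    }
}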
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index b4e17b6..5855fd3 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -35,6 +35,8 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.NoOpOptimizer;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.example.PageRankVertex;
import edu.uci.ics.pregelix.example.PageRankVertex.SimulatedPageRankVertexInputFormat;
@@ -83,7 +85,9 @@
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
FileUtils.cleanDirectory(new File(EXPECT_RESULT_DIR));
FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- giraphTestJobGen = new JobGenOuterJoin(job);
+
+ IOptimizer dynamicOptimizer = new NoOpOptimizer();
+ giraphTestJobGen = new JobGenOuterJoin(job, dynamicOptimizer);
}
private void cleanupStores() throws IOException {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index fcfeb95..0a5e891 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -25,7 +25,6 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
-import edu.uci.ics.pregelix.api.util.HadoopCountersGlobalAggregateHook;
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
import edu.uci.ics.pregelix.example.EarlyTerminationVertex;
@@ -54,8 +53,6 @@
import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator;
import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex;
import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex.TriangleCountingVertexOutputFormat;
-import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingWithAggregateHadoopCountersVertex.TriangleHadoopCountersAggregator;
-import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingWithAggregateHadoopCountersVertex;
public class JobGenerator {
private static String outputBase = "src/test/resources/jobs/";
@@ -82,6 +79,7 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.setCheckpointHook(ConservativeCheckpointHook.class);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -98,6 +96,7 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
job.setCheckpointHook(ConservativeCheckpointHook.class);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -126,6 +125,7 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.setCheckpointHook(ConservativeCheckpointHook.class);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -140,6 +140,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -155,6 +156,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -228,6 +230,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -240,19 +243,7 @@
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void generateTriangleCountingWithHadoopCountersJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(TriangleCountingWithAggregateHadoopCountersVertex.class);
- job.addGlobalAggregatorClass(TriangleCountingAggregator.class);
- job.setCounterAggregatorClass(TriangleCountingWithAggregateHadoopCountersVertex.TriangleHadoopCountersAggregator.class);
- job.setVertexInputFormatClass(TextTriangleCountingInputFormat.class);
- job.setVertexOutputFormatClass(TriangleCountingVertexOutputFormat.class);
- job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
+ job.setDynamicVertexValueSize(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -391,7 +382,6 @@
private static void genTriangleCounting() throws IOException {
generateTriangleCountingJob("Triangle Counting", outputBase + "TriangleCounting.xml");
-// generateTriangleCountingWithHadoopCountersJob("Triangle Counting", outputBase + "TriangleCountingWithHadoopCounters.xml");
}
private static void genMaximalClique() throws IOException {
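
The generated job XMLs later in this patch gain a pregelix.dynamicopt property, which is presumably what setEnableDynamicOptimization(true) writes into the job's Hadoop Configuration. A small sketch of that assumed mapping (the property name is taken from the XML diffs; the setter-to-property mapping itself is an assumption, and hadoop-common must be on the classpath):

import org.apache.hadoop.conf.Configuration;

public class DynamicOptFlagSketch {
    // property name as it appears in the generated job XML files
    static final String DYNAMIC_OPTIMIZATION = "pregelix.dynamicopt";

    public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.setBoolean(DYNAMIC_OPTIMIZATION, true);  // what setEnableDynamicOptimization(true) is assumed to do
        System.out.println(conf.getBoolean(DYNAMIC_OPTIMIZATION, false)); // prints true
    }
}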
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java
new file mode 100644
index 0000000..10f5829
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.util;
+
+@SuppressWarnings("rawtypes")
+public class Record implements Comparable {
+
+ private String recordText;
+
+ public Record(String text) {
+ recordText = text;
+ }
+
+ @Override
+ public int hashCode() {
+ return recordText.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return recordText;
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ if (!(o instanceof Record)) {
+ throw new IllegalStateException("uncomparable items");
+ }
+ Record record = (Record) o;
+ boolean equal = equalStrings(recordText, record.recordText);
+ if (equal) {
+ return 0;
+ } else {
+ return recordText.compareTo(record.recordText);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Record)) {
+ return false;
+ }
+ Record record = (Record) o;
+ return equalStrings(recordText, record.recordText);
+ }
+
+ private boolean equalStrings(String s1, String s2) {
+ String[] rowsOne = s1.split("\n");
+ String[] rowsTwo = s2.split("\n");
+
+ if (rowsOne.length != rowsTwo.length)
+ return false;
+
+ for (int i = 0; i < rowsOne.length; i++) {
+ String row1 = rowsOne[i];
+ String row2 = rowsTwo[i];
+
+ if (row1.equals(row2))
+ continue;
+
+ boolean spaceOrTab = false;
+ spaceOrTab = row1.contains(" ");
+ String[] fields1 = spaceOrTab ? row1.split(" ") : row1.split("\t");
+ String[] fields2 = spaceOrTab ? row2.split(" ") : row2.split("\t");
+
+ for (int j = 0; j < fields1.length; j++) {
+ if (j >= fields2.length) {
+ return false;
+ }
+ if (fields1[j].equals(fields2[j])) {
+ continue;
+ } else if (fields1[j].indexOf('.') < 0) {
+ return false;
+ } else {
+ Double double1 = Double.parseDouble(fields1[j]);
+ Double double2 = Double.parseDouble(fields2[j]);
+ float float1 = (float) double1.doubleValue();
+ float float2 = (float) double2.doubleValue();
+
+ if (Math.abs(float1 - float2) < 1.0e-7)
+ continue;
+ else {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+}
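
Record compares rows field by field and only falls back to a numeric tolerance for fields that contain a decimal point, so small floating-point formatting differences between runs do not fail a test. A standalone sketch of that rule (the sample values are illustrative only):

public class FuzzyFieldCompareSketch {
    static boolean fieldsEqual(String f1, String f2) {
        if (f1.equals(f2)) {
            return true;
        }
        if (f1.indexOf('.') < 0) {
            return false; // non-numeric fields must match exactly
        }
        float v1 = (float) Double.parseDouble(f1);
        float v2 = (float) Double.parseDouble(f2);
        return Math.abs(v1 - v2) < 1.0e-7f; // tolerate tiny floating-point drift
    }

    public static void main(String[] args) {
        System.out.println(fieldsEqual("0.15000001", "0.15")); // true
        System.out.println(fieldsEqual("node-1", "node-2"));   // false
    }
}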
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
index fe07cf5..e6347ed 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
@@ -16,83 +16,82 @@
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.junit.Assert;
public class TestUtils {
+ private static final String PREFIX = "part";
+
public static void compareWithResultDir(File expectedFileDir, File actualFileDir) throws Exception {
- String[] fileNames = expectedFileDir.list();
- for (String fileName : fileNames) {
- compareWithResult(new File(expectedFileDir, fileName), new File(actualFileDir, fileName));
- }
+ Collection<Record> expectedRecords = loadRecords(expectedFileDir);
+ Collection<Record> actualRecords = loadRecords(actualFileDir);
+ boolean equal = collectionEqual(expectedRecords, actualRecords);
+ Assert.assertTrue(equal);
}
- public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
- BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
- BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
- String lineExpected, lineActual;
- int num = 1;
- try {
- while ((lineExpected = readerExpected.readLine()) != null) {
- lineActual = readerActual.readLine();
- if (lineActual == null) {
- throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
+ public static boolean collectionEqual(Collection<Record> c1, Collection<Record> c2) {
+ for (Record r1 : c1) {
+ boolean exists = false;
+ for (Record r2 : c2) {
+ if (r1.equals(r2)) {
+ exists = true;
+ break;
}
- if (!equalStrings(lineExpected, lineActual)) {
- throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
- + lineActual);
- }
- ++num;
}
- lineActual = readerActual.readLine();
- if (lineActual != null) {
- throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineActual);
+ if (!exists) {
+ return false;
}
- } finally {
- readerExpected.close();
- readerActual.close();
}
- }
-
- private static boolean equalStrings(String s1, String s2) {
- String[] rowsOne = s1.split("\n");
- String[] rowsTwo = s2.split("\n");
-
- if (rowsOne.length != rowsTwo.length)
- return false;
-
- for (int i = 0; i < rowsOne.length; i++) {
- String row1 = rowsOne[i];
- String row2 = rowsTwo[i];
-
- if (row1.equals(row2))
- continue;
-
- boolean spaceOrTab = false;
- spaceOrTab = row1.contains(" ");
- String[] fields1 = spaceOrTab ? row1.split(" ") : row1.split("\t");
- String[] fields2 = spaceOrTab ? row2.split(" ") : row2.split("\t");
-
- for (int j = 0; j < fields1.length; j++) {
- if (fields1[j].equals(fields2[j])) {
- continue;
- } else if (fields1[j].indexOf('.') < 0) {
- return false;
- } else {
- Double double1 = Double.parseDouble(fields1[j]);
- Double double2 = Double.parseDouble(fields2[j]);
- float float1 = (float) double1.doubleValue();
- float float2 = (float) double2.doubleValue();
-
- if (Math.abs(float1 - float2) < 1.0e-7)
- continue;
- else {
- return false;
- }
+ for (Record r2 : c2) {
+ boolean exists = false;
+ for (Record r1 : c1) {
+ if (r2.equals(r1)) {
+ exists = true;
+ break;
}
}
+ if (!exists) {
+ return false;
+ }
}
return true;
}
+ public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
+ Collection<Record> expectedRecords = new ArrayList<Record>();
+ Collection<Record> actualRecords = new ArrayList<Record>();
+ populateResultFile(expectedRecords, expectedFile);
+ populateResultFile(actualRecords, actualFile);
+ boolean equal = expectedRecords.equals(actualRecords);
+ Assert.assertTrue(equal);
+ }
+
+ private static Collection<Record> loadRecords(File dir) throws Exception {
+ String[] fileNames = dir.list();
+ Collection<Record> records = new ArrayList<Record>();
+ for (String fileName : fileNames) {
+ if (fileName.startsWith(PREFIX)) {
+ File file = new File(dir, fileName);
+ populateResultFile(records, file);
+ }
+ }
+ return records;
+ }
+
+ private static void populateResultFile(Collection<Record> records, File file) throws FileNotFoundException,
+ IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(file));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ records.add(new Record(line));
+ }
+ reader.close();
+ }
+
}
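
With a dynamic degree of parallelism, the number of part files in a result directory and the order of rows within them can vary between runs, which appears to be why the test utilities now load every part* file and compare the two sides as unordered collections of records. A sketch of the symmetric-containment check, using plain strings in place of Record:

import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class UnorderedResultCompareSketch {
    static <T> boolean containEachOther(Collection<T> c1, Collection<T> c2) {
        // same idea as TestUtils.collectionEqual: every record on one side must
        // appear on the other side, regardless of file or row order
        return c1.containsAll(c2) && c2.containsAll(c1);
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("1 0.15", "2 0.21", "3 0.64");
        List<String> actualShuffled = Arrays.asList("3 0.64", "1 0.15", "2 0.21");
        System.out.println(containEachOther(expected, actualShuffled)); // true
    }
}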
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index 591446c..3091c83 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -80,6 +80,7 @@
<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 32c2a1a..b6af65c 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -29,7 +29,6 @@
<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleMinCombiner</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -86,6 +85,7 @@
<property><name>fs.checkpoint.period</name><value>3600</value></property>
<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
index d06068d..d908da8 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
@@ -124,7 +124,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
index 01d85a5..d5ec8f1 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
@@ -124,7 +124,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
index 072ea9e..b4c42e6 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
@@ -124,7 +124,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
index 3ae367d..6cf075b 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
@@ -124,7 +124,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
index 6cb617f..49e2e6f 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
@@ -125,7 +125,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
index 76d6e87..8316c64 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
@@ -125,7 +125,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
index 1f52250..a894ccd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
@@ -124,7 +124,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
index 9d2c9e1..a9f8925 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
@@ -1,146 +1,145 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>pregelix.updateIntensive</name><value>true</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>pregelix.framesize</name><value>2048</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>20</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/result</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>pregelix.updateIntensive</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
</configuration>
\ No newline at end of file
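[Editor's note, not part of the patch: the regenerated MessageOverflowLSM.xml above now carries pregelix.dynamicopt=true and pregelix.updateIntensive=true, while the generated pregelix.aggregatorClass entry is gone. The sketch below is only an illustration of how such a flag is typically read back through the standard Hadoop Configuration API; the resource name and the default value of false are assumptions for the example, not something this patch defines.]

// Minimal illustrative sketch (assumptions noted above), Java:
import org.apache.hadoop.conf.Configuration;

public class DynamicOptFlagExample {
    public static void main(String[] args) {
        // Load one of the regenerated job descriptors from the classpath
        // (resource name here is hypothetical).
        Configuration conf = new Configuration(false);
        conf.addResource("jobs/MessageOverflowLSM.xml");

        // Read the flag the regenerated XMLs now carry; the false default
        // is an assumption for this example.
        boolean dynamicOpt = conf.getBoolean("pregelix.dynamicopt", false);
        System.out.println("dynamic optimization enabled: " + dynamicOpt);
    }
}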
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index f1c27ca..d29b2da 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -125,7 +125,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index dfb4e71..6fe04fb 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -30,7 +30,6 @@
<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -86,6 +85,7 @@
<property><name>fs.checkpoint.period</name><value>3600</value></property>
<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 49a6e20..d0f9759 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -30,7 +30,6 @@
<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -87,6 +86,7 @@
<property><name>fs.checkpoint.period</name><value>3600</value></property>
<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
index 789ea32..0173390 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
@@ -80,6 +80,7 @@
<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index 796b1d1..a7a38e0 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -80,6 +80,7 @@
<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
index 8834ead..225429a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
@@ -1,146 +1,145 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Reachibility</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>ReachibilityVertex.destId</name><value>10</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>ReachibilityVertex.destId</name><value>10</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
</configuration>
\ No newline at end of file
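[Editor's note, not part of the patch: the large rewrites of these job XMLs are mostly property reordering from regeneration; the substantive changes (for example the dropped pregelix.aggregatorClass entry and the new pregelix.dynamicopt flag) are easier to see with an order-insensitive comparison. The sketch below is a hedged illustration of such a comparison using the Hadoop Configuration API; the file names are assumptions for the example.]

// Minimal illustrative sketch (assumptions noted above), Java:
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class JobXmlDiff {
    // Load a job XML into a plain map so property order does not matter.
    private static Map<String, String> load(String file) {
        Configuration conf = new Configuration(false);
        conf.addResource(new Path(file));
        Map<String, String> props = new HashMap<String, String>();
        for (Map.Entry<String, String> e : conf) {
            props.put(e.getKey(), e.getValue());
        }
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical before/after copies of one regenerated descriptor.
        Map<String, String> before = load("ReachibilityRealComplex.xml.orig");
        Map<String, String> after = load("ReachibilityRealComplex.xml");

        for (Map.Entry<String, String> e : before.entrySet()) {
            if (!after.containsKey(e.getKey())) {
                System.out.println("removed: " + e.getKey());
            } else if (!e.getValue().equals(after.get(e.getKey()))) {
                System.out.println("changed: " + e.getKey());
            }
        }
        for (String key : after.keySet()) {
            if (!before.containsKey(key)) {
                System.out.println("added: " + key);
            }
        }
    }
}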
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
index 234dbf9..bd9da92 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
@@ -1,146 +1,145 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Reachibility</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>ReachibilityVertex.destId</name><value>25</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>ReachibilityVertex.destId</name><value>25</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index b1c57c5..9acd7bc 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -126,7 +126,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index f1d2dc6..6c25575 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -126,7 +126,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
index 951ac6f..80cea20 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
@@ -123,9 +123,10 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.trianglecounting.TextTriangleCountingInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>