Merge branch 'zheilbron/hyracks_msr_demo' of https://code.google.com/p/hyracks into zheilbron/hyracks_msr_demo
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index 965ade7..eddc4df 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -25,6 +25,6 @@
public IIOManager getIOManager();
public ByteBuffer allocateFrame() throws HyracksDataException;
-
+
public void deallocateFrames(int frameCount);
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
index 3ea81ef..97ce664 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
@@ -46,5 +46,4 @@
// TODO Auto-generated method stub
}
-
}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
index 46487eb..c39cba7 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
@@ -149,7 +149,7 @@
for (String counterName : RESET_COUNTERS) {
updateCounter(slave, jo, counterName);
}
-
+
for (String counterName : AGG_COUNTERS) {
updateCounter(slave, jo, counterName);
}
@@ -168,7 +168,9 @@
private long extractCounterValue(Object counterObject) {
long counterValue = 0;
- if (counterObject instanceof JSONArray) {
+ if (counterObject == null) {
+ return counterValue;
+ } else if (counterObject instanceof JSONArray) {
JSONArray jArray = (JSONArray) counterObject;
Object[] values = jArray.toArray();
for (Object value : values) {
@@ -256,4 +258,4 @@
}
-}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 20075ff..fa9b6b3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -113,7 +113,7 @@
public ByteBuffer allocateFrame() throws HyracksDataException {
return joblet.allocateFrame();
}
-
+
@Override
public void deallocateFrames(int frameCount) {
joblet.deallocateFrames(frameCount);
@@ -241,7 +241,8 @@
sem.acquire();
final int cIdx = i;
executor.execute(new Runnable() {
- public void run() {
+ @Override
+ public void run() {
if (aborted) {
return;
}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
index b52a54e..33e8e034 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
@@ -32,7 +32,7 @@
return new IValueParser() {
@Override
public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
- int n = 0;
+ long n = 0;
int sign = 1;
int i = 0;
boolean pre = true;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 60e9c40..86d738f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -33,7 +33,8 @@
import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
public class InMemoryHashJoin {
-
+
+ private final IHyracksTaskContext ctx;
private final List<ByteBuffer> buffers;
private final FrameTupleAccessor accessorBuild;
private final ITuplePartitionComputer tpcBuild;
@@ -61,7 +62,8 @@
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
ISerializableTable table, IPredicateEvaluator predEval, boolean reverse) throws HyracksDataException {
- this.tableSize = tableSize;
+ this.ctx = ctx;
+ this.tableSize = tableSize;
this.table = table;
storedTuplePointer = new TuplePointer();
buffers = new ArrayList<ByteBuffer>();
@@ -146,6 +148,9 @@
if (appender.getTupleCount() > 0) {
flushFrame(outBuffer, writer);
}
+ int nFrames = buffers.size();
+ buffers.clear();
+ ctx.deallocateFrames(nFrames);
}
private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 979ef59..994b0ef 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -139,16 +139,7 @@
boolean prdEval = evaluatePredicate(i, j);
if (c == 0 && prdEval) {
matchFound = true;
- if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
- flushFrame(outBuffer, writer);
- appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
- int tSize = accessorOuter.getTupleEndOffset(i) - accessorOuter.getTupleStartOffset(i)
- + accessorInner.getTupleEndOffset(j) - accessorInner.getTupleStartOffset(j);
- throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
- + appender.getBuffer().capacity() + ")");
- }
- }
+ appendToResults(i, j, writer);
}
}
@@ -177,7 +168,35 @@
return ( (predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2) );
}
}
-
+ /** Appends (outer, inner) to the output frame, or (inner, outer) when isReversed; on a full frame it flushes, resets and retries once, throwing HyracksDataException if the pair still does not fit. */
+ private void appendToResults(int outerTupleId, int innerTupleId, IFrameWriter writer) throws HyracksDataException{
+ if(!isReversed){
+ if (!appender.appendConcat(accessorOuter, outerTupleId, accessorInner, innerTupleId)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorOuter, outerTupleId, accessorInner, innerTupleId)) {
+ int tSize = accessorOuter.getTupleEndOffset(outerTupleId) - accessorOuter.getTupleStartOffset(outerTupleId)
+ + accessorInner.getTupleEndOffset(innerTupleId) - accessorInner.getTupleStartOffset(innerTupleId);
+ throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
+ + appender.getBuffer().capacity() + ")");
+ }
+ }
+ }
+ else{ //Role reversal: emit the inner tuple's fields first, then the outer tuple's fields
+ if (!appender.appendConcat(accessorInner, innerTupleId, accessorOuter, outerTupleId)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorInner, innerTupleId, accessorOuter, outerTupleId)) {
+ int tSize = accessorInner.getTupleEndOffset(innerTupleId) - accessorInner.getTupleStartOffset(innerTupleId)+
+ accessorOuter.getTupleEndOffset(outerTupleId) - accessorOuter.getTupleStartOffset(outerTupleId);
+ throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
+ + appender.getBuffer().capacity() + ")");
+ }
+ }
+ }
+
+ }
+
public void closeCache() throws HyracksDataException {
if (runFileWriter != null) {
runFileWriter.close();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 4e9376d..276f60f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -19,7 +19,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
-import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -120,6 +119,12 @@
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
+ //Test-only flags. Per-instance (not static) so that configuring one descriptor
+ //does not change the behaviour of other descriptors/jobs running in the same JVM.
+ private boolean skipInMemoryHJ = false;
+ private boolean forceNLJ = false;
+ private boolean forceRR = false;
+
private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
@@ -305,7 +310,7 @@
public void close() throws HyracksDataException {
state.hybridHJ.closeBuild();
ctx.setStateObject(state);
- LOGGER.log(Level.FINE, "OptimizedHybridHashJoin closed its build phase");
+ LOGGER.fine("OptimizedHybridHashJoin closed its build phase");
}
@Override
@@ -414,63 +419,60 @@
int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
int beforeMax = (bSize > pSize) ? bSize : pSize;
- joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1);
-
+ joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
}
writer.close();
}
private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
- RunFileReader probeSideReader, int pid, int beforeMax, int level) throws HyracksDataException {
+ RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed) throws HyracksDataException {
ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
- long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
- long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
+ long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize());
+ long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize());
- LOGGER.log(Level.FINE,"Joining Partition Pairs (pid "+pid+") - (level "+level+") - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin));
-
+ LOGGER.fine("\n>>>Joining Partition Pairs (pid "+pid+") - (level "+level+") - wasReversed "+wasReversed+" - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin)+" - LeftOuter is "+isLeftOuter);
+
//Apply in-Mem HJ if possible
- if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
- int tabSize = -1;
-
- if (isLeftOuter || buildPartSize < probePartSize) {
- tabSize = ohhj.getBuildPartitionSizeInTup(pid);
-
- if (tabSize == 0) {
- throw new HyracksDataException(
- "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+ if(!skipInMemoryHJ){
+ if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin && !isLeftOuter)) {
+ int tabSize = -1;
+ if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize)) ) { //Case 1.1 - InMemHJ (wout Role-Reversal)
+ LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "+level+"]");
+ tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid) : ohhj.getBuildPartitionSizeInTup(pid);
+ if (tabSize == 0) {
+ throw new HyracksDataException(
+ "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+ }
+ //Build Side is smaller
+ applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
+ buildSideReader, probeSideReader, false, pid); //checked-confirmed
+ } else { //Case 1.2 - InMemHJ with Role Reversal
+ LOGGER.fine("\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "+level+"]");
+ tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid) : ohhj.getProbePartitionSizeInTup(pid);
+ if (tabSize == 0) {
+ throw new HyracksDataException(
+ "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+ }
+ //Probe Side is smaller
+ applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
+ probeSideReader, buildSideReader, true, pid); //checked-confirmed
}
- //Build Side is smaller
- applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
- buildSideReader, probeSideReader, false, pid);
-
- }
-
- else { //Role Reversal
- tabSize = ohhj.getProbePartitionSizeInTup(pid);
- if (tabSize == 0) {
- throw new HyracksDataException(
- "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
- }
- //Probe Side is smaller
-
- applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
- probeSideReader, buildSideReader, true, pid);
}
}
//Apply (Recursive) HHJ
else {
+ LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level "+level+"]");
OptimizedHybridHashJoin rHHj;
- if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
- LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") [buildSize is smaller]");
+ if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+ LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
nPartitions);
-
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
- probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator);
+ probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator); //checked-confirmed
buildSideReader.open();
rHHj.initBuild();
@@ -495,7 +497,8 @@
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitinStatus();
- if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+ LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -504,12 +507,12 @@
continue;
}
- joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1));
+ joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); //checked-confirmed
}
- } else { //Switch to NLJ (Further recursion seems not to be useful)
- LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse false) [coming from buildSize was smaller]");
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ } else { //Case 2.1.2 - Switch to NLJ
+ LOGGER.fine("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -520,21 +523,21 @@
int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
if (isLeftOuter || buildSideInTups < probeSideInTups) {
- applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
- nljComparator0, false);
- } else {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw,
- nljComparator1, false);
+ nljComparator0, false); //checked-modified
+ } else {
+ applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
+ nljComparator1, true); //checked-modified
}
}
}
- } else { //Role Reversal (Probe Side is smaller)
- LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") WITH REVERSAL [probeSize is smaller]");
+ } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
+ LOGGER.fine("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
nPartitions);
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
- buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);
+ buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator); //checked-confirmed
rHHj.setIsReversed(true); //Added to use predicateEvaluator (for inMemoryHashJoin) correctly
probeSideReader.open();
@@ -557,7 +560,8 @@
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitinStatus();
- if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
+ LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -566,10 +570,10 @@
continue;
}
- joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1));
+ joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
}
- } else { //Switch to NLJ (Further recursion seems not to be effective)
- LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse true) [coming from probeSize was smaller]");
+ } else { //Case 2.2.2 - Switch to NLJ
+ LOGGER.fine("\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -582,10 +586,10 @@
long probeSideSize = rprfw.getFileSize();
if (buildSideSize > probeSideSize) {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw,
- nljComparator1, true);
+ nljComparator0, true); //checked-modified
} else {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw,
- nljComparator0, true);
+ nljComparator1, true); //checked-modified
}
}
}
@@ -599,7 +603,6 @@
RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
throws HyracksDataException {
- LOGGER.log(Level.FINE,"\t(pid "+pid+") - applyInMemHashJoin (reversal "+reverse+")");
ISerializableTable table = new SerializableHashTable(tabSize, ctx);
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
@@ -617,7 +620,7 @@
}
bReader.close();
rPartbuff.clear();
- // probe
+ // probe
pReader.open();
while (pReader.nextFrame(rPartbuff)) {
joiner.join(rPartbuff, writer);
@@ -630,10 +633,11 @@
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator, boolean reverse)
throws HyracksDataException {
+ NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
+ new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, isLeftOuter, nullWriters1);
+ nlj.setIsReversed(reverse);
- NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
- new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, false, null);
-
+
ByteBuffer cacheBuff = ctx.allocateFrame();
innerReader.open();
while (innerReader.nextFrame(cacheBuff)) {
@@ -660,4 +664,16 @@
return op;
}
}
-}
+
+ public void setSkipInMemHJ(boolean b){
+ skipInMemoryHJ = b;
+ }
+
+ public void setForceNLJ(boolean b){
+ forceNLJ = b;
+ }
+
+ public void setForceRR(boolean b){
+ forceRR = (!isLeftOuter && b);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
index a245c9d..1ea8393 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -136,12 +136,14 @@
@Override
public void close() {
- for (int i = 0; i < headers.length; i++)
+ int nFrames = contents.size();
+ for (int i = 0; i < headers.length; i++)
headers[i] = null;
contents.clear();
frameCurrentIndex.clear();
tupleCount = 0;
currentLargestFrameIndex = 0;
+ ctx.deallocateFrames(nFrames);
}
private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer)
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index f18099b..2e4c812 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -116,4 +116,4 @@
// TODO Auto-generated method stub
}
-}
\ No newline at end of file
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 5ea6413..c1473dc 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -31,7 +31,7 @@
* 2. a final phase which aggregates all partially aggregated states
*
* @param <I extends Writable> vertex identifier type
- * @param <E extends Writable> vertex value type
+ * @param <V extends Writable> vertex value type
* @param <E extends Writable> edge type
* @param <M extends Writable> message type
* @param <P extends Writable>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index b2ac4a5..8135479 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -234,19 +234,44 @@
/**
* Vote to halt. Once all vertex vote to halt and no more messages, a
* Pregelix job will terminate.
+ *
+ * The state of the current vertex value is saved.
*/
public final void voteToHalt() {
halt = true;
updated = true;
}
+
+ /**
+ * Vote to halt. Once all vertex vote to halt and no more messages, a
+ * Pregelix job will terminate.
+ *
+ * @param update whether or not to save the vertex value
+ */
+ public final void voteToHalt(boolean update) {
+ halt = true;
+ updated = update;
+ }
/**
* Activate a halted vertex such that it is alive again.
+ *
+ * The state of the current vertex value is saved.
*/
public final void activate() {
halt = false;
updated = true;
}
+
+ /**
+ * Activate a halted vertex such that it is alive again.
+ *
+ * @param update whether or not to save the vertex value
+ */
+ public final void activate(boolean update) {
+ halt = false;
+ updated = update;
+ }
/**
* @return the vertex is halted (true) or not (false)
@@ -610,10 +635,6 @@
* Terminate the current partition where the current vertex stays in.
* This will immediately take effect and the upcoming vertice in the
* same partition cannot be processed.
- * <<<<<<< HEAD
- *
- =======
- * >>>>>>> master
*/
protected final void terminatePartition() {
voteToHalt();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java
new file mode 100644
index 0000000..5254b8c
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.job;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Hook whose {@link #completeIteration} method is called at the end
+ * of each pregelix job iteration.
+ *
+ * This interface can be used to extend/replace the simple reporting in pregelix or to
+ * implement aggregation across iterations of a job (rather than having the values
+ * reset after each iteration).
+ * One object is created for each job.
+ *
+ * @author jake.biesinger
+ */
+public interface IIterationCompleteReporterHook {
+ /** Called when an iteration of {@code job} completes; {@code superstep} presumably identifies that iteration. */
+ public void completeIteration(int superstep, PregelixJob job) throws HyracksDataException;
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 06e79de..f2c9c84 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -16,6 +16,7 @@
package edu.uci.ics.pregelix.api.job;
import java.io.IOException;
+import java.lang.reflect.Modifier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
@@ -26,7 +27,9 @@
import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.util.HadoopCountersGlobalAggregateHook;
import edu.uci.ics.pregelix.api.util.GlobalCountAggregator;
+import edu.uci.ics.pregelix.api.util.HadoopCountersAggregator;
/**
* This class represents a Pregelix job.
@@ -82,8 +85,16 @@
public static final String CKP_INTERVAL = "pregelix.ckpinterval";
/** the dynamic optimization */
public static final String DYNAMIC_OPTIMIZATION = "pregelix.dynamicopt";
+ /** the iteration complete reporter hook */
+ public static final String ITERATION_COMPLETE_CLASS = "pregelix.iterationCompleteReporter";
/** comma */
public static final String COMMA_STR = ",";
+ /** period */
+ public static final String PERIOD_STR = ".";
+ /** the names of the aggregator classes active for all vertex types */
+ public static final String[] DEFAULT_GLOBAL_AGGREGATOR_CLASSES = { GlobalCountAggregator.class.getName() };
+ /** The name of an optional class that aggregates all Vertexes into mapreduce.Counters */
+ public static final String COUNTERS_AGGREGATOR_CLASS = "pregelix.aggregatedCountersClass";
/**
* Construct a Pregelix job from an existing configuration
@@ -93,7 +104,6 @@
*/
public PregelixJob(Configuration conf) throws IOException {
super(conf);
- this.addGlobalAggregatorClass(GlobalCountAggregator.class);
}
/**
@@ -105,7 +115,6 @@
*/
public PregelixJob(String jobName) throws IOException {
super(new Configuration(), jobName);
- this.addGlobalAggregatorClass(GlobalCountAggregator.class);
}
/**
@@ -119,7 +128,6 @@
*/
public PregelixJob(Configuration conf, String jobName) throws IOException {
super(conf, jobName);
- this.addGlobalAggregatorClass(GlobalCountAggregator.class);
}
/**
@@ -262,16 +270,35 @@
final public void setCheckpointingInterval(int ckpInterval) {
getConfiguration().setInt(CKP_INTERVAL, ckpInterval);
}
-
+
+ /**
+ * Users can provide an IIterationCompleteReporterHook implementation to perform actions
+ * at the end of each iteration
+ *
+ * @param reporterClass
+ */
+ final public void setIterationCompleteReporterHook(Class<? extends IIterationCompleteReporterHook> reporterClass) {
+ getConfiguration().setClass(ITERATION_COMPLETE_CLASS, reporterClass, IIterationCompleteReporterHook.class);
+ }
+
/**
* Indicate if dynamic optimization is enabled
*
* @param dynamicOpt
*/
- final public void setEnableDynamicOptimization(boolean dynamicOpt){
+ final public void setEnableDynamicOptimization(boolean dynamicOpt) {
getConfiguration().setBoolean(DYNAMIC_OPTIMIZATION, dynamicOpt);
}
+ final public void setCounterAggregatorClass(Class<? extends HadoopCountersAggregator<?, ?, ?, ?, ?>> aggClass) {
+ if (Modifier.isAbstract(aggClass.getModifiers())) {
+ throw new IllegalArgumentException("Aggregate class must be a concrete class, not an abstract one! (was " + aggClass.getName() + ")");
+ }
+ getConfiguration().setClass(COUNTERS_AGGREGATOR_CLASS, aggClass, HadoopCountersAggregator.class);
+ addGlobalAggregatorClass(aggClass);
+ setIterationCompleteReporterHook(HadoopCountersGlobalAggregateHook.class);
+ }
+
@Override
public String toString() {
return getJobName();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index f44942f..bef9aa9 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -15,14 +15,23 @@
package edu.uci.ics.pregelix.api.util;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.util.ReflectionUtils;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.MsgList;
@@ -33,6 +42,7 @@
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.job.ICheckpointHook;
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
import edu.uci.ics.pregelix.api.job.PregelixJob;
/**
@@ -40,6 +50,10 @@
* them.
*/
public class BspUtils {
+
+ public static final String TMP_DIR = "/tmp/";
+ private static final String COUNTERS_VALUE_ON_ITERATION = ".counters.valueOnIter.";
+ private static final String COUNTERS_LAST_ITERATION_COMPLETED = ".counters.lastIterCompleted";
/**
* Get the user's subclassed {@link VertexInputFormat}.
@@ -123,9 +137,17 @@
public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> List<Class<? extends GlobalAggregator<I, V, E, M, P, F>>> getGlobalAggregatorClasses(
Configuration conf) {
String aggStrs = conf.get(PregelixJob.GLOBAL_AGGREGATOR_CLASS);
- String[] classnames = aggStrs.split(PregelixJob.COMMA_STR);
+ String[] classnames;
+ if (aggStrs == null) {
+ classnames = new String[0];
+ } else {
+ classnames = aggStrs.split(PregelixJob.COMMA_STR);
+ }
try {
List<Class<? extends GlobalAggregator<I, V, E, M, P, F>>> classes = new ArrayList<Class<? extends GlobalAggregator<I, V, E, M, P, F>>>();
+ for (String defaultClass : PregelixJob.DEFAULT_GLOBAL_AGGREGATOR_CLASSES) {
+ classes.add((Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClassByName(defaultClass));
+ }
for (int i = 0; i < classnames.length; i++) {
classes.add((Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClassByName(classnames[i]));
}
@@ -563,6 +585,24 @@
}
/**
+ * Create the iteration complete reporter hook configured for the job
+ *
+ * @param conf
+ * Configuration to check
+ * @return A new instance of the configured {@link IIterationCompleteReporterHook} class
+ * @throws IllegalArgumentException if the hook class cannot be instantiated or accessed
+ */
+ public static IIterationCompleteReporterHook createIterationCompleteHook(Configuration conf) {
+ Class<? extends IIterationCompleteReporterHook> itCompleteClass = getIterationCompleteReporterHookClass(conf);
+ try {
+ return itCompleteClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("createIterationCompleteHook: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("createIterationCompleteHook: Illegally accessed", e);
+ }
+ }
+
+ /**
* Get the user's subclassed vertex partitioner class.
*
* @param conf
@@ -587,6 +627,20 @@
}
/**
+ * Get the user's subclassed iteration complete reporter hook class.
+ *
+ * @param conf
+ * Configuration to check
+ * @return The iteration complete reporter hook class configured under
+ * {@code PregelixJob.ITERATION_COMPLETE_CLASS}, or {@link DefaultIterationCompleteReporterHook}
+ * when none is set. NOTE: the cast to {@code Class<V>} is unchecked; conf.getClass only
+ * verifies that the class implements {@link IIterationCompleteReporterHook}.
+ */
+ @SuppressWarnings("unchecked")
+ public static <V extends IIterationCompleteReporterHook> Class<V> getIterationCompleteReporterHookClass(
+ Configuration conf) {
+ return (Class<V>) conf.getClass(PregelixJob.ITERATION_COMPLETE_CLASS,
+ DefaultIterationCompleteReporterHook.class, IIterationCompleteReporterHook.class);
+ }
+
+ /**
* Get the job configuration parameter whether the vertex states will increase dynamically
*
* @param conf
@@ -671,15 +725,16 @@
public static int getRecoveryCount(Configuration conf) {
return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
}
-
+
/***
* Get enable dynamic optimization
*
- * @param conf Configuration
+ * @param conf
+ * Configuration to read {@code PregelixJob.DYNAMIC_OPTIMIZATION} from
* @return true if enabled; otherwise false
+ * (dynamic optimization is disabled when the property is not set)
*/
- public static boolean getEnableDynamicOptimization(Configuration conf){
- return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, true);
+ public static boolean getEnableDynamicOptimization(Configuration conf) {
+ return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, false);
}
/***
@@ -691,4 +746,147 @@
public static int getCheckpointingInterval(Configuration conf) {
return conf.getInt(PregelixJob.CKP_INTERVAL, -1);
}
+
+    /**
+     * Read one final global aggregate value saved for a job.
+     * <p>
+     * The aggregate file holds a sequence of (UTF aggregator name, serialized value) pairs, one pair per
+     * value returned by {@code createFinalAggregateValues(conf)}; values preceding the requested one are
+     * deserialized and discarded.
+     *
+     * @param conf
+     *            job configuration, used to locate the file system and to instantiate aggregate values
+     * @param jobId
+     *            id of the job whose aggregate file is read
+     * @param aggClassName
+     *            aggregator name to look for, as stored in the file
+     * @return the deserialized aggregate value stored under {@code aggClassName}
+     * @throws HyracksDataException
+     *             if the aggregate file cannot be opened, read or closed
+     * @throws IllegalStateException
+     *             if the file holds no value for {@code aggClassName}
+     */
+    public static Writable readGlobalAggregateValue(Configuration conf, String jobId, String aggClassName)
+            throws HyracksDataException {
+        // try-with-resources: the stream is closed on every path, including lookup misses and read errors
+        try (FSDataInputStream input = FileSystem.get(conf).open(globalAggregateValuePath(jobId))) {
+            int numOfAggs = createFinalAggregateValues(conf).size();
+            for (int i = 0; i < numOfAggs; i++) {
+                String aggName = input.readUTF();
+                Writable agg = createFinalAggregateValue(conf, aggName);
+                // every value has to be consumed to reach the next name, even when it is not the wanted one
+                agg.readFields(input);
+                if (aggName.equals(aggClassName)) {
+                    return agg;
+                }
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        throw new IllegalStateException("Cannot find the aggregate value for " + aggClassName);
+    }
+
+    /**
+     * Read every final global aggregate value saved for a job.
+     *
+     * @param conf
+     *            job configuration, used to locate the file system and to instantiate aggregate values
+     * @param jobId
+     *            id of the job whose aggregate file is read
+     * @return map from each aggregator name stored in the file to its deserialized value
+     * @throws HyracksDataException
+     *             if the aggregate file cannot be opened, read or closed
+     */
+    public static HashMap<String, Writable> readAllGlobalAggregateValues(Configuration conf, String jobId)
+            throws HyracksDataException {
+        Path path = globalAggregateValuePath(jobId);
+        // NOTE(review): assumes the file stores values in the order createFinalAggregateValues returns them
+        List<Writable> aggValues = createFinalAggregateValues(conf);
+        HashMap<String, Writable> finalAggs = new HashMap<>();
+        try (FSDataInputStream input = FileSystem.get(conf).open(path)) {
+            for (Writable aggValue : aggValues) {
+                String aggName = input.readUTF();
+                aggValue.readFields(input);
+                finalAggs.put(aggName, aggValue);
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return finalAggs;
+    }
+
+    /**
+     * Get the accumulated Hadoop counters of the last superstep recorded by
+     * {@link HadoopCountersGlobalAggregateHook}.
+     *
+     * @param job
+     *            the job whose counters are read
+     * @return the counters saved for the last recorded superstep
+     * @throws HyracksDataException
+     *             if the counter files cannot be read, e.g. because no superstep has been recorded yet
+     */
+    public static Counters getCounters(PregelixJob job) throws HyracksDataException {
+        Configuration conf = job.getConfiguration();
+        String jobId = getJobId(conf);
+        int lastIter = readCountersLastIteration(conf, jobId);
+        return readCounters(lastIter, conf, jobId);
+    }
+
+    /** Read the counters saved for {@code superstep}. */
+    static Counters readCounters(int superstep, Configuration conf, String jobId) throws HyracksDataException {
+        Counters savedCounters = new Counters();
+        try (FSDataInputStream input = FileSystem.get(conf).open(countersValuePath(jobId, superstep))) {
+            savedCounters.readFields(input);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return savedCounters;
+    }
+
+    /** Save {@code toWrite} as the counters of {@code superstep}, overwriting any existing file. */
+    static void writeCounters(Counters toWrite, int superstep, Configuration conf, String jobId)
+            throws HyracksDataException {
+        try (FSDataOutputStream output = FileSystem.get(conf).create(countersValuePath(jobId, superstep), true)) {
+            toWrite.write(output);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Read the number of the last superstep whose counters were saved. */
+    static int readCountersLastIteration(Configuration conf, String jobId) throws HyracksDataException {
+        IntWritable lastIter = new IntWritable();
+        try (FSDataInputStream input = FileSystem.get(conf).open(countersLastIterationPath(jobId))) {
+            lastIter.readFields(input);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return lastIter.get();
+    }
+
+    /** Record {@code superstep} as the last superstep whose counters were saved, overwriting any previous record. */
+    static void writeCountersLastIteration(int superstep, Configuration conf, String jobId)
+            throws HyracksDataException {
+        try (FSDataOutputStream output = FileSystem.get(conf).create(countersLastIterationPath(jobId), true)) {
+            new IntWritable(superstep).write(output);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Path of the file holding the final global aggregate values of {@code jobId}. */
+    private static Path globalAggregateValuePath(String jobId) {
+        return new Path(TMP_DIR + jobId + "agg");
+    }
+
+    /** Path of the file holding the counters of {@code jobId} as of {@code superstep}. */
+    private static Path countersValuePath(String jobId, int superstep) {
+        return new Path(TMP_DIR + jobId + COUNTERS_VALUE_ON_ITERATION + superstep);
+    }
+
+    /** Path of the file recording the last superstep of {@code jobId} whose counters were saved. */
+    private static Path countersLastIterationPath(String jobId) {
+        return new Path(TMP_DIR + jobId + COUNTERS_LAST_ITERATION_COMPLETED);
+    }
+
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
index 4f5fef0..9721589 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
@@ -17,7 +17,7 @@
import edu.uci.ics.pregelix.api.job.ICheckpointHook;
/**
- * A conservative checkpoint hook which does checkpoint every 5 supersteps
+ * A conservative checkpoint hook which does checkpoint every 2 supersteps
*
* @author yingyib
*/
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
new file mode 100644
index 0000000..7b004d8
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+/**
+ * The default iteration complete reporter hook: it only writes a debug log message for each
+ * completed superstep and has no other effect.
+ *
+ * @author wbiesing
+ */
+public class DefaultIterationCompleteReporterHook implements IIterationCompleteReporterHook {
+
+    private static final Log LOG = LogFactory.getLog(DefaultIterationCompleteReporterHook.class);
+
+    @Override
+    public void completeIteration(int superstep, PregelixJob job) {
+        // guard so the message string is not built on every superstep when debug logging is off
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("iteration complete reporter for " + superstep + " job " + job.getJobName());
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java
new file mode 100644
index 0000000..b0814d9
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Counters;
+
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * A global aggregator whose partial and final results are Hadoop {@code mapreduce.Counters} objects.
+ * <p>
+ * Partial results are merged with {@link Counters#incrAllCounters(Counters)}; concrete subclasses implement
+ * the remaining abstract methods of {@link GlobalAggregator}. The type parameter {@code P} is not used here.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HadoopCountersAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable>
+        extends GlobalAggregator<I, V, E, M, Counters, Counters> {
+
+    /** Running aggregate: emptied by {@link #init()} and returned (not copied) by both finish methods. */
+    private final ResettableCounters counters = new ResettableCounters();
+
+    /** @return the live running aggregate (not a copy) */
+    public Counters getCounters() {
+        return counters;
+    }
+
+    /** Empty the running aggregate before a new aggregation starts. */
+    @Override
+    public void init() {
+        counters.reset();
+    }
+
+    /** Add every counter of {@code partialResult} into the running aggregate. */
+    @Override
+    public void step(Counters partialResult) {
+        counters.incrAllCounters(partialResult);
+    }
+
+    @Override
+    public Counters finishPartial() {
+        return counters;
+    }
+
+    @Override
+    public Counters finishFinal() {
+        return counters;
+    }
+
+    /**
+     * A {@code mapreduce.Counters} that can be emptied in place via {@link #reset()}.
+     */
+    public static class ResettableCounters extends Counters {
+        /** Endless stream of zero bytes: every integer read from it decodes as 0. */
+        private static final DataInputStream zeroStream = new DataInputStream(new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return 0;
+            }
+        });
+
+        /**
+         * Reset this Counters object by removing all of its counter groups.
+         * <p>
+         * The reset simulates a readFields() from a stream of 0's, which deserializes as a length of
+         * 0 groups.
+         * NOTE(review): this relies on the serialized form starting with the group count (Hadoop 1.x
+         * {@code mapreduce.Counters}); a format that starts with a version number would reject the zero.
+         *
+         * @throws RuntimeException if the underlying {@code readFields} unexpectedly fails
+         */
+        public void reset() {
+            try {
+                readFields(zeroStream);
+            } catch (IOException e) {
+                throw new RuntimeException("Unexpected failure when trying to reset Counters object!", e);
+            }
+        }
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersGlobalAggregateHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersGlobalAggregateHook.java
new file mode 100644
index 0000000..35ff22c
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersGlobalAggregateHook.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+/**
+ * Hook that accumulates the {@code mapreduce.Counters} produced by the job's {@link HadoopCountersAggregator}
+ * across all iterations, saving the running total for each superstep through {@link BspUtils}.
+ *
+ * @author wbiesing
+ */
+public class HadoopCountersGlobalAggregateHook implements IIterationCompleteReporterHook {
+
+    private static final String MISSING_AGGREGATOR_MSG = "A subclass of HadoopCountersAggregator must be active for "
+            + "HadoopCountersGlobalAggregateHook to operate!";
+
+    /**
+     * Add this superstep's aggregated counters to the total of all previous supersteps, save the result
+     * for {@code superstep}, and record {@code superstep} as the last saved iteration.
+     *
+     * @throws HyracksDataException
+     *             if no counters aggregator is configured or its value is missing, or if the counter files
+     *             cannot be read or written
+     */
+    @Override
+    public void completeIteration(int superstep, PregelixJob job) throws HyracksDataException {
+        Configuration conf = job.getConfiguration();
+        String jobId = BspUtils.getJobId(conf);
+        Class<?> aggClass = conf.getClass(PregelixJob.COUNTERS_AGGREGATOR_CLASS, null);
+        if (aggClass == null) {
+            throw new HyracksDataException(MISSING_AGGREGATOR_MSG);
+        }
+        Counters curIterCounters;
+        try {
+            curIterCounters = (Counters) BspUtils.readGlobalAggregateValue(conf, jobId, aggClass.getName());
+        } catch (IllegalStateException e) {
+            // readGlobalAggregateValue throws this when the aggregate file has no entry for aggClass
+            throw new HyracksDataException(MISSING_AGGREGATOR_MSG, e);
+        }
+        if (superstep > 1) {
+            // the counters saved for the previous superstep already hold the total of all earlier supersteps
+            Counters prevCounters = BspUtils.readCounters(superstep - 1, conf, jobId);
+            curIterCounters.incrAllCounters(prevCounters);
+        }
+        BspUtils.writeCounters(curIterCounters, superstep, conf, jobId);
+        BspUtils.writeCountersLastIteration(superstep, conf, jobId);
+    }
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index b3a90e9..a71ea3d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -46,6 +46,7 @@
import edu.uci.ics.hyracks.client.stats.Counters;
import edu.uci.ics.hyracks.client.stats.impl.ClientCounterContext;
import edu.uci.ics.pregelix.api.job.ICheckpointHook;
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.base.IDriver;
@@ -54,6 +55,7 @@
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.optimizer.DynamicOptimizer;
import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.NoOpOptimizer;
import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@@ -97,7 +99,7 @@
PregelixJob currentJob = jobs.get(0);
PregelixJob lastJob = currentJob;
addHadoopConfiguration(currentJob, ipAddress, port, true);
- ClientCounterContext counterContext = new ClientCounterContext(ipAddress, 16001,
+ ClientCounterContext counterContext = new ClientCounterContext(ipAddress, ClusterConfig.getCCHTTPort(),
Arrays.asList(ClusterConfig.getNCNames()));
JobGen jobGen = null;
@@ -110,8 +112,11 @@
boolean failed = false;
int retryCount = 0;
int maxRetryCount = 3;
- jobGen = selectJobGen(planChoice, currentJob);
- IOptimizer dynamicOptimzier = new DynamicOptimizer();
+
+ IOptimizer dynamicOptimizer = BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration())
+ ? new DynamicOptimizer(counterContext) : new NoOpOptimizer();
+ jobGen = selectJobGen(planChoice, currentJob, dynamicOptimizer);
+ jobGen = dynamicOptimizer.optimize(jobGen, 0);
do {
try {
@@ -139,9 +144,7 @@
}
/** run loop-body jobs with dynamic optimizer if it is enabled */
- if (BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration())) {
- jobGen = dynamicOptimzier.optimize(counterContext, jobGen, i);
- }
+ jobGen = dynamicOptimizer.optimize(jobGen, i);
runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
ckpHook, failed);
runClearState(deploymentId, jobGen);
@@ -196,8 +199,8 @@
.equals(currentInputPaths[0])));
}
- private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
- return JobGenFactory.createJobGen(planChoice, currentJob);
+ /** Create the job generator for {@code planChoice}, passing {@code optimizer} on to the factory. */
+ private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob, IOptimizer optimizer) {
+ return JobGenFactory.createJobGen(planChoice, currentJob, optimizer);
}
private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
@@ -277,6 +280,9 @@
loadData(job, jobGen, deploymentId);
}
}
+ // TODO how long should the hook persist? One per job? Or one per recovery attempt?
+ // since the hook shouldn't be stateful, we do one per recovery attempt
+ IIterationCompleteReporterHook itCompleteHook = BspUtils.createIterationCompleteHook(job.getConfiguration());
int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
int ckpInterval = BspUtils.getCheckpointingInterval(job.getConfiguration());
boolean terminate = false;
@@ -294,6 +300,7 @@
snapshotJobIndex.set(currentJobIndex);
snapshotSuperstep.set(i);
}
+ itCompleteHook.completeIteration(i, job);
i++;
} while (!terminate);
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index c1f6aae..163e476 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -100,6 +101,7 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -140,6 +142,7 @@
protected String jobId = UUID.randomUUID().toString();;
protected int frameSize = ClusterConfig.getFrameSize();
protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ protected IOptimizer optimizer;
private static final Map<String, String> MERGE_POLICY_PROPERTIES;
static {
@@ -150,18 +153,19 @@
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
- public JobGen(PregelixJob job) {
- init(job);
- }
-
- public JobGen(PregelixJob job, String jobId) {
- if(jobId!=null){
- this.jobId = jobId;
- }
- init(job);
+ /**
+ * Create a generator for {@code job} that uses a freshly generated random job id.
+ *
+ * @param job
+ * the Pregelix job to generate job specifications for
+ * @param optimizer
+ * optimizer kept by this generator (also passed to generators it creates)
+ */
+ public JobGen(PregelixJob job, IOptimizer optimizer) {
+ init(job, optimizer);
}
- private void init(PregelixJob job) {
+ /**
+ * Create a generator for {@code job} that reuses {@code jobId}.
+ *
+ * @param jobId
+ * job id to reuse; when null, the freshly generated random id is kept
+ * @param optimizer
+ * optimizer kept by this generator (also passed to generators it creates)
+ */
+ public JobGen(PregelixJob job, String jobId, IOptimizer optimizer) {
+ if (jobId != null) {
+ this.jobId = jobId;
+ }
+ init(job, optimizer);
+ }
+
+ private void init(PregelixJob job, IOptimizer optimizer) {
+ this.optimizer = optimizer;
conf = job.getConfiguration();
pregelixJob = job;
initJobConfiguration();
@@ -178,7 +182,7 @@
}
+ /** Re-initialize this generator for {@code job}, keeping the optimizer it already holds. */
public void reset(PregelixJob job) {
- init(job);
+ init(job, this.optimizer);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -232,12 +236,12 @@
int[] keyFields = new int[1];
keyFields[0] = 0;
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
keyFields, getIndexDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeCreate);
+ setLocationConstraint(spec, btreeCreate);
spec.setFrameSize(frameSize);
return spec;
}
@@ -258,7 +262,7 @@
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
/**
* the graph file scan operator and use count constraint first, will use
@@ -277,7 +281,7 @@
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
readSchedule, confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct sort operator
@@ -289,7 +293,7 @@
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
nkmFactory, comparatorFactories, recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, sorter);
+ setLocationConstraint(spec, sorter);
/**
* construct write file operator
@@ -336,7 +340,7 @@
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct btree search operator
@@ -346,14 +350,14 @@
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct write file operator
@@ -463,7 +467,7 @@
public JobSpecification generateClearState() throws HyracksException {
JobSpecification spec = new JobSpecification();
ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
- ClusterConfig.setLocationConstraint(spec, clearState);
+ setLocationConstraint(spec, clearState);
spec.addRoot(clearState);
return spec;
}
@@ -477,11 +481,11 @@
protected JobSpecification dropIndex(String indexName) throws HyracksException {
JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, indexName);
IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
- ClusterConfig.setLocationConstraint(spec, drop);
+ setLocationConstraint(spec, drop);
spec.addRoot(drop);
spec.setFrameSize(frameSize);
return spec;
@@ -515,7 +519,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
/**
* the graph file scan operator and use count constraint first, will use
@@ -537,7 +541,7 @@
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
readSchedule, confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct sort operator
@@ -549,7 +553,7 @@
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
nkmFactory, comparatorFactories, recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, sorter);
+ setLocationConstraint(spec, sorter);
/**
* construct tree bulk load operator
@@ -564,7 +568,7 @@
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ setLocationConstraint(spec, btreeBulkLoad);
/**
* connect operator descriptors
@@ -596,7 +600,7 @@
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct btree search operator
@@ -606,7 +610,7 @@
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -615,7 +619,7 @@
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
ExternalSortOperatorDescriptor sort = null;
if (!ckpointing) {
@@ -625,7 +629,7 @@
sortCmpFactories[0] = JobGenUtil.getFinalBinaryComparatorFactory(vertexIdClass);
sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields, nkmFactory, sortCmpFactories,
recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, sort);
}
/**
@@ -636,7 +640,7 @@
conf, vertexIdClass.getName(), vertexClass.getName());
VertexFileWriteOperatorDescriptor writer = new VertexFileWriteOperatorDescriptor(spec, confFactory,
inputRdFactory, preHookFactory);
- ClusterConfig.setLocationConstraint(spec, writer);
+ setLocationConstraint(spec, writer);
/**
* connect operator descriptors
@@ -681,14 +685,14 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
false, jobId, lastSuccessfulIteration + 1);
- ClusterConfig.setLocationConstraint(spec, materializeRead);
+ setLocationConstraint(spec, materializeRead);
String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
@@ -700,7 +704,7 @@
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
conf, vertexIdClass.getName(), MsgList.class.getName());
HDFSFileWriteOperatorDescriptor hdfsWriter = new HDFSFileWriteOperatorDescriptor(spec, tmpJob, inputRdFactory);
- ClusterConfig.setLocationConstraint(spec, hdfsWriter);
+ setLocationConstraint(spec, hdfsWriter);
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, hdfsWriter, 0);
@@ -743,7 +747,7 @@
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
readSchedule, new KeyValueParserFactory());
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/** construct the sort operator to sort message states */
int[] keyFields = new int[] { 0 };
@@ -752,24 +756,24 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration, vertexIdClass);
ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, sort);
+ setLocationConstraint(spec, sort);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec,
recordDescriptor, jobId, lastCheckpointedIteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration, new ConfigurationFactory(
pregelixJob.getConfiguration())));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* connect operator descriptors
@@ -807,7 +811,7 @@
*/
List<JobSpecification> list = new ArrayList<JobSpecification>();
list.add(bulkLoadLiveVertexBTree(iteration));
- JobGen jobGen = new JobGenInnerJoin(pregelixJob, jobId);
+ JobGen jobGen = new JobGenInnerJoin(pregelixJob, jobId, optimizer);
return Pair.of(list, jobGen);
}
@@ -827,12 +831,12 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct btree search and function call update operator
*/
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
@@ -852,12 +856,12 @@
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 1, new ExtractLiveVertexIdFunctionFactory(),
preHookFactory, null, rdFinal);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct bulk-load index operator
*/
- IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+ IFileSplitProvider secondaryFileSplitProvider = getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
int[] fieldPermutation = new int[] { 0, 1 };
int[] keyFields = new int[] { 0 };
IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
@@ -866,7 +870,7 @@
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ setLocationConstraint(spec, btreeBulkLoad);
/** connect job spec */
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
@@ -876,4 +880,25 @@
return spec;
}
+ /**
+ * Delegates to the optimizer to attach a location constraint to an operator.
+ *
+ * @param spec the job specification that contains the operator
+ * @param operator the operator descriptor to constrain
+ */
+ public void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator) {
+ this.optimizer.setOptimizedLocationConstraints(spec, operator);
+ }
+
+ /**
+ * Returns the file split provider the optimizer supplies for an index of a job.
+ *
+ * @param jobId the job id passed through to the optimizer
+ * @param indexName the index name, e.g. PRIMARY_INDEX or SECONDARY_INDEX_ODD
+ * @return the IFileSplitProvider instance obtained from the optimizer
+ */
+ public IFileSplitProvider getFileSplitProvider(String jobId, String indexName) {
+ return optimizer.getOptimizedFileSplitProvider(jobId, indexName);
+ }
+
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
index ed580de..cbc9c81 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
@@ -17,26 +17,27 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
public class JobGenFactory {
- public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob) {
+ public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob, IOptimizer optimizer) { // the same optimizer is passed to whichever JobGen subclass the plan selects
JobGen jobGen = null;
switch (planChoice) {
case INNER_JOIN:
- jobGen = new JobGenInnerJoin(currentJob);
+ jobGen = new JobGenInnerJoin(currentJob, optimizer);
break;
case OUTER_JOIN:
- jobGen = new JobGenOuterJoin(currentJob);
+ jobGen = new JobGenOuterJoin(currentJob, optimizer);
break;
case OUTER_JOIN_SORT:
- jobGen = new JobGenOuterJoinSort(currentJob);
+ jobGen = new JobGenOuterJoinSort(currentJob, optimizer);
break;
case OUTER_JOIN_SINGLE_SORT:
- jobGen = new JobGenOuterJoinSingleSort(currentJob);
+ jobGen = new JobGenOuterJoinSingleSort(currentJob, optimizer);
break;
default:
- jobGen = new JobGenInnerJoin(currentJob);
+ jobGen = new JobGenInnerJoin(currentJob, optimizer); // any other plan falls back to the inner-join job generator
}
return jobGen;
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 7bdb069..f838fb8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -63,6 +63,7 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -93,12 +94,12 @@
public class JobGenInnerJoin extends JobGen {
private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
- public JobGenInnerJoin(PregelixJob job) {
- super(job);
+ public JobGenInnerJoin(PregelixJob job, IOptimizer optimizer) {
+ super(job, optimizer); // presumably kept by JobGen as the optimizer that setLocationConstraint/getFileSplitProvider delegate to
}
- public JobGenInnerJoin(PregelixJob job, String jobId) {
- super(job, jobId);
+ public JobGenInnerJoin(PregelixJob job, String jobId, IOptimizer optimizer) {
+ super(job, jobId, optimizer); // explicit-jobId variant; the optimizer is forwarded to JobGen unchanged
}
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
@@ -113,18 +114,18 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/** construct runtime hook */
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct drop index operator
*/
- IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider secondaryFileSplitProvider = getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
/**
* construct btree search and function call update operator
@@ -159,7 +160,7 @@
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 6, new StartComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* termination state write operator
@@ -188,7 +189,7 @@
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ setLocationConstraint(spec, btreeBulkLoad);
/**
* construct local sort operator
@@ -199,7 +200,7 @@
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -208,7 +209,7 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global group-by operator
@@ -217,25 +218,25 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/**
* do pre- & post- super step
*/
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* add the insert operator to insert vertexes
@@ -244,7 +245,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -254,15 +255,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
@@ -330,7 +331,7 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct pre-superstep
@@ -338,20 +339,20 @@
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
true, jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materializeRead);
+ setLocationConstraint(spec, materializeRead);
/**
* construct the index-set-union operator
*/
String readFile = iteration % 2 == 0 ? SECONDARY_INDEX_ODD : SECONDARY_INDEX_EVEN;
- IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+ IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -359,12 +360,12 @@
IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
comparatorFactories, true, keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true);
- ClusterConfig.setLocationConstraint(spec, setUnion);
+ setLocationConstraint(spec, setUnion);
/**
* construct index-join-function-update operator
*/
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
VLongWritable.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
@@ -379,7 +380,7 @@
JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
- ClusterConfig.setLocationConstraint(spec, join);
+ setLocationConstraint(spec, join);
/**
* construct bulk-load index operator
@@ -389,12 +390,12 @@
indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
WritableComparator.get(vertexIdClass).getClass());
String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
- IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+ IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
indexCmpFactories, fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR,
getIndexDataflowHelperFactory());
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ setLocationConstraint(spec, btreeBulkLoad);
/**
* construct local sort operator
@@ -405,7 +406,7 @@
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -414,7 +415,7 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global group-by operator
@@ -423,23 +424,23 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* termination state write operator
@@ -464,7 +465,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -474,15 +475,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
@@ -596,7 +597,7 @@
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
readSchedule, new KeyValueParserFactory());
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/** construct the sort operator to sort message states */
int[] keyFields = new int[] { 0 };
@@ -605,7 +606,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration, vertexIdClass);
ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, sort);
+ setLocationConstraint(spec, sort);
/**
* construct bulk-load index operator
@@ -618,12 +619,12 @@
indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration + 1, WritableComparator
.get(vertexIdClass).getClass());
String writeFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
- IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+ IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
indexCmpFactories, fieldPermutation, new int[] { 0 }, DEFAULT_BTREE_FILL_FACTOR,
getIndexDataflowHelperFactory());
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ setLocationConstraint(spec, btreeBulkLoad);
/**
* connect operator descriptors
@@ -648,7 +649,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
Class<? extends Writable> msgListClass = MsgList.class;
String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
- IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+ IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
JobSpecification spec = new JobSpecification();
/**
* construct empty tuple operator
@@ -663,7 +664,7 @@
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct btree search operator
@@ -681,7 +682,7 @@
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
comparatorFactories, null, null, null, true, true, getIndexDataflowHelperFactory(), false,
NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct write file operator
@@ -689,7 +690,7 @@
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
conf, vertexIdClass.getName(), MsgList.class.getName());
HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
- ClusterConfig.setLocationConstraint(spec, writer);
+ setLocationConstraint(spec, writer);
/**
* connect operator descriptors
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 68e6706..39a56bf 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -41,7 +41,7 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,12 +69,12 @@
public class JobGenOuterJoin extends JobGen {
- public JobGenOuterJoin(PregelixJob job) {
- super(job);
+ public JobGenOuterJoin(PregelixJob job, IOptimizer optimizer) {
+ super(job, optimizer); // presumably kept by JobGen as the optimizer that setLocationConstraint/getFileSplitProvider delegate to
}
-
- public JobGenOuterJoin(PregelixJob job, String jobId) {
- super(job, jobId);
+
+ public JobGenOuterJoin(PregelixJob job, String jobId, IOptimizer optimizer) {
+ super(job, jobId, optimizer); // explicit-jobId variant; the optimizer is forwarded to JobGen unchanged
}
@Override
@@ -90,12 +90,12 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/** construct runtime hook */
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct btree search function update operator
@@ -104,7 +104,7 @@
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);;
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -129,7 +129,7 @@
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct local sort operator
@@ -140,7 +140,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -149,7 +149,7 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global group-by operator
@@ -160,22 +160,22 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink2);
+ setLocationConstraint(spec, emptySink2);
/**
* termination state write operator
@@ -202,7 +202,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -212,14 +212,14 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
@@ -286,7 +286,7 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct pre-superstep hook
@@ -294,19 +294,19 @@
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
true, jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materializeRead);
+ setLocationConstraint(spec, materializeRead);
/**
* construct index join function update operator
*/
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
@@ -329,7 +329,7 @@
getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, join);
+ setLocationConstraint(spec, join);
/**
* construct local sort operator
@@ -339,7 +339,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -348,7 +348,7 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global group-by operator
@@ -357,23 +357,23 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* termination state write operator
@@ -399,7 +399,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -409,15 +409,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 3e4b213..c10e6c2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -41,7 +41,7 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,12 +69,12 @@
public class JobGenOuterJoinSingleSort extends JobGen {
- public JobGenOuterJoinSingleSort(PregelixJob job) {
- super(job);
+ public JobGenOuterJoinSingleSort(PregelixJob job, IOptimizer optimizer) {
+ super(job, optimizer);
}
-
- public JobGenOuterJoinSingleSort(PregelixJob job, String jobId) {
- super(job, jobId);
+
+ public JobGenOuterJoinSingleSort(PregelixJob job, String jobId, IOptimizer optimizer) {
+ super(job, jobId, optimizer);
}
@Override
@@ -90,12 +90,12 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/** construct runtime hook */
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct btree search operator
@@ -104,7 +104,7 @@
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -132,7 +132,7 @@
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct global sort operator
@@ -144,7 +144,7 @@
.getClass());
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, globalSort);
+ setLocationConstraint(spec, globalSort);
/**
* construct global group-by operator
@@ -155,22 +155,22 @@
conf, true, false);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink2);
+ setLocationConstraint(spec, emptySink2);
/**
* termination state write operator
@@ -196,7 +196,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -206,15 +206,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
@@ -277,7 +277,7 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct pre-superstep hook
@@ -285,19 +285,19 @@
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
true, jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materializeRead);
+ setLocationConstraint(spec, materializeRead);
/**
* construct index join function update operator
*/
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
@@ -320,7 +320,7 @@
getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, join);
+ setLocationConstraint(spec, join);
/**
* construct global sort operator
@@ -331,7 +331,7 @@
.getClass());
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, globalSort);
+ setLocationConstraint(spec, globalSort);
/**
* construct global group-by operator
@@ -340,23 +340,23 @@
conf, true, false);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* termination state write operator
@@ -379,7 +379,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -389,15 +389,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 7ca771c..953d82c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -41,7 +41,7 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,8 +69,8 @@
public class JobGenOuterJoinSort extends JobGen {
- public JobGenOuterJoinSort(PregelixJob job) {
- super(job);
+ public JobGenOuterJoinSort(PregelixJob job, IOptimizer optimizer) {
+ super(job, optimizer);
}
@Override
@@ -86,12 +86,12 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/** construct runtime hook */
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct btree search function update operator
@@ -100,7 +100,7 @@
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -125,7 +125,7 @@
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ setLocationConstraint(spec, scanner);
/**
* construct local sort operator
@@ -137,7 +137,7 @@
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -146,14 +146,14 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global sort operator
*/
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, globalSort);
+ setLocationConstraint(spec, globalSort);
/**
* construct global group-by operator
@@ -164,22 +164,22 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink2);
+ setLocationConstraint(spec, emptySink2);
/**
* termination state write operator
@@ -206,7 +206,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -216,15 +216,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
@@ -287,7 +287,7 @@
* construct empty tuple operator
*/
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ setLocationConstraint(spec, emptyTupleSource);
/**
* construct pre-superstep hook
@@ -295,19 +295,19 @@
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
- ClusterConfig.setLocationConstraint(spec, preSuperStep);
+ setLocationConstraint(spec, preSuperStep);
/**
* construct the materializing write operator
*/
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
true, jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materializeRead);
+ setLocationConstraint(spec, materializeRead);
/**
* construct index join function update operator
*/
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
@@ -330,7 +330,7 @@
getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate, rdInsert, rdDelete);
- ClusterConfig.setLocationConstraint(spec, join);
+ setLocationConstraint(spec, join);
/**
* construct local sort operator
@@ -341,7 +341,7 @@
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, localSort);
+ setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
@@ -350,14 +350,14 @@
false, false);
ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
- ClusterConfig.setLocationConstraint(spec, localGby);
+ setLocationConstraint(spec, localGby);
/**
* construct global sort operator
*/
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
- ClusterConfig.setLocationConstraint(spec, globalSort);
+ setLocationConstraint(spec, globalSort);
/**
* construct global group-by operator
@@ -366,23 +366,23 @@
conf, true, true);
ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
- ClusterConfig.setLocationConstraint(spec, globalGby);
+ setLocationConstraint(spec, globalGby);
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
- ClusterConfig.setLocationConstraint(spec, materialize);
+ setLocationConstraint(spec, materialize);
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
- ClusterConfig.setLocationConstraint(spec, postSuperStep);
+ setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink);
+ setLocationConstraint(spec, emptySink);
/**
* termination state write operator
@@ -408,7 +408,7 @@
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, insertOp);
+ setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
@@ -418,15 +418,15 @@
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, deleteOp);
+ setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
+ setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink4);
+ setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 5c1a4b8..d1f8b65 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -57,6 +57,7 @@
private static Scheduler hdfsScheduler;
private static Set<String> blackListNodes = new HashSet<String>();
private static IHyracksClientConnection hcc;
+ private static final int DEFAULT_CC_HTTP_PORT = 16001;
/**
* let tests set config path to be whatever
@@ -84,7 +85,7 @@
}
/**
- * get file split provider
+ * get file split provider, for test only
*
* @param jobId
* @return
@@ -126,6 +127,14 @@
return Integer.parseInt(clusterProperties.getProperty("FRAME_SIZE"));
}
+ public static int getCCHTTPort() { // CC HTTP port, read from the CC_HTTPPORT cluster property
+ try { // NOTE(review): silently falling back to a default may hide a misconfigured cluster; confirm intended
+ return Integer.parseInt(clusterProperties.getProperty("CC_HTTPPORT"));
+ } catch (NumberFormatException e) { // CC_HTTPPORT unset (parseInt(null) throws NFE) or not an integer
+ return DEFAULT_CC_HTTP_PORT;
+ }
+ }
+
/**
* set location constraint
*
@@ -175,26 +184,6 @@
* @param operator
* @throws HyracksDataException
*/
- public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
- throws HyracksException {
- int count = 0;
- String[] locations = new String[NCs.length * stores.length];
- for (String nc : NCs) {
- for (int i = 0; i < stores.length; i++) {
- locations[count] = nc;
- count++;
- }
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
- }
-
- /**
- * set location constraint
- *
- * @param spec
- * @param operator
- * @throws HyracksDataException
- */
public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {
int count = NCs.length * stores.length;
PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
@@ -255,11 +244,35 @@
}
return locations;
}
+
+ /**
+ * Set the default location constraint: each NC in NCs is listed once per store, so every node hosts stores.length partitions of the operator.
+ *
+ * @param spec the job specification the constraint is added to
+ * @param operator the operator to constrain
+ * @throws HyracksException
+ */
+ public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
+ throws HyracksException {
+ int count = 0;
+ String[] locations = new String[NCs.length * stores.length]; // one slot per (node, store) pair
+ for (String nc : NCs) {
+ for (int i = 0; i < stores.length; i++) {
+ locations[count] = nc;
+ count++;
+ }
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+ }
public static String[] getNCNames() {
return NCs;
}
+ public static String[] getStores() { // returns the internal array itself (no copy); callers should not modify it
+ return stores;
+ }
+
public static void addToBlackListNodes(Collection<String> nodes) {
blackListNodes.addAll(nodes);
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
index 01fc81b..064ca42 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
@@ -15,14 +15,140 @@
package edu.uci.ics.pregelix.core.optimizer;
-import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.IntWritable;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.stats.Counters;
+import edu.uci.ics.hyracks.client.stats.IClusterCounterContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+/**
+ * Optimizer that derives the degree of parallelism of every operator from the NUM_PROCESSOR
+ * counter exposed by an {@link IClusterCounterContext}.
+ * <p>
+ * {@link #optimize(JobGen, int)} rebuilds the per-machine partition counts. Location
+ * constraints and file splits are then both produced by walking that same map in the same
+ * nested-loop order, so partition i of an operator and file split i refer to the same machine.
+ */
public class DynamicOptimizer implements IOptimizer {
+    /** Source of the NUM_PROCESSOR counter used to size each machine's share of partitions. */
+    private final IClusterCounterContext counterContext;
+
+    /** Partitions assigned to each node controller; rebuilt on every {@link #optimize} call. */
+    private final Map<String, IntWritable> machineToDegreeOfParallelism = new HashMap<String, IntWritable>();
+
+    /** Total number of partitions: the sum of all values in {@link #machineToDegreeOfParallelism}. */
+    private int dop = 0;
+
+    public DynamicOptimizer(IClusterCounterContext counterContext) {
+        this.counterContext = counterContext;
+    }
+
+    /**
+     * Refreshes the per-machine degree of parallelism from the current counter values and
+     * returns {@code jobGen} unchanged.
+     *
+     * @throws IllegalStateException wrapping any failure while computing the new layout
+     */
@Override
- public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration) {
- return jobGen;
+    public JobGen optimize(JobGen jobGen, int iteration) {
+        try {
+            initializeLoadPerMachine();
+            return jobGen;
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Pins {@code operator} to the machines chosen by the last {@link #optimize} call; each
+     * machine is listed once per partition assigned to it.
+     * NOTE(review): before the first optimize() call dop is 0, so the constraint list is empty.
+     */
+    @Override
+    public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator) {
+        try {
+            String[] constraints = new String[dop];
+            int index = 0;
+            for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+                String loc = entry.getKey();
+                int count = entry.getValue().get();
+                for (int j = 0; j < count; j++) {
+                    constraints[index++] = loc;
+                }
+            }
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, constraints);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Builds one file split per partition, in the same order as the location constraints. On
+     * machine {@code nc}, partition j is stored under
+     * {@code stores[j % stores.length]/nc-data/jobId/indexName(j / stores.length)}.
+     */
+    @Override
+    public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName) {
+        FileSplit[] fileSplits = new FileSplit[dop];
+        String[] stores = ClusterConfig.getStores();
+        int splitIndex = 0;
+        for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+            String ncName = entry.getKey();
+            int count = entry.getValue().get();
+            for (int j = 0; j < count; j++) {
+                // cycle through the stores; j / stores.length keeps partitions that share a store apart
+                int storeCursor = j % stores.length;
+                String st = stores[storeCursor];
+                FileSplit split = new FileSplit(ncName, st + File.separator + ncName + "-data" + File.separator + jobId
+                        + File.separator + indexName + (j / stores.length));
+                fileSplits[splitIndex++] = split;
+            }
+        }
+        return new ConstantFileSplitProvider(fileSplits);
+    }
+
+    /**
+     * Rebuilds {@link #machineToDegreeOfParallelism} for every node returned by
+     * {@link ClusterConfig#getLocationConstraint()} and recomputes {@link #dop}.
+     * Each machine gets NUM_PROCESSOR - 1 partitions (one core is reserved for the heartbeat),
+     * clamped to at least one: a reading of 0 or 1 would otherwise give a negative or zero
+     * count, and a negative dop makes {@code new String[dop]} / {@code new FileSplit[dop]} throw.
+     *
+     * @return the degree of parallelism
+     * @throws HyracksException if the cluster configuration or counters cannot be read
+     */
+    private int initializeLoadPerMachine() throws HyracksException {
+        machineToDegreeOfParallelism.clear();
+        String[] locationConstraints = ClusterConfig.getLocationConstraint();
+        for (String loc : locationConstraints) {
+            machineToDegreeOfParallelism.put(loc, new IntWritable(0));
+        }
+        dop = 0;
+        for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+            // NOTE(review): this reads one cluster-wide NUM_PROCESSOR counter rather than one keyed
+            // by entry.getKey(), so every machine gets the same load; confirm whether a per-machine
+            // lookup was intended.
+            int numProcessors = (int) counterContext.getCounter(Counters.NUM_PROCESSOR, false).get();
+            // reserve one core for heartbeat, but keep at least one partition per machine
+            int load = Math.max(1, numProcessors - 1);
+            entry.getValue().set(load);
+            dop += load;
+        }
+        return dop;
}
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
index b5913c4..c8856aa 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
@@ -15,11 +15,35 @@
package edu.uci.ics.pregelix.core.optimizer;
-import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.pregelix.core.jobgen.JobGen;
+/**
+ * Strategy that decides how a Pregelix job is laid out on the cluster: it may replace the job
+ * generator and supplies the location constraints and file splits that job generation can use.
+ */
public interface IOptimizer {
- public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration);
-
+    /**
+     * Gives the optimizer a chance to adjust the plan for {@code iteration}.
+     *
+     * @param jobGen the current job generator
+     * @param iteration the iteration number (presumably the superstep; confirm with callers)
+     * @return the job generator to use; implementations may return {@code jobGen} unchanged
+     */
+    JobGen optimize(JobGen jobGen, int iteration);
+
+    /**
+     * Adds to {@code spec} the partition location constraint this optimizer chose for
+     * {@code operator}.
+     */
+    void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator);
+
+    /**
+     * Returns the file splits for index {@code indexName} of job {@code jobId}. The splits are
+     * expected to line up with the location constraints (split i on the node of partition i).
+     */
+    IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName);
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java
new file mode 100644
index 0000000..cd0ca37
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+
+/**
+ * An {@link IOptimizer} that performs no optimization: {@link #optimize} returns the job generator unchanged, and
+ * location constraints and file splits are delegated to {@link ClusterConfig}.
+ */
+public class NoOpOptimizer implements IOptimizer {
+
+    /** Returns {@code jobGen} unchanged; {@code iteration} is ignored. */
+    @Override
+    public JobGen optimize(JobGen jobGen, int iteration) {
+        return jobGen;
+    }
+
+    /**
+     * Delegates to {@code ClusterConfig.setLocationConstraint(spec, operator)}.
+     *
+     * @throws IllegalStateException wrapping any exception thrown by that call
+     */
+    @Override
+    public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator) {
+        try {
+            ClusterConfig.setLocationConstraint(spec, operator);
+        } catch (Exception cause) {
+            throw new IllegalStateException(cause);
+        }
+    }
+
+    /**
+     * Delegates to {@code ClusterConfig.getFileSplitProvider(jobId, indexName)}.
+     *
+     * @throws IllegalStateException wrapping any exception thrown by that call
+     */
+    @Override
+    public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName) {
+        final IFileSplitProvider splitProvider;
+        try {
+            splitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+        } catch (Exception cause) {
+            throw new IllegalStateException(cause);
+        }
+        return splitProvider;
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 6de65ca..d834868 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -15,6 +15,7 @@
package edu.uci.ics.pregelix.dataflow.util;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -36,7 +37,7 @@
import edu.uci.ics.pregelix.dataflow.context.TaskIterationID;
public class IterationUtils {
- public static final String TMP_DIR = "/tmp/";
+ public static final String TMP_DIR = BspUtils.TMP_DIR; // shared with BspUtils instead of the former hard-coded "/tmp/"
public static void setIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition, int iteration,
IStateObject state) {
@@ -143,28 +144,13 @@
}
public static Writable readGlobalAggregateValue(Configuration conf, String jobId, String aggClassName)
- throws HyracksDataException {
- try {
- FileSystem dfs = FileSystem.get(conf);
- String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
- Path path = new Path(pathStr);
- FSDataInputStream input = dfs.open(path);
- int numOfAggs = BspUtils.createFinalAggregateValues(conf).size();
- for (int i = 0; i < numOfAggs; i++) {
- String aggName = input.readUTF();
- Writable agg = BspUtils.createFinalAggregateValue(conf, aggName);
- if (aggName.equals(aggClassName)) {
- agg.readFields(input);
- input.close();
- return agg;
- } else {
- agg.readFields(input);
- }
- }
- throw new IllegalStateException("Cannot find the aggregate value for " + aggClassName);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ throws HyracksDataException {
+ return BspUtils.readGlobalAggregateValue(conf, jobId, aggClassName); // delegates to BspUtils (replaces the inline file parsing removed above)
+ }
+
+ public static HashMap<String, Writable> readAllGlobalAggregateValues(Configuration conf, String jobId)
+ throws HyracksDataException {
+ return BspUtils.readAllGlobalAggregateValues(conf, jobId); // thin delegate to BspUtils, mirroring readGlobalAggregateValue
}
}
diff --git a/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties b/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
index f5d12a1..79f42ed 100644
--- a/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
+++ b/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
@@ -18,6 +18,9 @@
#The CC port for Hyracks cluster management
CC_CLUSTERPORT=1099
+#The CC port for HTTP/REST communication (passed to the CC as -http-port)
+CC_HTTPPORT=16001
+
#The tmp directory for cc to install jars
CCTMP_DIR=/tmp/t1
@@ -39,12 +42,15 @@
#The frame size of the internal dataflow engine
FRAME_SIZE=65536
+#The number of jobs whose logs are kept in memory on the CC (passed to the CC as -job-history-size)
+JOB_HISTORY_SIZE=0
+
#CC JAVA_OPTS
-CCJAVA_OPTS="-Xmx1g -Djava.util.logging.config.file=logging.properties"
-# debug option: CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+CCJAVA_OPTS="-Djava.util.logging.config.file=logging.properties"
+# debug option: CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Djava.util.logging.config.file=logging.properties"
# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
#NC JAVA_OPTS
-NCJAVA_OPTS="-Xmx1g -Djava.util.logging.config.file=logging.properties"
-# debug option: NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+NCJAVA_OPTS="-Djava.util.logging.config.file=logging.properties"
+# debug option: NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Djava.util.logging.config.file=logging.properties"
# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh b/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh
index d7a0ead..41026a6 100644
--- a/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh
@@ -49,17 +49,50 @@
#Export JAVA_HOME and JAVA_OPTS
export JAVA_HOME=$JAVA_HOME
-export JAVA_OPTS=$CCJAVA_OPTS
+
+#get the OS
+OS_NAME=`uname -s`
+LINUX_OS='Linux'
+
+if [ "$OS_NAME" = "$LINUX_OS" ];
+then
+ MEM_SIZE=`awk '/^MemTotal:/ {print $2}' /proc/meminfo`
+ MEM_SIZE=$(($MEM_SIZE * 1024)) # MemTotal is reported in kB (KiB); convert to bytes
+else
+ MEM_SIZE=`sysctl -n hw.memsize` # bytes; -n prints only the value, independent of the "name: value"/"name = value" output style
+fi
+
+MEM_SIZE=$(($MEM_SIZE * 3 / 4)) # JVM heap = 3/4 of RAM; NOTE(review): startnc.sh does the same, so a co-located CC+NC can overcommit memory
+
+#Set JAVA_OPTS
+export JAVA_OPTS="$CCJAVA_OPTS -Xmx$MEM_SIZE"
PREGELIX_HOME=`pwd`
#Enter the temp dir
cd $CCTMP_DIR
-if [ -f "conf/topology.xml" ]; then
-#Launch hyracks cc script with topology
-${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -heartbeat-period 5000 -max-heartbeat-lapse-periods 4 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
-else
-#Launch hyracks cc script without toplogy
-${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -heartbeat-period 5000 -max-heartbeat-lapse-periods 4 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
+cmd=( "${PREGELIX_HOME}/bin/pregelixcc" )
+cmd+=( -client-net-ip-address "$CCHOST" -cluster-net-ip-address "$CCHOST"
+ -heartbeat-period 5000 -max-heartbeat-lapse-periods 4
+ -default-max-job-attempts 0 )
+
+if [ -n "$CC_CLIENTPORT" ]; then
+ cmd+=( -client-net-port "$CC_CLIENTPORT" )
fi
+if [ -n "$CC_CLUSTERPORT" ]; then
+ cmd+=( -cluster-net-port "$CC_CLUSTERPORT" )
+fi
+if [ -n "$CC_HTTPPORT" ]; then
+ cmd+=( -http-port "$CC_HTTPPORT" )
+fi
+if [ -n "$JOB_HISTORY_SIZE" ]; then
+ cmd+=( -job-history-size "$JOB_HISTORY_SIZE" )
+fi
+if [ -f "${PREGELIX_HOME}/conf/topology.xml" ]; then
+ cmd+=( -cluster-topology "${PREGELIX_HOME}/conf/topology.xml" )
+fi
+
+printf "\n\n\n********************************************\nStarting CC with command %s\n\n" "${cmd[*]}" >> "$CCLOGS_DIR/cc.log"
+#Start the pregelix CC (quoted expansion passes each array element as exactly one argument)
+"${cmd[@]}" >> "$CCLOGS_DIR/cc.log" 2>&1 &
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
index 6727bd5..8e742ea 100644
--- a/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
@@ -68,13 +68,33 @@
#Get node ID
NODEID=`hostname | cut -d '.' -f 1`
-#Set JAVA_OPTS
-export JAVA_OPTS=$NCJAVA_OPTS
-
PREGELIX_HOME=`pwd`
#Enter the temp dir
cd $NCTMP_DIR
+#get the OS
+OS_NAME=`uname -s`
+LINUX_OS='Linux'
+
+if [ "$OS_NAME" = "$LINUX_OS" ];
+then
+ MEM_SIZE=`awk '/^MemTotal:/ {print $2}' /proc/meminfo`
+ MEM_SIZE=$(($MEM_SIZE * 1024)) # MemTotal is reported in kB (KiB); convert to bytes
+else
+ MEM_SIZE=`sysctl -n hw.memsize` # bytes; -n prints only the value, independent of the "name: value"/"name = value" output style
+fi
+
+MEM_SIZE=$(($MEM_SIZE * 3 / 4)) # JVM heap = 3/4 of RAM; NOTE(review): startcc.sh does the same, so a co-located CC+NC can overcommit memory
+
+#Set JAVA_OPTS
+export JAVA_OPTS="$NCJAVA_OPTS -Xmx$MEM_SIZE"
+
#Launch hyracks nc
-${PREGELIX_HOME}/bin/pregelixnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+cmd=( "${PREGELIX_HOME}/bin/pregelixnc" )
+cmd+=( -cc-host "$CCHOST" -cc-port "$CC_CLUSTERPORT"
+ -cluster-net-ip-address "$IPADDR" -data-ip-address "$IPADDR" -result-ip-address "$IPADDR"
+ -node-id "$NODEID" -iodevices "${IO_DIRS}" )
+
+printf "\n\n\n********************************************\nStarting NC with command %s\n\n" "${cmd[*]}" >> "$NCLOGS_DIR/$NODEID.log"
+"${cmd[@]}" >> "$NCLOGS_DIR/$NODEID.log" 2>&1 & # quoted so each argument (e.g. IO_DIRS) is passed without re-splitting or globbing
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index 393c8c9..9fb0958 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -70,6 +70,9 @@
@Option(name = "-ckp-interval", usage = "checkpointing interval -- for fault-tolerance", required = false)
public int ckpInterval = -1;
+
+ @Option(name = "-dyn-opt", usage = "whether to enable dynamic optimization -- for better performance", required = false)
+ public String dynamicOptimization = "false"; // read via Boolean.parseBoolean, so only "true" (case-insensitive) enables it
}
public static void run(String[] args, PregelixJob job) throws Exception {
@@ -121,6 +124,7 @@
}
private static void setJobSpecificSettings(PregelixJob job, Options options) {
+ job.setEnableDynamicOptimization(Boolean.parseBoolean(options.dynamicOptimization));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, options.sourceId);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
index cf753bb..1fe7616 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
@@ -133,7 +133,7 @@
}
- private static long readTriangleCountingResult(Configuration conf) {
+ protected static long readTriangleCountingResult(Configuration conf) {
try {
VLongWritable count = (VLongWritable) IterationUtils.readGlobalAggregateValue(conf,
BspUtils.getJobId(conf), TriangleCountingAggregator.class.getName());
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingWithAggregateHadoopCountersVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingWithAggregateHadoopCountersVertex.java
new file mode 100644
index 0000000..7cf2840
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingWithAggregateHadoopCountersVertex.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.trianglecounting;
+
+import org.apache.hadoop.mapreduce.Counters;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.HadoopCountersAggregator;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * The triangle counting example -- counting the triangles in an undirected graph -- extended with a Hadoop
+ * counters aggregator.
+ * <p>
+ * In addition to the global {@link TriangleCountingAggregator}, the job registers
+ * {@link TriangleHadoopCountersAggregator} as its counter aggregator. After the job completes, {@link #main}
+ * prints the triangle count of the last iteration and the aggregated counters of all iterations.
+ */
+public class TriangleCountingWithAggregateHadoopCountersVertex extends TriangleCountingVertex {
+
+    /**
+     * Configures the job, runs it through {@link Client#run}, and prints the results to standard output.
+     *
+     * @param args command-line options parsed by {@link Client}
+     */
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(TriangleCountingWithAggregateHadoopCountersVertex.class.getSimpleName());
+        job.setVertexClass(TriangleCountingWithAggregateHadoopCountersVertex.class);
+        job.addGlobalAggregatorClass(TriangleCountingAggregator.class);
+        job.setCounterAggregatorClass(TriangleHadoopCountersAggregator.class);
+        job.setVertexInputFormatClass(TextTriangleCountingInputFormat.class);
+        job.setVertexOutputFormatClass(TriangleCountingVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        Client.run(args, job);
+        System.out.println("triangle count in last iteration: " + readTriangleCountingResult(job.getConfiguration()));
+        System.out.println("aggregate counter (including all iterations):\n" + BspUtils.getCounters(job));
+    }
+
+    /**
+     * Collects Hadoop {@link Counters} in the group {@code TriangleCounting}: {@code total-ids} counts the vertices
+     * passed to {@link #step(Vertex)} and {@code total-values} sums their vertex values. Partial results passed to
+     * {@link #step(Counters)} are merged in with {@code Counters.incrAllCounters}.
+     */
+    public static class TriangleHadoopCountersAggregator extends
+            HadoopCountersAggregator<VLongWritable, VLongWritable, VLongWritable, VLongWritable, Counters> {
+        private Counters counters;
+
+        /** Starts from an empty set of counters. */
+        @Override
+        public void init() {
+            counters = new Counters();
+        }
+
+        @Override
+        public void step(Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> v)
+                throws HyracksDataException {
+            counters.findCounter("TriangleCounting", "total-ids").increment(1);
+            counters.findCounter("TriangleCounting", "total-values").increment(v.getVertexValue().get());
+        }
+
+        @Override
+        public void step(Counters partialResult) {
+            counters.incrAllCounters(partialResult);
+        }
+
+        /** Returns the live counters object (not a copy). */
+        @Override
+        public Counters finishPartial() {
+            return counters;
+        }
+
+        /** Returns the live counters object (not a copy). */
+        @Override
+        public Counters finishFinal() {
+            return counters;
+        }
+    }
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index b4e17b6..5855fd3 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -35,6 +35,8 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.NoOpOptimizer;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.example.PageRankVertex;
import edu.uci.ics.pregelix.example.PageRankVertex.SimulatedPageRankVertexInputFormat;
@@ -83,7 +85,9 @@
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
FileUtils.cleanDirectory(new File(EXPECT_RESULT_DIR));
FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- giraphTestJobGen = new JobGenOuterJoin(job);
+
+ IOptimizer dynamicOptimizer = new NoOpOptimizer();
+ giraphTestJobGen = new JobGenOuterJoin(job, dynamicOptimizer);
}
private void cleanupStores() throws IOException {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 6ccefd2..c7eff1e 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -79,6 +79,7 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.setCheckpointHook(ConservativeCheckpointHook.class);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -95,6 +96,7 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
job.setCheckpointHook(ConservativeCheckpointHook.class);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -109,6 +111,7 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, 0);
+ job.setDynamicVertexValueSize(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -123,6 +126,7 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.setCheckpointHook(ConservativeCheckpointHook.class);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -137,6 +141,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -152,6 +157,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -211,6 +217,7 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, 0);
+ job.setDynamicVertexValueSize(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -225,6 +232,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -237,6 +245,7 @@
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
+ job.setDynamicVertexValueSize(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java
new file mode 100644
index 0000000..10f5829
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.util;
+
+/**
+ * One line (or block of lines) of job output, compared with a tolerance for decimal fields.
+ * <p>
+ * Two records are equal when their texts have the same number of rows and each pair of rows is either identical or
+ * splits into the same number of fields (on spaces if either row contains a space, otherwise on tabs) such that
+ * every pair of fields is identical or, when at least one of the two contains a '.', both parse as numbers that
+ * agree to within 1.0e-7 after narrowing to {@code float}.
+ */
+@SuppressWarnings("rawtypes")
+public class Record implements Comparable {
+
+    private String recordText;
+
+    public Record(String text) {
+        recordText = text;
+    }
+
+    /**
+     * Returns a hash code consistent with {@link #equals(Object)}.
+     * <p>
+     * Equality tolerates small numeric differences, so hashing the raw text would give equal records different hash
+     * codes. Only the row count, which equal records always share, is hashed.
+     */
+    @Override
+    public int hashCode() {
+        return recordText.split("\n").length;
+    }
+
+    @Override
+    public String toString() {
+        return recordText;
+    }
+
+    /**
+     * Returns 0 for records that are {@link #equals(Object) equal}; otherwise orders them by their text.
+     *
+     * @throws IllegalStateException if {@code o} is not a {@code Record}
+     */
+    @Override
+    public int compareTo(Object o) {
+        if (!(o instanceof Record)) {
+            throw new IllegalStateException("uncomparable items");
+        }
+        Record record = (Record) o;
+        boolean equal = equalStrings(recordText, record.recordText);
+        if (equal) {
+            return 0;
+        } else {
+            return recordText.compareTo(record.recordText);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof Record)) {
+            return false;
+        }
+        Record record = (Record) o;
+        return equalStrings(recordText, record.recordText);
+    }
+
+    /** Compares two texts row by row; see the class comment for the matching rules. */
+    private boolean equalStrings(String s1, String s2) {
+        String[] rowsOne = s1.split("\n");
+        String[] rowsTwo = s2.split("\n");
+        if (rowsOne.length != rowsTwo.length) {
+            return false;
+        }
+        for (int i = 0; i < rowsOne.length; i++) {
+            if (!equalRows(rowsOne[i], rowsTwo[i])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Compares two rows field by field. */
+    private static boolean equalRows(String row1, String row2) {
+        if (row1.equals(row2)) {
+            return true;
+        }
+        // Choose the delimiter from both rows so that a.equals(b) and b.equals(a) always agree.
+        String delimiter = (row1.contains(" ") || row2.contains(" ")) ? " " : "\t";
+        String[] fields1 = row1.split(delimiter);
+        String[] fields2 = row2.split(delimiter);
+        // Rows with different field counts never match; extra trailing fields in row2 must not be ignored.
+        if (fields1.length != fields2.length) {
+            return false;
+        }
+        for (int j = 0; j < fields1.length; j++) {
+            if (!equalFields(fields1[j], fields2[j])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Compares two fields exactly, or numerically with a tolerance when either one contains a '.'. */
+    private static boolean equalFields(String field1, String field2) {
+        if (field1.equals(field2)) {
+            return true;
+        }
+        if (field1.indexOf('.') < 0 && field2.indexOf('.') < 0) {
+            return false;
+        }
+        try {
+            float float1 = (float) Double.parseDouble(field1);
+            float float2 = (float) Double.parseDouble(field2);
+            return Math.abs(float1 - float2) < 1.0e-7;
+        } catch (NumberFormatException e) {
+            // A non-numeric field containing '.' can only match exactly, and it did not.
+            return false;
+        }
+    }
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
index fe07cf5..e6347ed 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
@@ -16,83 +16,95 @@
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
public class TestUtils {
+    private static final String PREFIX = "part";
+
public static void compareWithResultDir(File expectedFileDir, File actualFileDir) throws Exception {
- String[] fileNames = expectedFileDir.list();
- for (String fileName : fileNames) {
- compareWithResult(new File(expectedFileDir, fileName), new File(actualFileDir, fileName));
- }
+        Collection<Record> expectedRecords = loadRecords(expectedFileDir);
+        Collection<Record> actualRecords = loadRecords(actualFileDir);
+        boolean equal = collectionEqual(expectedRecords, actualRecords);
+        Assert.assertTrue("Results in " + actualFileDir + " differ from " + expectedFileDir, equal);
}
- public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
- BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
- BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
- String lineExpected, lineActual;
- int num = 1;
- try {
- while ((lineExpected = readerExpected.readLine()) != null) {
- lineActual = readerActual.readLine();
- if (lineActual == null) {
- throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
+    /**
+     * Returns true if both collections contain the same records, ignoring order but counting duplicates, where
+     * records are matched with {@link Record#equals(Object)}.
+     */
+    public static boolean collectionEqual(Collection<Record> c1, Collection<Record> c2) {
+        if (c1.size() != c2.size()) {
+            return false;
}
- if (!equalStrings(lineExpected, lineActual)) {
- throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
- + lineActual);
- }
- ++num;
+        // Each record of c1 consumes a distinct matching record of c2, so duplicates are paired one-to-one.
+        List<Record> unmatched = new ArrayList<Record>(c2);
+        for (Record r1 : c1) {
+            boolean exists = false;
+            for (Iterator<Record> it = unmatched.iterator(); it.hasNext();) {
+                if (r1.equals(it.next())) {
+                    it.remove();
+                    exists = true;
+                    break;
}
- lineActual = readerActual.readLine();
- if (lineActual != null) {
- throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineActual);
}
- } finally {
- readerExpected.close();
- readerActual.close();
+            if (!exists) {
+                return false;
}
- }
-
- private static boolean equalStrings(String s1, String s2) {
- String[] rowsOne = s1.split("\n");
- String[] rowsTwo = s2.split("\n");
-
- if (rowsOne.length != rowsTwo.length)
- return false;
-
- for (int i = 0; i < rowsOne.length; i++) {
- String row1 = rowsOne[i];
- String row2 = rowsTwo[i];
-
- if (row1.equals(row2))
- continue;
-
- boolean spaceOrTab = false;
- spaceOrTab = row1.contains(" ");
- String[] fields1 = spaceOrTab ? row1.split(" ") : row1.split("\t");
- String[] fields2 = spaceOrTab ? row2.split(" ") : row2.split("\t");
-
- for (int j = 0; j < fields1.length; j++) {
- if (fields1[j].equals(fields2[j])) {
- continue;
- } else if (fields1[j].indexOf('.') < 0) {
- return false;
- } else {
- Double double1 = Double.parseDouble(fields1[j]);
- Double double2 = Double.parseDouble(fields2[j]);
- float float1 = (float) double1.doubleValue();
- float float2 = (float) double2.doubleValue();
-
- if (Math.abs(float1 - float2) < 1.0e-7)
- continue;
- else {
- return false;
- }
- }
- }
}
return true;
}
+    /**
+     * Asserts that two result files contain equal records, line by line and in the same order.
+     */
+    public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
+        Collection<Record> expectedRecords = new ArrayList<Record>();
+        Collection<Record> actualRecords = new ArrayList<Record>();
+        populateResultFile(expectedRecords, expectedFile);
+        populateResultFile(actualRecords, actualFile);
+        boolean equal = expectedRecords.equals(actualRecords);
+        Assert.assertTrue("Results in " + actualFile + " differ from " + expectedFile, equal);
+    }
+
+    /** Loads the records of every file in {@code dir} whose name starts with {@value #PREFIX}. */
+    private static Collection<Record> loadRecords(File dir) throws Exception {
+        String[] fileNames = dir.list();
+        if (fileNames == null) {
+            // File.list() returns null when dir is not a directory or cannot be read.
+            throw new IOException("Cannot list result directory " + dir);
+        }
+        Collection<Record> records = new ArrayList<Record>();
+        for (String fileName : fileNames) {
+            if (fileName.startsWith(PREFIX)) {
+                File file = new File(dir, fileName);
+                populateResultFile(records, file);
+            }
+        }
+        return records;
+    }
+
+    /** Appends one {@link Record} per line of {@code file} to {@code records}; the file is closed even on error. */
+    private static void populateResultFile(Collection<Record> records, File file) throws FileNotFoundException,
+            IOException {
+        BufferedReader reader = new BufferedReader(new FileReader(file));
+        try {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                records.add(new Record(line));
+            }
+        } finally {
+            reader.close();
+        }
+    }
+
}
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index 591446c..3091c83 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -80,6 +80,7 @@
<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 32c2a1a..b6af65c 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -29,7 +29,6 @@
<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleMinCombiner</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -86,6 +85,7 @@
<property><name>fs.checkpoint.period</name><value>3600</value></property>
<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
index d06068d..d908da8 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
@@ -124,7 +124,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
index 01d85a5..d5ec8f1 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
@@ -124,7 +124,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
index 072ea9e..b4c42e6 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
@@ -124,7 +124,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
index 3ae367d..6cf075b 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
@@ -124,7 +124,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
index 6cb617f..49e2e6f 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
@@ -125,7 +125,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
index 76d6e87..8316c64 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
@@ -125,7 +125,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
index 1f52250..a894ccd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
@@ -124,7 +124,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
index 9d2c9e1..a9f8925 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
@@ -1,146 +1,145 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>pregelix.updateIntensive</name><value>true</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>pregelix.framesize</name><value>2048</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>20</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/result</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>pregelix.updateIntensive</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index f1c27ca..d29b2da 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -125,7 +125,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index dfb4e71..6fe04fb 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -30,7 +30,6 @@
<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -86,6 +85,7 @@
<property><name>fs.checkpoint.period</name><value>3600</value></property>
<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 49a6e20..d0f9759 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -30,7 +30,6 @@
<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -87,6 +86,7 @@
<property><name>fs.checkpoint.period</name><value>3600</value></property>
<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
index 789ea32..0173390 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
@@ -80,6 +80,7 @@
<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index 796b1d1..a7a38e0 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -80,6 +80,7 @@
<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
index 8834ead..225429a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
@@ -1,146 +1,145 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Reachibility</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>ReachibilityVertex.destId</name><value>10</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>ReachibilityVertex.destId</name><value>10</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
index 234dbf9..bd9da92 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
@@ -1,146 +1,145 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Reachibility</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>ReachibilityVertex.destId</name><value>25</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>ReachibilityVertex.destId</name><value>25</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index b1c57c5..87c35f7 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -126,9 +126,9 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index f1d2dc6..b757514 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -126,9 +126,9 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
index 951ac6f..80cea20 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
@@ -123,9 +123,10 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.trianglecounting.TextTriangleCountingInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>