Merged hyracks master back into VXQuery branch.
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index aea04b2..55da00e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -38,6 +38,8 @@
public class AssignPOperator extends AbstractPhysicalOperator {
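+ // when set, the generated AssignRuntimeFactory flushes its output frame after each consumed input frame (see setRapidFrameFlush below)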
+ private boolean flushFramesRapidly;
+
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.ASSIGN;
@@ -76,7 +78,8 @@
// TODO push projections into the operator
int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
- AssignRuntimeFactory runtime = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+ AssignRuntimeFactory runtime = new AssignRuntimeFactory(outColumns, evalFactories, projectionList,
+ flushFramesRapidly);
// contribute one Asterix framewriter
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
@@ -92,4 +95,8 @@
return true;
}
+ public void setRapidFrameFlush(boolean flushFramesRapidly) {
+ this.flushFramesRapidly = flushFramesRapidly;
+ }
+
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java
new file mode 100644
index 0000000..e93fdd1
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule lifts the aggregate operator out of a group-by operator
+ * when the group-by operator groups on an empty key, i.e., the list of group-by variables is empty.
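+ * For example (schematic plans, for illustration only):
+ * group-by([]) { aggregate [$c] <- [count()] } over INPUT
+ * is rewritten to
+ * aggregate [$c] <- [count()] over INPUT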
+ *
+ * @author yingyib
+ */
+public class EliminateGroupByEmptyKeyRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+ return false;
+ }
+ GroupByOperator groupOp = (GroupByOperator) op;
+ List<LogicalVariable> groupVars = groupOp.getGbyVarList();
+ if (groupVars.size() > 0) {
+ return false;
+ }
+ List<ILogicalPlan> nestedPlans = groupOp.getNestedPlans();
+ if (nestedPlans.size() > 1) {
+ return false;
+ }
+ ILogicalPlan nestedPlan = nestedPlans.get(0);
+ if (nestedPlan.getRoots().size() > 1) {
+ return false;
+ }
+ Mutable<ILogicalOperator> topOpRef = nestedPlan.getRoots().get(0);
+ ILogicalOperator topOp = nestedPlan.getRoots().get(0).getValue();
+ Mutable<ILogicalOperator> nestedTupleSourceRef = getNestedTupleSourceReference(topOpRef);
+ /**
+ * connect nested top op into the plan
+ */
+ opRef.setValue(topOp);
+ /**
+ * connect child op into the plan
+ */
+ nestedTupleSourceRef.setValue(groupOp.getInputs().get(0).getValue());
+ return true;
+ }
+
+ private Mutable<ILogicalOperator> getNestedTupleSourceReference(Mutable<ILogicalOperator> nestedTopOperatorRef) {
+ Mutable<ILogicalOperator> currentOpRef = nestedTopOperatorRef;
+ while (currentOpRef.getValue().getInputs() != null && currentOpRef.getValue().getInputs().size() > 0) {
+ currentOpRef = currentOpRef.getValue().getInputs().get(0);
+ }
+ return currentOpRef;
+ }
+
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/LeftOuterJoinToInnerJoinRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/LeftOuterJoinToInnerJoinRule.java
new file mode 100644
index 0000000..247c10d
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/LeftOuterJoinToInnerJoinRule.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule converts a left outer join into an inner join when possible.
+ *
+ * The specific pattern this rule matches is:
+ * select not(is-null($v)) // $v is from the right branch of the left outer join below
+ * left-outer-join
+ *
+ * The pattern will be rewritten to:
+ * inner-join
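+ * (the select itself is removed: the rewrite replaces it with the new inner join)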
+ *
+ * @author yingyib
+ */
+public class LeftOuterJoinToInnerJoinRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ if (op.getOperatorTag() != LogicalOperatorTag.SELECT) {
+ return false;
+ }
+ Mutable<ILogicalOperator> op2Ref = op.getInputs().get(0);
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op2Ref.getValue();
+ if (op2.getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN) {
+ return false;
+ }
+ SelectOperator selectOp = (SelectOperator) op;
+ LeftOuterJoinOperator joinOp = (LeftOuterJoinOperator) op2;
+ ILogicalExpression condition = selectOp.getCondition().getValue();
+ if (condition.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ return false;
+ }
+ ScalarFunctionCallExpression func = (ScalarFunctionCallExpression) condition;
+ /** Check whether the filter condition on top of the LOJ is not(is-null($v)), where $v comes from the right child of the LOJ. */
+ if (!convertable(func, joinOp)) {
+ return false;
+ }
+ ILogicalOperator newJoin = new InnerJoinOperator(joinOp.getCondition(), joinOp.getInputs().get(0), joinOp
+ .getInputs().get(1));
+ opRef.setValue(newJoin);
+ context.computeAndSetTypeEnvironmentForOperator(newJoin);
+ return true;
+ }
+
+ /**
+ * Check whether the condition is not(is-null(var)) and var comes from the right branch of the join.
+ */
+ private boolean convertable(ScalarFunctionCallExpression func, LeftOuterJoinOperator join)
+ throws AlgebricksException {
+ if (func.getFunctionIdentifier() != AlgebricksBuiltinFunctions.NOT) {
+ return false;
+ }
+ ILogicalExpression arg = func.getArguments().get(0).getValue();
+ if (arg.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ return false;
+ }
+ ScalarFunctionCallExpression func2 = (ScalarFunctionCallExpression) arg;
+ if (func2.getFunctionIdentifier() != AlgebricksBuiltinFunctions.IS_NULL) {
+ return false;
+ }
+ ILogicalExpression arg2 = func2.getArguments().get(0).getValue();
+ if (arg2.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ return false;
+ }
+ VariableReferenceExpression varExpr = (VariableReferenceExpression) arg2;
+ LogicalVariable var = varExpr.getVariableReference();
+ ListSet<LogicalVariable> leftVars = new ListSet<LogicalVariable>();
+ ListSet<LogicalVariable> rightVars = new ListSet<LogicalVariable>();
+ VariableUtilities.getLiveVariables(join.getInputs().get(0).getValue(), leftVars);
+ VariableUtilities.getLiveVariables(join.getInputs().get(1).getValue(), rightVars);
+ if (!rightVars.contains(var)) {
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 082e98a..6c1dd5e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -43,6 +43,10 @@
}
protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb) throws HyracksDataException {
+ appendToFrameFromTupleBuilder(tb, false);
+ }
+
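+ // when flushFrame is true, the (possibly non-full) frame is pushed to the writer right after the append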
+ protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb, boolean flushFrame) throws HyracksDataException {
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
@@ -51,6 +55,10 @@
"Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
}
}
+ if (flushFrame) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ }
}
protected void appendProjectionToFrame(int tIndex, int[] projectionList) throws HyracksDataException {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index fa99c15..fb889ea 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -36,6 +36,7 @@
private int[] outColumns;
private IScalarEvaluatorFactory[] evalFactories;
+ private final boolean flushFramesRapidly;
/**
* @param outColumns
@@ -46,9 +47,15 @@
*/
public AssignRuntimeFactory(int[] outColumns, IScalarEvaluatorFactory[] evalFactories, int[] projectionList) {
+ this(outColumns, evalFactories, projectionList, false);
+ }
+
+ public AssignRuntimeFactory(int[] outColumns, IScalarEvaluatorFactory[] evalFactories, int[] projectionList,
+ boolean flushFramesRapidly) {
super(projectionList);
this.outColumns = outColumns;
this.evalFactories = evalFactories;
+ this.flushFramesRapidly = flushFramesRapidly;
}
@Override
@@ -107,9 +114,22 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tRef.reset(tAccess, t);
- produceTuple(tupleBuilder, tAccess, t, tRef);
+ int t = 0;
+ if (nTuple > 1) {
+ for (; t < nTuple - 1; t++) {
+ tRef.reset(tAccess, t);
+ produceTuple(tupleBuilder, tAccess, t, tRef);
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+ }
+
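+ // handle the last tuple separately so that a rapid flush can immediately follow it (assumes the incoming frame holds at least one tuple)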
+ tRef.reset(tAccess, t);
+ produceTuple(tupleBuilder, tAccess, t, tRef);
+ if (flushFramesRapidly) {
+ // Whenever all the tuples in the incoming frame have been consumed, the assign operator
+ // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
+ appendToFrameFromTupleBuilder(tupleBuilder, true);
+ } else {
appendToFrameFromTupleBuilder(tupleBuilder);
}
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 7f10948..3e87f31 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -34,8 +34,7 @@
}
public StreamProjectRuntimeFactory(int[] projectionList) {
- super(projectionList);
- this.flushFramesRapidly = false;
+ this(projectionList, false);
}
@Override
@@ -66,8 +65,10 @@
int nTuple = tAccess.getTupleCount();
int t = 0;
- for (; t < nTuple - 1; t++) {
- appendProjectionToFrame(t, projectionList);
+ if (nTuple > 1) {
+ for (; t < nTuple - 1; t++) {
+ appendProjectionToFrame(t, projectionList);
+ }
}
if (flushFramesRapidly) {
// Whenever all the tuples in the incoming frame have been consumed, the project operator
@@ -76,10 +77,8 @@
} else {
appendProjectionToFrame(t, projectionList);
}
-
}
};
}
-
}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
index 1bf4abe..833daf4 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
@@ -166,7 +166,7 @@
}
@Override
- public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException {
+ public void parse(K key, V value, IFrameWriter writer, String fileString) throws HyracksDataException {
try {
tb.reset();
if (parser != null) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java
new file mode 100644
index 0000000..44f3bec
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
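+ * Filters a list of exceptions down to those that may indicate a real failure:
+ * exceptions whose cause chain contains an InterruptedException or a
+ * ClosedChannelException (both expected during normal task shutdown) are dropped.
+ *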
+ * @author yingyib
+ */
+public class ExceptionFilterUtils {
+
+ public static List<Exception> getActualExceptions(List<Exception> allExceptions) {
+ List<Exception> exceptions = new ArrayList<Exception>();
+ for (Exception exception : allExceptions) {
+ if (possibleRootCause(exception)) {
+ exceptions.add(exception);
+ }
+ }
+ return exceptions;
+ }
+
+ private static boolean possibleRootCause(Throwable exception) {
+ Throwable cause = exception;
+ while ((cause = cause.getCause()) != null) {
+ if (cause instanceof java.lang.InterruptedException
+ || cause instanceof java.nio.channels.ClosedChannelException) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 9cd7886..2166620 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -347,7 +347,9 @@
taskAttempt.put("end-time", ta.getEndTime());
List<Exception> exceptions = ta.getExceptions();
if (exceptions != null && !exceptions.isEmpty()) {
- for(Exception exception : exceptions){
+ List<Exception> filteredExceptions = ExceptionFilterUtils
+ .getActualExceptions(exceptions);
+ for (Exception exception : filteredExceptions) {
StringWriter exceptionWriter = new StringWriter();
exception.printStackTrace(new PrintWriter(exceptionWriter));
taskAttempt.put("failure-details", exceptionWriter.toString());
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 3b29b52..6e8ddf0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -59,7 +59,9 @@
}
Set<String> targetNodes = run.getParticipatingNodeIds();
run.getCleanupPendingNodeIds().addAll(targetNodes);
- run.setPendingStatus(status, exceptions);
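+ // keep the first terminal status: a pending FAILURE or TERMINATED must not be overwritten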
+ if (run.getPendingStatus() != JobStatus.FAILURE && run.getPendingStatus() != JobStatus.TERMINATED) {
+ run.setPendingStatus(status, exceptions);
+ }
if (targetNodes != null && !targetNodes.isEmpty()) {
for (String n : targetNodes) {
NodeControllerState ncs = ccs.getNodeMap().get(n);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index d02d65c..60e9c40 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -119,7 +119,7 @@
accessorBuild.reset(buffers.get(bIndex));
int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
if (c == 0) {
- boolean predEval = ( (predEvaluator == null) || predEvaluator.evaluate(accessorProbe, i, accessorBuild, tIndex) );
+ boolean predEval = evaluatePredicate(i, tIndex);
if(predEval){
matchFound = true;
appendToResult(i, tIndex, writer);
@@ -155,6 +155,15 @@
buffer.position(0);
buffer.limit(buffer.capacity());
}
+
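+ // the evaluator is always fed probe-side arguments first; under role reversal the
+ // physical sides are swapped, so the argument order is swapped back to compensate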
+ private boolean evaluatePredicate(int tIx1, int tIx2) {
+ if (reverseOutputOrder) { // role reversal optimization is in effect
+ return (predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1);
+ } else {
+ return (predEvaluator == null) || predEvaluator.evaluate(accessorProbe, tIx1, accessorBuild, tIx2);
+ }
+ }
private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
if (!reverseOutputOrder) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 2f719fa..979ef59 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -49,6 +49,8 @@
private final boolean isLeftOuter;
private final ArrayTupleBuilder nullTupleBuilder;
private final IPredicateEvaluator predEvaluator;
+ private boolean isReversed; // set when recursive calls (in OptimizedHybridHashJoin) reverse the build/probe roles, so the predicate evaluator is invoked with the correct argument order
+
public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
ITuplePairComparator comparators, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, INullWriter[] nullWriters1)
@@ -63,6 +65,7 @@
this.outBuffers = new ArrayList<ByteBuffer>();
this.memSize = memSize;
this.predEvaluator = predEval;
+ this.isReversed = false;
this.ctx = ctx;
this.isLeftOuter = isLeftOuter;
@@ -133,7 +136,7 @@
boolean matchFound = false;
for (int j = 0; j < tupleCount1; ++j) {
int c = compare(accessorOuter, i, accessorInner, j);
- boolean prdEval = (predEvaluator == null) || (predEvaluator.evaluate(accessorOuter, i, accessorInner, j));
+ boolean prdEval = evaluatePredicate(i, j);
if (c == 0 && prdEval) {
matchFound = true;
if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
@@ -165,6 +168,15 @@
}
}
}
+
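+ // the evaluator is always fed outer-side arguments first; when the roles are reversed,
+ // the accessors are swapped to compensate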
+ private boolean evaluatePredicate(int tIx1, int tIx2) {
+ if (isReversed) { // role reversal optimization is in effect
+ return (predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1);
+ } else {
+ return (predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2);
+ }
+ }
public void closeCache() throws HyracksDataException {
if (runFileWriter != null) {
@@ -206,4 +218,8 @@
}
return 0;
}
+
+ public void setIsReversed(boolean b){
+ this.isReversed = b;
+ }
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index cd32c81..6bc810e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -99,6 +99,7 @@
private int freeFramesCounter; //Used for partition tuning
private boolean isTableEmpty; //Added for handling the case, where build side is empty (tableSize is 0)
+ private boolean isReversed; // set when recursive calls reverse the build/probe roles, so the predicate evaluator is invoked with the correct argument order
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
@@ -125,6 +126,7 @@
this.predEvaluator = predEval;
this.isLeftOuter = false;
this.nullWriters1 = null;
+ this.isReversed = false;
}
@@ -153,7 +155,8 @@
this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
-
+ this.isReversed = false;
+
this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -441,7 +444,7 @@
this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, new FrameTupleAccessor(
ctx.getFrameSize(), buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
- comparators), isLeftOuter, nullWriters1, table, predEvaluator);
+ comparators), isLeftOuter, nullWriters1, table, predEvaluator, isReversed);
}
private void cacheInMemJoin() throws HyracksDataException {
@@ -639,4 +642,8 @@
public boolean isTableEmpty() {
return this.isTableEmpty;
}
+
+ public void setIsReversed(boolean b){
+ this.isReversed = b;
+ }
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 95b7a3c..4e9376d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -19,6 +19,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
@@ -117,6 +119,8 @@
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
+
+ private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
@@ -139,8 +143,6 @@
this.predEvaluatorFactory = predEvaluatorFactory;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
-
-
}
public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
@@ -207,7 +209,7 @@
}
public static class BuildAndPartitionTaskState extends AbstractStateObject {
-
+
private int memForJoin;
private int numOfPartitions;
private OptimizedHybridHashJoin hybridHJ;
@@ -303,6 +305,7 @@
public void close() throws HyracksDataException {
state.hybridHJ.closeBuild();
ctx.setStateObject(state);
+ LOGGER.log(Level.FINE, "OptimizedHybridHashJoin closed its build phase");
}
@Override
@@ -323,7 +326,7 @@
* Hybrid Hash Join recursively on them.
*/
private class ProbeAndJoinActivityNode extends AbstractActivityNode {
-
+
private static final long serialVersionUID = 1L;
private final ActivityId buildAid;
@@ -423,9 +426,11 @@
hashFunctionGeneratorFactories).createPartitioner(level);
ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
-
+
long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
+
+ LOGGER.log(Level.FINE,"Joining Partition Pairs (pid "+pid+") - (level "+level+") - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin));
//Apply in-Mem HJ if possible
if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
@@ -460,7 +465,7 @@
else {
OptimizedHybridHashJoin rHHj;
if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
-
+ LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") [buildSize is smaller]");
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
nPartitions);
@@ -503,6 +508,7 @@
}
} else { //Switch to NLJ (Further recursion seems not to be useful)
+ LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse false) [coming from buildSize was smaller]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -515,19 +521,21 @@
int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
if (isLeftOuter || buildSideInTups < probeSideInTups) {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
- nljComparator0);
+ nljComparator0, false);
} else {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw,
- nljComparator1);
+ nljComparator1, false);
}
}
}
} else { //Role Reversal (Probe Side is smaller)
+ LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") WITH REVERSAL [probeSize is smaller]");
int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
nPartitions);
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);
+ rHHj.setIsReversed(true); // so the predicate evaluator (used by InMemoryHashJoin) receives its arguments in the correct order
probeSideReader.open();
rHHj.initBuild();
@@ -561,7 +569,8 @@
joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1));
}
} else { //Switch to NLJ (Further recursion seems not to be effective)
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse true) [coming from probeSize was smaller]");
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -573,10 +582,10 @@
long probeSideSize = rprfw.getFileSize();
if (buildSideSize > probeSideSize) {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw,
- nljComparator1);
+ nljComparator1, true);
} else {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw,
- nljComparator0);
+ nljComparator0, true);
}
}
}
@@ -590,7 +599,7 @@
RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
throws HyracksDataException {
-
+ LOGGER.log(Level.FINE,"\t(pid "+pid+") - applyInMemHashJoin (reversal "+reverse+")");
ISerializableTable table = new SerializableHashTable(tabSize, ctx);
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
@@ -619,9 +628,9 @@
}
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
- RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator)
+ RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator, boolean reverse)
throws HyracksDataException {
-
+
NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, false, null);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 76f411b..f71ee1d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -17,7 +17,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Random;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -47,10 +46,9 @@
private int dataFrameCount;
private int[] tPointers;
+ private int[] tPointersTemp;
private int tupleCount;
- private Random rand = new Random();
-
public FrameSorter(IHyracksTaskContext ctx, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) throws HyracksDataException {
@@ -119,8 +117,8 @@
}
}
if (tupleCount > 0) {
- shuffle(tPointers, 4, tupleCount);
- sort(tPointers, 0, tupleCount);
+ tPointersTemp = new int[tPointers.length];
+ sort(0, tupleCount);
}
}
@@ -146,75 +144,73 @@
}
}
- private void sort(int[] tPointers, int offset, int length) {
- int m = offset + (length >> 1);
- int mi = tPointers[m * 4];
- int mj = tPointers[m * 4 + 1];
- int mv = tPointers[m * 4 + 3];
-
- int a = offset;
- int b = a;
- int c = offset + length - 1;
- int d = c;
- while (true) {
- while (b <= c) {
- int cmp = compare(tPointers, b, mi, mj, mv);
- if (cmp > 0) {
- break;
+ private void sort(int offset, int length) {
+ int step = 1;
+ int len = length;
+ int end = offset + len;
+ /** bottom-up merge */
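+ // each pass merges adjacent sorted runs of length 'step' (1, 2, 4, ...) from
+ // tPointers into tPointersTemp and then swaps the two arrays; the merge is stable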
+ while (step < len) {
+ /** merge */
+ for (int i = offset; i < end; i += 2 * step) {
+ int next = i + step;
+ if (next < end) {
+ merge(i, next, step, Math.min(step, end - next));
+ } else {
+ System.arraycopy(tPointers, i * 4, tPointersTemp, i * 4, (end - i) * 4);
}
- if (cmp == 0) {
- swap(tPointers, a++, b);
- }
- ++b;
}
- while (c >= b) {
- int cmp = compare(tPointers, c, mi, mj, mv);
- if (cmp < 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, c, d--);
- }
- --c;
+ /** prepare for the next merge pass */
+ step *= 2;
+ int[] tmp = tPointersTemp;
+ tPointersTemp = tPointers;
+ tPointers = tmp;
+ }
+ }
+
+ /** Merge two adjacent sorted subarrays into one. */
+ private void merge(int start1, int start2, int len1, int len2) {
+ int targetPos = start1;
+ int pos1 = start1;
+ int pos2 = start2;
+ int end1 = start1 + len1 - 1;
+ int end2 = start2 + len2 - 1;
+ while (pos1 <= end1 && pos2 <= end2) {
+ int cmp = compare(pos1, pos2);
+ if (cmp <= 0) {
+ copy(pos1, targetPos);
+ pos1++;
+ } else {
+ copy(pos2, targetPos);
+ pos2++;
}
- if (b > c)
- break;
- swap(tPointers, b++, c--);
+ targetPos++;
}
-
- int s;
- int n = offset + length;
- s = Math.min(a - offset, b - a);
- vecswap(tPointers, offset, b - s, s);
- s = Math.min(d - c, n - d - 1);
- vecswap(tPointers, b, n - s, s);
-
- if ((s = b - a) > 1) {
- sort(tPointers, offset, s);
+ if (pos1 <= end1) {
+ int rest = end1 - pos1 + 1;
+ System.arraycopy(tPointers, pos1 * 4, tPointersTemp, targetPos * 4, rest * 4);
}
- if ((s = d - c) > 1) {
- sort(tPointers, n - s, s);
+ if (pos2 <= end2) {
+ int rest = end2 - pos2 + 1;
+ System.arraycopy(tPointers, pos2 * 4, tPointersTemp, targetPos * 4, rest * 4);
}
}
- private void swap(int x[], int a, int b) {
- for (int i = 0; i < 4; ++i) {
- int t = x[a * 4 + i];
- x[a * 4 + i] = x[b * 4 + i];
- x[b * 4 + i] = t;
- }
+ private void copy(int src, int dest) {
+ tPointersTemp[dest * 4] = tPointers[src * 4];
+ tPointersTemp[dest * 4 + 1] = tPointers[src * 4 + 1];
+ tPointersTemp[dest * 4 + 2] = tPointers[src * 4 + 2];
+ tPointersTemp[dest * 4 + 3] = tPointers[src * 4 + 3];
}
- private void vecswap(int x[], int a, int b, int n) {
- for (int i = 0; i < n; i++, a++, b++) {
- swap(x, a, b);
- }
- }
-
- private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {
+ private int compare(int tp1, int tp2) {
int i1 = tPointers[tp1 * 4];
int j1 = tPointers[tp1 * 4 + 1];
int v1 = tPointers[tp1 * 4 + 3];
+
+ int tp2i = tPointers[tp2 * 4];
+ int tp2j = tPointers[tp2 * 4 + 1];
+ int tp2v = tPointers[tp2 * 4 + 3];
+
if (v1 != tp2v) {
return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
}
@@ -244,18 +240,6 @@
return 0;
}
- private void shuffle(int[] tPointers, int interval, int tupleCount) {
- for (int i = tupleCount; i > 1; i--) {
- int next = rand.nextInt(i) * interval;
- int target = (i - 1) * interval;
- for (int j = 0; j < interval; j++) {
- int drawn = tPointers[next + j];
- tPointers[next + j] = tPointers[target + j];
- tPointers[target + j] = drawn;
- }
- }
- }
-
public void close() {
this.buffers.clear();
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index 032d50d..b51d41d 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -1,19 +1,15 @@
<?xml version="1.0"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License"); ! you may
+ not use this file except in compliance with the License. ! you may obtain
+ a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0
+ ! ! Unless required by applicable law or agreed to in writing, software !
+ distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the
+ License for the specific language governing permissions and ! limitations
+ under the License. ! -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>hyracks-hdfs-core</artifactId>
<name>hyracks-hdfs-core</name>
@@ -66,6 +62,18 @@
</filesets>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
index 1852a6f..57c20e0 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
@@ -38,18 +38,13 @@
public void open(IFrameWriter writer) throws HyracksDataException;
/**
- * Parse a key-value pair returned by HDFS record reader to a tuple.
- * when the parsers' internal buffer is full, it can flush the buffer to the writer
- *
* @param key
- * The key returned from Hadoop's InputReader.
* @param value
- * The value returned from Hadoop's InputReader.
* @param writer
- * The hyracks writer for outputting data.
+ * @param fileString
* @throws HyracksDataException
*/
- public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException;
+ public void parse(K key, V value, IFrameWriter writer, String fileString) throws HyracksDataException;
/**
* Flush the residual tuples in the internal buffer to the writer.
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
index 674873d..7e6e4dc 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
@@ -30,6 +30,6 @@
* the IHyracksTaskContext
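+ * @param partition
+ * the index of the current partition
+ * @param nPartition
+ * the total number of partitions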
* @return a tuple writer instance
*/
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException;
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 2cff534..a45992c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -128,7 +128,7 @@
Object key = reader.createKey();
Object value = reader.createValue();
while (reader.next(key, value) == true) {
- parser.parse(key, value, writer);
+ parser.parse(key, value, writer, inputSplits[i].toString());
}
}
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index 432849b..4e48e9b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -89,7 +89,7 @@
String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
String fileName = outputDirPath + File.separator + "part-" + partition;
- tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
+ tupleWriter = tupleWriterFactory.getTupleWriter(ctx, partition, nPartitions);
try {
FileSystem dfs = FileSystem.get(conf);
dos = dfs.create(new Path(fileName), true);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
index fbac95b..92cde9d 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -49,7 +49,8 @@
}
@Override
- public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
+ public void parse(LongWritable key, Text value, IFrameWriter writer, String fileString)
+ throws HyracksDataException {
tb.reset();
tb.addField(value.getBytes(), 0, value.getLength());
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
index 92a427e..60be1f7 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
@@ -27,7 +27,7 @@
private static final long serialVersionUID = 1L;
@Override
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) {
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition) {
return new ITupleWriter() {
private byte newLine = "\n".getBytes()[0];
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 3f01d77..43ca4ac 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -144,7 +144,8 @@
RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
reader.initialize(inputSplits.get(i), context);
while (reader.nextKeyValue() == true) {
- parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer);
+ parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer,
+ inputSplits.get(i).toString());
}
}
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 77b8c7e..068cdfc 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -88,7 +88,7 @@
String outputPath = FileOutputFormat.getOutputPath(conf).toString();
String fileName = outputPath + File.separator + "part-" + partition;
- tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
+ tupleWriter = tupleWriterFactory.getTupleWriter(ctx, partition, nPartitions);
try {
FileSystem dfs = FileSystem.get(conf.getConfiguration());
dos = dfs.create(new Path(fileName), true);
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeNonExistentKeyException.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeNonExistentKeyException.java
deleted file mode 100644
index 7237046..0000000
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeNonExistentKeyException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.btree.exceptions;
-
-public class BTreeNonExistentKeyException extends BTreeException {
-
- private static final long serialVersionUID = 1L;
-
- public BTreeNonExistentKeyException(Exception e) {
- super(e);
- }
-
- public BTreeNonExistentKeyException(String message) {
- super(message);
- }
-}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
index b503c8b..93cde3d 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
@@ -25,8 +25,6 @@
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.api.IPrefixSlotManager;
import edu.uci.ics.hyracks.storage.am.btree.compressors.FieldPrefixCompressor;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
import edu.uci.ics.hyracks.storage.am.btree.impls.FieldPrefixPrefixTupleReference;
import edu.uci.ics.hyracks.storage.am.btree.impls.FieldPrefixSlotManager;
@@ -37,6 +35,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleMode;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
@@ -370,7 +370,7 @@
int tupleIndex = slotManager.decodeSecondSlotField(slot);
// Error indicator is set if there is an exact match.
if (tupleIndex == slotManager.getErrorIndicator()) {
- throw new BTreeDuplicateKeyException("Trying to insert duplicate key into leaf node.");
+ throw new TreeIndexDuplicateKeyException("Trying to insert duplicate key into leaf node.");
}
return slot;
}
@@ -382,7 +382,7 @@
int tupleIndex = slotManager.decodeSecondSlotField(slot);
// Error indicator is set if there is an exact match.
if (tupleIndex == slotManager.getErrorIndicator()) {
- throw new BTreeDuplicateKeyException("Trying to insert duplicate key into leaf node.");
+ throw new TreeIndexDuplicateKeyException("Trying to insert duplicate key into leaf node.");
}
return slot;
}
@@ -411,7 +411,7 @@
int tupleIndex = slotManager.decodeSecondSlotField(slot);
// Error indicator is set if there is no exact match.
if (tupleIndex == slotManager.getErrorIndicator()) {
- throw new BTreeNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
+ throw new TreeIndexNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
}
return slot;
}
@@ -423,7 +423,7 @@
int tupleIndex = slotManager.decodeSecondSlotField(slot);
// Error indicator is set if there is no exact match.
if (tupleIndex == slotManager.getErrorIndicator()) {
- throw new BTreeNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
+ throw new TreeIndexNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
}
return slot;
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
index 1974989..187cd52 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
@@ -20,14 +20,14 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.frames.TreeIndexNSMFrame;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleMode;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
@@ -72,7 +72,7 @@
FindTupleNoExactMatchPolicy.HIGHER_KEY);
// Error indicator is set if there is an exact match.
if (tupleIndex == slotManager.getErrorIndicator()) {
- throw new BTreeDuplicateKeyException("Trying to insert duplicate key into leaf node.");
+ throw new TreeIndexDuplicateKeyException("Trying to insert duplicate key into leaf node.");
}
return tupleIndex;
}
@@ -83,7 +83,7 @@
FindTupleNoExactMatchPolicy.HIGHER_KEY);
// Error indicator is set if there is no exact match.
if (tupleIndex == slotManager.getErrorIndicator() || tupleIndex == slotManager.getGreatestKeyIndicator()) {
- throw new BTreeNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
+ throw new TreeIndexNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
}
return tupleIndex;
}
@@ -121,7 +121,7 @@
FindTupleNoExactMatchPolicy.HIGHER_KEY);
// Error indicator is set if there is no exact match.
if (tupleIndex == slotManager.getErrorIndicator() || tupleIndex == slotManager.getGreatestKeyIndicator()) {
- throw new BTreeNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
+ throw new TreeIndexNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
}
return tupleIndex;
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index 689843b..ff94040 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -33,7 +33,6 @@
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.api.ITupleAcceptor;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNotUpdateableException;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrame;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
@@ -53,6 +52,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.api.UnsortedInputException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
import edu.uci.ics.hyracks.storage.am.common.impls.AbstractTreeIndex;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -562,7 +563,7 @@
// This means that there could be underflow, even an empty page that is
// pointed to by an interior node.
if (ctx.leafFrame.getTupleCount() == 0) {
- throw new BTreeNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
+ throw new TreeIndexNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
}
int tupleIndex = ctx.leafFrame.findDeleteTupleIndex(tuple);
ITupleReference beforeTuple = ctx.leafFrame.getMatchingKeyTuple(tuple, tupleIndex);
@@ -1024,9 +1025,12 @@
protected void verifyInputTuple(ITupleReference tuple, ITupleReference prevTuple) throws IndexException,
HyracksDataException {
// New tuple should be strictly greater than last tuple.
- if (cmp.compare(tuple, prevTuple) <= 0) {
- throw new UnsortedInputException(
- "Input stream given to BTree bulk load is not sorted or has duplicates.");
+ int cmpResult = cmp.compare(tuple, prevTuple);
+ if (cmpResult < 0) {
+ throw new UnsortedInputException("Input stream given to BTree bulk load is not sorted.");
+ }
+ if (cmpResult == 0) {
+ throw new TreeIndexDuplicateKeyException("Input stream given to BTree bulk load has duplicates.");
}
}
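
Note on the hunk above: bulk-load input validation now distinguishes an ordering violation (UnsortedInputException) from a duplicate key (TreeIndexDuplicateKeyException), so callers can react differently to each. A minimal caller-side sketch under that assumption; the loader is whatever the index's bulk-load factory returns, and the skip-duplicates policy is illustrative, not part of this patch:

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
    import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
    import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
    import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;

    public class BulkLoadHelper {
        // Tolerate duplicate keys but let a genuinely unsorted stream abort the load.
        public static void load(IIndexBulkLoader loader, Iterable<ITupleReference> sorted)
                throws IndexException, HyracksDataException {
            for (ITupleReference tuple : sorted) {
                try {
                    loader.add(tuple);
                } catch (TreeIndexDuplicateKeyException e) {
                    // skip the duplicate; an UnsortedInputException still propagates
                }
            }
            loader.end();
        }
    }
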
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 4a9cfb4..092fada 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -30,6 +30,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -97,7 +99,11 @@
switch (op) {
case INSERT: {
- indexAccessor.insert(tuple);
+ try {
+ indexAccessor.insert(tuple);
+ } catch (TreeIndexDuplicateKeyException e) {
+ // ignore this exception so that inserting an existing key becomes a no-op
+ }
break;
}
case UPDATE: {
@@ -109,7 +115,11 @@
break;
}
case DELETE: {
- indexAccessor.delete(tuple);
+ try {
+ indexAccessor.delete(tuple);
+ } catch (TreeIndexNonExistentKeyException e) {
+ // ignore this exception to allow deleting non-existent keys
+ }
break;
}
default: {
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexDuplicateKeyException.java
similarity index 70%
rename from hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
rename to hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexDuplicateKeyException.java
index cde5022..1767504 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexDuplicateKeyException.java
@@ -13,16 +13,18 @@
* limitations under the License.
*/
-package edu.uci.ics.hyracks.storage.am.btree.exceptions;
+package edu.uci.ics.hyracks.storage.am.common.exceptions;
-public class BTreeDuplicateKeyException extends BTreeException {
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+
+public class TreeIndexDuplicateKeyException extends TreeIndexException {
private static final long serialVersionUID = 1L;
- public BTreeDuplicateKeyException(Exception e) {
+ public TreeIndexDuplicateKeyException(Exception e) {
super(e);
}
- public BTreeDuplicateKeyException(String message) {
+ public TreeIndexDuplicateKeyException(String message) {
super(message);
}
}
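
Because TreeIndexDuplicateKeyException extends TreeIndexException (itself an IndexException), broad handlers written against the old hierarchy keep working; only code that named the BTree-specific class needs the import swap seen throughout this patch. A sketch of the resulting catch ordering, with accessor and tuple as stand-ins:

    try {
        accessor.insert(tuple);
    } catch (TreeIndexDuplicateKeyException e) {
        // the specific case: the key is already present
    } catch (TreeIndexException e) {
        // every other tree-index failure still lands here
    }
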
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexNonExistentKeyException.java
similarity index 61%
copy from hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
copy to hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexNonExistentKeyException.java
index cde5022..8b62063 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexNonExistentKeyException.java
@@ -13,16 +13,19 @@
* limitations under the License.
*/
-package edu.uci.ics.hyracks.storage.am.btree.exceptions;
+package edu.uci.ics.hyracks.storage.am.common.exceptions;
-public class BTreeDuplicateKeyException extends BTreeException {
- private static final long serialVersionUID = 1L;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
- public BTreeDuplicateKeyException(Exception e) {
- super(e);
- }
+public class TreeIndexNonExistentKeyException extends TreeIndexException {
- public BTreeDuplicateKeyException(String message) {
- super(message);
- }
+ private static final long serialVersionUID = 1L;
+
+ public TreeIndexNonExistentKeyException(Exception e) {
+ super(e);
+ }
+
+ public TreeIndexNonExistentKeyException(String message) {
+ super(message);
+ }
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 19d40a0..a85a174 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -12,352 +12,356 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package edu.uci.ics.hyracks.storage.am.common.impls;
-
-import java.util.ArrayList;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
-import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-
-public abstract class AbstractTreeIndex implements ITreeIndex {
-
- protected final static int rootPage = 1;
-
- protected final IBufferCache bufferCache;
- protected final IFileMapProvider fileMapProvider;
- protected final IFreePageManager freePageManager;
-
- protected final ITreeIndexFrameFactory interiorFrameFactory;
- protected final ITreeIndexFrameFactory leafFrameFactory;
-
- protected final IBinaryComparatorFactory[] cmpFactories;
- protected final int fieldCount;
-
- protected FileReference file;
- protected int fileId = -1;
-
- private boolean isActivated = false;
-
- public AbstractTreeIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
- IFreePageManager freePageManager, ITreeIndexFrameFactory interiorFrameFactory,
- ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount,
- FileReference file) {
- this.bufferCache = bufferCache;
- this.fileMapProvider = fileMapProvider;
- this.freePageManager = freePageManager;
- this.interiorFrameFactory = interiorFrameFactory;
- this.leafFrameFactory = leafFrameFactory;
- this.cmpFactories = cmpFactories;
- this.fieldCount = fieldCount;
- this.file = file;
- }
-
- public synchronized void create() throws HyracksDataException {
- if (isActivated) {
- throw new HyracksDataException("Failed to create the index since it is activated.");
- }
-
- boolean fileIsMapped = false;
- synchronized (fileMapProvider) {
- fileIsMapped = fileMapProvider.isMapped(file);
- if (!fileIsMapped) {
- bufferCache.createFile(file);
- }
- fileId = fileMapProvider.lookupFileId(file);
- try {
- // Also creates the file if it doesn't exist yet.
- bufferCache.openFile(fileId);
- } catch (HyracksDataException e) {
- // Revert state of buffer cache since file failed to open.
- if (!fileIsMapped) {
- bufferCache.deleteFile(fileId, false);
- }
- throw e;
- }
- }
-
- freePageManager.open(fileId);
- initEmptyTree();
- freePageManager.close();
- bufferCache.closeFile(fileId);
- }
-
- private void initEmptyTree() throws HyracksDataException {
- ITreeIndexFrame frame = leafFrameFactory.createFrame();
- ITreeIndexMetaDataFrame metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
- freePageManager.init(metaFrame, rootPage);
-
- ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
- rootNode.acquireWriteLatch();
- try {
- frame.setPage(rootNode);
- frame.initBuffer((byte) 0);
- } finally {
- rootNode.releaseWriteLatch();
- bufferCache.unpin(rootNode);
- }
- }
-
- public synchronized void activate() throws HyracksDataException {
- if (isActivated) {
- throw new HyracksDataException("Failed to activate the index since it is already activated.");
- }
-
- boolean fileIsMapped = false;
- synchronized (fileMapProvider) {
- fileIsMapped = fileMapProvider.isMapped(file);
- if (!fileIsMapped) {
- bufferCache.createFile(file);
- }
- fileId = fileMapProvider.lookupFileId(file);
- try {
- // Also creates the file if it doesn't exist yet.
- bufferCache.openFile(fileId);
- } catch (HyracksDataException e) {
- // Revert state of buffer cache since file failed to open.
- if (!fileIsMapped) {
- bufferCache.deleteFile(fileId, false);
- }
- throw e;
- }
- }
- freePageManager.open(fileId);
-
- // TODO: Should probably have some way to check that the tree is physically consistent
- // or that the file we just opened actually is a tree
-
- isActivated = true;
- }
-
- public synchronized void deactivate() throws HyracksDataException {
- if (!isActivated) {
- throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");
- }
-
- bufferCache.closeFile(fileId);
- freePageManager.close();
-
- isActivated = false;
- }
-
- public synchronized void destroy() throws HyracksDataException {
- if (isActivated) {
- throw new HyracksDataException("Failed to destroy the index since it is activated.");
- }
-
- file.delete();
- if (fileId == -1) {
- return;
- }
-
- bufferCache.deleteFile(fileId, false);
- fileId = -1;
- }
-
- public synchronized void clear() throws HyracksDataException {
- if (!isActivated) {
- throw new HyracksDataException("Failed to clear the index since it is not activated.");
- }
- initEmptyTree();
- }
-
- public boolean isEmptyTree(ITreeIndexFrame frame) throws HyracksDataException {
- ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
- rootNode.acquireReadLatch();
- try {
- frame.setPage(rootNode);
- if (frame.getLevel() == 0 && frame.getTupleCount() == 0) {
- return true;
- } else {
- return false;
- }
- } finally {
- rootNode.releaseReadLatch();
- bufferCache.unpin(rootNode);
- }
- }
-
- public byte getTreeHeight(ITreeIndexFrame frame) throws HyracksDataException {
- ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
- rootNode.acquireReadLatch();
- try {
- frame.setPage(rootNode);
- return frame.getLevel();
- } finally {
- rootNode.releaseReadLatch();
- bufferCache.unpin(rootNode);
- }
- }
-
- public int getFileId() {
- return fileId;
- }
-
- public FileReference getFileReference() {
- return file;
- }
-
- public IBufferCache getBufferCache() {
- return bufferCache;
- }
-
- public ITreeIndexFrameFactory getInteriorFrameFactory() {
- return interiorFrameFactory;
- }
-
- public ITreeIndexFrameFactory getLeafFrameFactory() {
- return leafFrameFactory;
- }
-
- public IBinaryComparatorFactory[] getComparatorFactories() {
- return cmpFactories;
- }
-
- public IFreePageManager getFreePageManager() {
- return freePageManager;
- }
-
- public int getRootPageId() {
- return rootPage;
- }
-
- public int getFieldCount() {
- return fieldCount;
- }
-
- public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader {
- protected final MultiComparator cmp;
- protected final int slotSize;
- protected final int leafMaxBytes;
- protected final int interiorMaxBytes;
- protected final ArrayList<NodeFrontier> nodeFrontiers = new ArrayList<NodeFrontier>();
- protected final ITreeIndexMetaDataFrame metaFrame;
- protected final ITreeIndexTupleWriter tupleWriter;
- protected ITreeIndexFrame leafFrame;
- protected ITreeIndexFrame interiorFrame;
-
- public AbstractTreeIndexBulkLoader(float fillFactor) throws TreeIndexException, HyracksDataException {
- leafFrame = leafFrameFactory.createFrame();
- interiorFrame = interiorFrameFactory.createFrame();
- metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
-
- if (!isEmptyTree(leafFrame)) {
- throw new TreeIndexException("Cannot bulk-load a non-empty tree.");
- }
-
- this.cmp = MultiComparator.createIgnoreFieldLength(cmpFactories);
-
- leafFrame.setMultiComparator(cmp);
- interiorFrame.setMultiComparator(cmp);
-
- tupleWriter = leafFrame.getTupleWriter();
-
- NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());
- leafFrontier.pageId = freePageManager.getFreePage(metaFrame);
- leafFrontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId), true);
- leafFrontier.page.acquireWriteLatch();
-
- interiorFrame.setPage(leafFrontier.page);
- interiorFrame.initBuffer((byte) 0);
- interiorMaxBytes = (int) ((float) interiorFrame.getBuffer().capacity() * fillFactor);
-
- leafFrame.setPage(leafFrontier.page);
- leafFrame.initBuffer((byte) 0);
- leafMaxBytes = (int) ((float) leafFrame.getBuffer().capacity() * fillFactor);
- slotSize = leafFrame.getSlotSize();
-
- nodeFrontiers.add(leafFrontier);
- }
-
- public abstract void add(ITupleReference tuple) throws IndexException, HyracksDataException;
-
- protected void handleException() throws HyracksDataException {
- // Unlatch and unpin pages.
- for (NodeFrontier nodeFrontier : nodeFrontiers) {
- nodeFrontier.page.releaseWriteLatch();
- bufferCache.unpin(nodeFrontier.page);
- }
- }
-
- @Override
- public void end() throws HyracksDataException {
- // copy the root generated from the bulk-load to *the* root page location
- ICachedPage newRoot = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
- newRoot.acquireWriteLatch();
- NodeFrontier lastNodeFrontier = nodeFrontiers.get(nodeFrontiers.size() - 1);
- try {
- System.arraycopy(lastNodeFrontier.page.getBuffer().array(), 0, newRoot.getBuffer().array(), 0,
- lastNodeFrontier.page.getBuffer().capacity());
- } finally {
- newRoot.releaseWriteLatch();
- bufferCache.unpin(newRoot);
-
- // register old root as a free page
- freePageManager.addFreePage(metaFrame, lastNodeFrontier.pageId);
-
- for (int i = 0; i < nodeFrontiers.size(); i++) {
- nodeFrontiers.get(i).page.releaseWriteLatch();
- bufferCache.unpin(nodeFrontiers.get(i).page);
- }
- }
- }
-
- protected void addLevel() throws HyracksDataException {
- NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());
- frontier.pageId = freePageManager.getFreePage(metaFrame);
- frontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, frontier.pageId), true);
- frontier.page.acquireWriteLatch();
- frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
- interiorFrame.setPage(frontier.page);
- interiorFrame.initBuffer((byte) nodeFrontiers.size());
- nodeFrontiers.add(frontier);
- }
- }
-
- public class TreeIndexInsertBulkLoader implements IIndexBulkLoader {
- ITreeIndexAccessor accessor;
-
- public TreeIndexInsertBulkLoader() throws HyracksDataException {
- accessor = (ITreeIndexAccessor) createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- }
-
- @Override
- public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- accessor.insert(tuple);
- } catch (IndexException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void end() throws HyracksDataException {
- // do nothing
- }
-
- }
-
- @Override
- public long getMemoryAllocationSize() {
- return 0;
- }
+
+package edu.uci.ics.hyracks.storage.am.common.impls;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public abstract class AbstractTreeIndex implements ITreeIndex {
+
+ protected final static int rootPage = 1;
+
+ protected final IBufferCache bufferCache;
+ protected final IFileMapProvider fileMapProvider;
+ protected final IFreePageManager freePageManager;
+
+ protected final ITreeIndexFrameFactory interiorFrameFactory;
+ protected final ITreeIndexFrameFactory leafFrameFactory;
+
+ protected final IBinaryComparatorFactory[] cmpFactories;
+ protected final int fieldCount;
+
+ protected FileReference file;
+ protected int fileId = -1;
+
+ private boolean isActivated = false;
+
+ public AbstractTreeIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+ IFreePageManager freePageManager, ITreeIndexFrameFactory interiorFrameFactory,
+ ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount,
+ FileReference file) {
+ this.bufferCache = bufferCache;
+ this.fileMapProvider = fileMapProvider;
+ this.freePageManager = freePageManager;
+ this.interiorFrameFactory = interiorFrameFactory;
+ this.leafFrameFactory = leafFrameFactory;
+ this.cmpFactories = cmpFactories;
+ this.fieldCount = fieldCount;
+ this.file = file;
+ }
+
+ public synchronized void create() throws HyracksDataException {
+ if (isActivated) {
+ throw new HyracksDataException("Failed to create the index since it is activated.");
+ }
+
+ boolean fileIsMapped = false;
+ synchronized (fileMapProvider) {
+ fileIsMapped = fileMapProvider.isMapped(file);
+ if (!fileIsMapped) {
+ bufferCache.createFile(file);
+ }
+ fileId = fileMapProvider.lookupFileId(file);
+ try {
+ // Also creates the file if it doesn't exist yet.
+ bufferCache.openFile(fileId);
+ } catch (HyracksDataException e) {
+ // Revert state of buffer cache since file failed to open.
+ if (!fileIsMapped) {
+ bufferCache.deleteFile(fileId, false);
+ }
+ throw e;
+ }
+ }
+
+ freePageManager.open(fileId);
+ initEmptyTree();
+ freePageManager.close();
+ bufferCache.closeFile(fileId);
+ }
+
+ private void initEmptyTree() throws HyracksDataException {
+ ITreeIndexFrame frame = leafFrameFactory.createFrame();
+ ITreeIndexMetaDataFrame metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
+ freePageManager.init(metaFrame, rootPage);
+
+ ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
+ rootNode.acquireWriteLatch();
+ try {
+ frame.setPage(rootNode);
+ frame.initBuffer((byte) 0);
+ } finally {
+ rootNode.releaseWriteLatch();
+ bufferCache.unpin(rootNode);
+ }
+ }
+
+ public synchronized void activate() throws HyracksDataException {
+ if (isActivated) {
+ throw new HyracksDataException("Failed to activate the index since it is already activated.");
+ }
+
+ boolean fileIsMapped = false;
+ synchronized (fileMapProvider) {
+ fileIsMapped = fileMapProvider.isMapped(file);
+ if (!fileIsMapped) {
+ bufferCache.createFile(file);
+ }
+ fileId = fileMapProvider.lookupFileId(file);
+ try {
+ // Also creates the file if it doesn't exist yet.
+ bufferCache.openFile(fileId);
+ } catch (HyracksDataException e) {
+ // Revert state of buffer cache since file failed to open.
+ if (!fileIsMapped) {
+ bufferCache.deleteFile(fileId, false);
+ }
+ throw e;
+ }
+ }
+ freePageManager.open(fileId);
+
+ // TODO: Should probably have some way to check that the tree is physically consistent
+ // or that the file we just opened actually is a tree
+
+ isActivated = true;
+ }
+
+ public synchronized void deactivate() throws HyracksDataException {
+ if (!isActivated) {
+ throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");
+ }
+
+ bufferCache.closeFile(fileId);
+ freePageManager.close();
+
+ isActivated = false;
+ }
+
+ public synchronized void destroy() throws HyracksDataException {
+ if (isActivated) {
+ throw new HyracksDataException("Failed to destroy the index since it is activated.");
+ }
+
+ file.delete();
+ if (fileId == -1) {
+ return;
+ }
+
+ bufferCache.deleteFile(fileId, false);
+ fileId = -1;
+ }
+
+ public synchronized void clear() throws HyracksDataException {
+ if (!isActivated) {
+ throw new HyracksDataException("Failed to clear the index since it is not activated.");
+ }
+ initEmptyTree();
+ }
+
+ public boolean isEmptyTree(ITreeIndexFrame frame) throws HyracksDataException {
+ ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
+ rootNode.acquireReadLatch();
+ try {
+ frame.setPage(rootNode);
+ if (frame.getLevel() == 0 && frame.getTupleCount() == 0) {
+ return true;
+ } else {
+ return false;
+ }
+ } finally {
+ rootNode.releaseReadLatch();
+ bufferCache.unpin(rootNode);
+ }
+ }
+
+ public byte getTreeHeight(ITreeIndexFrame frame) throws HyracksDataException {
+ ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
+ rootNode.acquireReadLatch();
+ try {
+ frame.setPage(rootNode);
+ return frame.getLevel();
+ } finally {
+ rootNode.releaseReadLatch();
+ bufferCache.unpin(rootNode);
+ }
+ }
+
+ public int getFileId() {
+ return fileId;
+ }
+
+ public FileReference getFileReference() {
+ return file;
+ }
+
+ public IBufferCache getBufferCache() {
+ return bufferCache;
+ }
+
+ public ITreeIndexFrameFactory getInteriorFrameFactory() {
+ return interiorFrameFactory;
+ }
+
+ public ITreeIndexFrameFactory getLeafFrameFactory() {
+ return leafFrameFactory;
+ }
+
+ public IBinaryComparatorFactory[] getComparatorFactories() {
+ return cmpFactories;
+ }
+
+ public IFreePageManager getFreePageManager() {
+ return freePageManager;
+ }
+
+ public int getRootPageId() {
+ return rootPage;
+ }
+
+ public int getFieldCount() {
+ return fieldCount;
+ }
+
+ public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader {
+ protected final MultiComparator cmp;
+ protected final int slotSize;
+ protected final int leafMaxBytes;
+ protected final int interiorMaxBytes;
+ protected final ArrayList<NodeFrontier> nodeFrontiers = new ArrayList<NodeFrontier>();
+ protected final ITreeIndexMetaDataFrame metaFrame;
+ protected final ITreeIndexTupleWriter tupleWriter;
+ protected ITreeIndexFrame leafFrame;
+ protected ITreeIndexFrame interiorFrame;
+ private boolean releasedLatches;
+
+ public AbstractTreeIndexBulkLoader(float fillFactor) throws TreeIndexException, HyracksDataException {
+ leafFrame = leafFrameFactory.createFrame();
+ interiorFrame = interiorFrameFactory.createFrame();
+ metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
+
+ if (!isEmptyTree(leafFrame)) {
+ throw new TreeIndexException("Cannot bulk-load a non-empty tree.");
+ }
+
+ this.cmp = MultiComparator.createIgnoreFieldLength(cmpFactories);
+
+ leafFrame.setMultiComparator(cmp);
+ interiorFrame.setMultiComparator(cmp);
+
+ tupleWriter = leafFrame.getTupleWriter();
+
+ NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());
+ leafFrontier.pageId = freePageManager.getFreePage(metaFrame);
+ leafFrontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId), true);
+ leafFrontier.page.acquireWriteLatch();
+
+ interiorFrame.setPage(leafFrontier.page);
+ interiorFrame.initBuffer((byte) 0);
+ interiorMaxBytes = (int) ((float) interiorFrame.getBuffer().capacity() * fillFactor);
+
+ leafFrame.setPage(leafFrontier.page);
+ leafFrame.initBuffer((byte) 0);
+ leafMaxBytes = (int) ((float) leafFrame.getBuffer().capacity() * fillFactor);
+ slotSize = leafFrame.getSlotSize();
+
+ nodeFrontiers.add(leafFrontier);
+ }
+
+ public abstract void add(ITupleReference tuple) throws IndexException, HyracksDataException;
+
+ protected void handleException() throws HyracksDataException {
+ // Unlatch and unpin pages.
+ for (NodeFrontier nodeFrontier : nodeFrontiers) {
+ nodeFrontier.page.releaseWriteLatch();
+ bufferCache.unpin(nodeFrontier.page);
+ }
+ releasedLatches = true;
+ }
+
+ @Override
+ public void end() throws HyracksDataException {
+ // copy the root generated from the bulk-load to *the* root page location
+ ICachedPage newRoot = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
+ newRoot.acquireWriteLatch();
+ NodeFrontier lastNodeFrontier = nodeFrontiers.get(nodeFrontiers.size() - 1);
+ try {
+ System.arraycopy(lastNodeFrontier.page.getBuffer().array(), 0, newRoot.getBuffer().array(), 0,
+ lastNodeFrontier.page.getBuffer().capacity());
+ } finally {
+ newRoot.releaseWriteLatch();
+ bufferCache.unpin(newRoot);
+
+ // register old root as a free page
+ freePageManager.addFreePage(metaFrame, lastNodeFrontier.pageId);
+
+ if (!releasedLatches) {
+ for (int i = 0; i < nodeFrontiers.size(); i++) {
+ nodeFrontiers.get(i).page.releaseWriteLatch();
+ bufferCache.unpin(nodeFrontiers.get(i).page);
+ }
+ }
+ }
+ }
+
+ protected void addLevel() throws HyracksDataException {
+ NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());
+ frontier.pageId = freePageManager.getFreePage(metaFrame);
+ frontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, frontier.pageId), true);
+ frontier.page.acquireWriteLatch();
+ frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
+ interiorFrame.setPage(frontier.page);
+ interiorFrame.initBuffer((byte) nodeFrontiers.size());
+ nodeFrontiers.add(frontier);
+ }
+ }
+
+ public class TreeIndexInsertBulkLoader implements IIndexBulkLoader {
+ ITreeIndexAccessor accessor;
+
+ public TreeIndexInsertBulkLoader() throws HyracksDataException {
+ accessor = (ITreeIndexAccessor) createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ }
+
+ @Override
+ public void add(ITupleReference tuple) throws HyracksDataException {
+ try {
+ accessor.insert(tuple);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void end() throws HyracksDataException {
+ // do nothing
+ }
+
+ }
+
+ @Override
+ public long getMemoryAllocationSize() {
+ return 0;
+ }
}
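
Aside from re-indentation, the substantive change to AbstractTreeIndex is the new releasedLatches flag: handleException() records that the frontier pages were already unlatched and unpinned, and end() now skips its own release pass in that case, avoiding a double releaseWriteLatch()/unpin(). A hedged sketch of the pattern a concrete bulk loader is expected to follow; the leaf-append body is elided and hypothetical:

    @Override
    public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
        try {
            // ... write the tuple into the current leaf frontier, splitting as needed ...
        } catch (HyracksDataException e) {
            handleException(); // unlatches/unpins all frontiers and sets releasedLatches
            throw e;
        } catch (IndexException e) {
            handleException();
            throw e;
        }
    }
    // A caller that still invokes end() afterwards (e.g. from a finally block)
    // no longer trips over the already-released latches.
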
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 3557d73..59af99e 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
@@ -46,6 +45,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -279,7 +279,7 @@
memCursor.next();
LSMBTreeTupleReference lsmbtreeTuple = (LSMBTreeTupleReference) memCursor.getTuple();
if (!lsmbtreeTuple.isAntimatter()) {
- throw new BTreeDuplicateKeyException("Failed to insert key since key already exists.");
+ throw new TreeIndexDuplicateKeyException("Failed to insert key since key already exists.");
} else {
memCursor.close();
ctx.memBTreeAccessor.upsertIfConditionElseInsert(tuple, AntimatterAwareTupleAcceptor.INSTANCE);
@@ -299,7 +299,7 @@
search(ctx, searchCursor, predicate);
try {
if (searchCursor.hasNext()) {
- throw new BTreeDuplicateKeyException("Failed to insert key since key already exists.");
+ throw new TreeIndexDuplicateKeyException("Failed to insert key since key already exists.");
}
} finally {
searchCursor.close();
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 3eae5a7..244dfa0 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
@@ -43,6 +42,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IVirtualFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -305,7 +305,7 @@
ctx.keysOnlyTuple.reset(tuple);
try {
ctx.deletedKeysBTreeAccessor.insert(ctx.keysOnlyTuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// Key has already been deleted.
}
break;
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index 222f4de..c70c2d5 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -19,9 +19,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
@@ -33,6 +31,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
@@ -117,7 +117,7 @@
ITupleReference insertTuple = ctx.tupleIter.getTuple();
try {
btreeAccessor.insert(insertTuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// This exception may be caused by duplicate tokens in the same insert "document".
// We ignore such duplicate tokens in all inverted-index implementations, hence
// we can safely ignore this exception.
@@ -134,7 +134,7 @@
ITupleReference deleteTuple = ctx.tupleIter.getTuple();
try {
btreeAccessor.delete(deleteTuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Ignore this exception, since a document may have duplicate tokens.
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index e3e690b..285010c 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -23,8 +23,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -36,6 +34,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IVirtualFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -306,7 +306,7 @@
if (foundTupleInMemoryBTree) {
try {
ctx.memBTreeAccessor.delete(tuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Tuple has been deleted in the meantime. Do nothing.
// This normally shouldn't happen if we are dealing with
// good citizens since LSMRTree is used as a secondary
@@ -320,7 +320,7 @@
} else {
try {
ctx.memBTreeAccessor.insert(tuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// Do nothing, because one delete tuple is enough to indicate
// that all the corresponding insert tuples are deleted
}
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
index eafeff2..0a70185 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
@@ -46,6 +46,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.api.UnsortedInputException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.TreeIndexDiskOrderScanCursor;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -140,9 +141,9 @@
/**
* This tests the btree page split. Originally this test didn't pass since
- * the btree was spliting by cardinality and not size. Thus, we might end
- * up with a situation where there is not enough space to insert the new
- * tuple after the split which will throw an error and the split won't be
+ * the btree was splitting by cardinality and not size. Thus, we might end up
+ * with a situation where there is not enough space to insert the new tuple
+ * after the split which will throw an error and the split won't be
* propagated to the upper level; thus, the tree is corrupted. Now, it splits
* pages by size, which is the correct behavior for abnormally large keys/values.
*/
@@ -716,6 +717,12 @@
}
// Success.
break;
+ } catch (TreeIndexDuplicateKeyException e2) {
+ if (j != i) {
+ fail("Unexpected exception: " + e2.getMessage());
+ }
+ // Success.
+ break;
}
}
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
index 5b7b050..9de217b 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
@@ -35,7 +35,6 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
import edu.uci.ics.hyracks.storage.am.common.CheckTuple;
@@ -44,6 +43,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@SuppressWarnings("rawtypes")
@@ -205,7 +205,7 @@
// Set expected values. Do this only after insertion succeeds
// because we ignore duplicate keys.
ctx.insertCheckTuple(createStringCheckTuple(fieldValues, ctx.getKeyFieldCount()), ctx.getCheckTuples());
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// Ignore duplicate key insertions.
}
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java
index 262e21c..22d3e6a 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java
@@ -19,8 +19,6 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNotUpdateableException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -31,6 +29,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
public class BTreeTestWorker extends AbstractIndexTestWorker {
@@ -60,7 +60,7 @@
case INSERT:
try {
accessor.insert(tuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// Ignore duplicate keys, since we get random tuples.
}
break;
@@ -74,7 +74,7 @@
deleteTuple.reset(deleteTb.getFieldEndOffsets(), deleteTb.getByteArray());
try {
accessor.delete(deleteTuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Ignore non-existent keys, since we get random tuples.
}
break;
@@ -82,7 +82,7 @@
case UPDATE:
try {
accessor.update(tuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Ignore non-existent keys, since we get random tuples.
} catch (BTreeNotUpdateableException e) {
// Ignore not updateable exception due to numKeys == numFields.
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 8a37b4e..4a96131 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -19,8 +19,6 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNotUpdateableException;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.AbstractIndexTestWorker;
@@ -30,6 +28,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree.LSMBTreeAccessor;
@@ -60,7 +60,7 @@
case INSERT:
try {
accessor.insert(tuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// Ignore duplicate keys, since we get random tuples.
}
break;
@@ -74,7 +74,7 @@
deleteTuple.reset(deleteTb.getFieldEndOffsets(), deleteTb.getByteArray());
try {
accessor.delete(deleteTuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Ignore non-existent keys, since we get random tuples.
}
break;
@@ -82,7 +82,7 @@
case UPDATE:
try {
accessor.update(tuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Ignore non-existent keys, since we get random tuples.
} catch (BTreeNotUpdateableException e) {
// Ignore not updateable exception due to numKeys == numFields.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index c52130d..26cb8d0 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -35,7 +36,6 @@
import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.JobStateUtils;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
/**
* User applications should all inherit {@link Vertex}, and implement their own
@@ -270,7 +270,7 @@
delegate.setVertex(this);
}
destEdgeList.clear();
- long edgeMapSize = SerDeUtils.readVLong(in);
+ long edgeMapSize = WritableUtils.readVLong(in);
for (long i = 0; i < edgeMapSize; ++i) {
Edge<I, E> edge = allocateEdge();
edge.setConf(getContext().getConfiguration());
@@ -278,7 +278,7 @@
addEdge(edge);
}
msgList.clear();
- long msgListSize = SerDeUtils.readVLong(in);
+ long msgListSize = WritableUtils.readVLong(in);
for (long i = 0; i < msgListSize; ++i) {
M msg = allocateMessage();
msg.readFields(in);
@@ -297,11 +297,11 @@
if (vertexValue != null) {
vertexValue.write(out);
}
- SerDeUtils.writeVLong(out, destEdgeList.size());
+ WritableUtils.writeVLong(out, destEdgeList.size());
for (Edge<I, E> edge : destEdgeList) {
edge.write(out);
}
- SerDeUtils.writeVLong(out, msgList.size());
+ WritableUtils.writeVLong(out, msgList.size());
for (M msg : msgList) {
msg.write(out);
}
@@ -618,7 +618,6 @@
* Terminate the partition in which the current vertex resides.
* This will immediately take effect and the upcoming vertices in the
* same partition cannot be processed.
- *
*/
protected final void terminatePartition() {
voteToHalt();
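
The Vertex serialization change above replaces the project's own SerDeUtils variable-length long codec with Hadoop's WritableUtils equivalents. For reference, a standalone round-trip of the two calls now in use (illustrative code, not part of this patch):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;

    public class VLongRoundTrip {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            WritableUtils.writeVLong(out, 42L); // values in [-112, 127] encode in one byte
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            System.out.println(WritableUtils.readVLong(in)); // prints 42
        }
    }
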
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
new file mode 100644
index 0000000..22d3b27
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io.internal;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class InternalVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
+ extends VertexInputFormat<I, V, E, M> {
+ /** Uses the SequenceFileInputFormat to do everything */
+ private SequenceFileInputFormat sequenceInputFormat = new SequenceFileInputFormat();
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+ return sequenceInputFormat.getSplits(context);
+ }
+
+ @Override
+ public VertexReader<I, V, E, M> createVertexReader(final InputSplit split, final TaskAttemptContext context)
+ throws IOException {
+ return new VertexReader<I, V, E, M>() {
+ RecordReader recordReader = sequenceInputFormat.createRecordReader(split, context);
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ recordReader.initialize(inputSplit, context);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return recordReader.nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Vertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException {
+ return (Vertex<I, V, E, M>) recordReader.getCurrentValue();
+ }
+
+ @Override
+ public void close() throws IOException {
+ recordReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ };
+ }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
new file mode 100644
index 0000000..b603037
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io.internal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+/**
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class InternalVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> extends
+ VertexOutputFormat<I, V, E> {
+ private SequenceFileOutputFormat sequenceOutputFormat = new SequenceFileOutputFormat();
+
+ @Override
+ public VertexWriter<I, V, E> createVertexWriter(final TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ return new VertexWriter<I, V, E>() {
+ private RecordWriter recordWriter = sequenceOutputFormat.getRecordWriter(context);
+ private NullWritable key = NullWritable.get();
+
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException, InterruptedException {
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
+ recordWriter.write(key, vertex);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ recordWriter.close(context);
+ }
+
+ };
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return new OutputCommitter() {
+
+ @Override
+ public void abortTask(TaskAttemptContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void cleanupJob(JobContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setupJob(JobContext arg0) throws IOException {
+
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext arg0) throws IOException {
+
+ }
+
+ };
+ }
+
+}
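
The two internal formats above let Pregelix spill and reload vertices as plain Hadoop sequence files, which is what the checkpointing machinery needs. A hedged wiring sketch; the PregelixJob setters below follow the project's existing vertex/format setter conventions and are assumed here, since they do not appear in this diff, and MyVertex is a placeholder application vertex class:

    PregelixJob job = new PregelixJob("checkpointable-job");
    job.setVertexClass(MyVertex.class);
    job.setVertexInputFormatClass(InternalVertexInputFormat.class);
    job.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
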
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/ICheckpointHook.java
similarity index 64%
copy from hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
copy to pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/ICheckpointHook.java
index cde5022..9d6eb5a 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/ICheckpointHook.java
@@ -12,17 +12,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package edu.uci.ics.pregelix.api.job;
-package edu.uci.ics.hyracks.storage.am.btree.exceptions;
+/**
+ * Lets users decide, per superstep, whether the runtime should take a checkpoint.
+ *
+ * @author yingyib
+ */
+public interface ICheckpointHook {
-public class BTreeDuplicateKeyException extends BTreeException {
- private static final long serialVersionUID = 1L;
+ /** @return true if a checkpoint should be taken after the given superstep */
+ public boolean checkpoint(int superstep);
- public BTreeDuplicateKeyException(Exception e) {
- super(e);
- }
-
- public BTreeDuplicateKeyException(String message) {
- super(message);
- }
}
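
A minimal sketch of a user-defined hook against this interface, for orientation; the class name and cadence are illustrative only, not part of this patch:

```java
package edu.uci.ics.pregelix.api.job;

/** Hypothetical example: checkpoint every tenth superstep. */
public class EveryTenthSuperstepHook implements ICheckpointHook {
    @Override
    public boolean checkpoint(int superstep) {
        // the driver consults this after each superstep and snapshots when it returns true
        return superstep % 10 == 0;
    }
}
```
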
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index dae7818..1e0d87a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -74,6 +74,18 @@
public static final String FRAME_SIZE = "pregelix.framesize";
/** update intensive */
public static final String UPDATE_INTENSIVE = "pregelix.updateIntensive";
+ /** the checkpoint hook */
+ public static final String CKP_CLASS = "pregelix.checkpointHook";
+
+ /**
+ * Construct a Pregelix job from an existing configuration
+ *
+ * @param conf
+ * the Hadoop configuration to wrap
+ * @throws IOException
+ */
+ public PregelixJob(Configuration conf) throws IOException {
+ super(conf);
+ }
/**
* Constructor that will instantiate the configuration
@@ -201,4 +213,18 @@
final public void setLSMStorage(boolean variableSizedUpdateHeavyFlag) {
getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
}
+
+ /**
+ * Users can provide an ICheckpointHook implementation to specify when to checkpoint
+ *
+ * @param ckpClass
+ * the ICheckpointHook implementation class
+ */
+ final public void setCheckpointHook(Class<?> ckpClass) {
+ getConfiguration().setClass(CKP_CLASS, ckpClass, ICheckpointHook.class);
+ }
+
+ @Override
+ public String toString() {
+ return getJobName();
+ }
}
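
Putting the new Configuration-based constructor and the hook setter together, job setup could look like this sketch (ConservativeCheckpointHook is introduced later in this patch):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;

public class CheckpointedJobSetup {
    public static PregelixJob build(Configuration conf) throws IOException {
        PregelixJob job = new PregelixJob(conf); // wrap the existing configuration
        // records the hook class under pregelix.checkpointHook; the runtime instantiates it
        job.setCheckpointHook(ConservativeCheckpointHook.class);
        return job;
    }
}
```
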
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index ff4ee91..51a9ce3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -29,6 +29,7 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.WritableSizable;
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
import edu.uci.ics.pregelix.api.job.PregelixJob;
/**
@@ -461,6 +462,24 @@
}
/**
+ * Create a checkpoint hook
+ *
+ * @param conf
+ * Configuration to check
+ * @return the instantiated checkpoint hook
+ */
+ public static ICheckpointHook createCheckpointHook(Configuration conf) {
+ Class<? extends ICheckpointHook> ckpClass = getCheckpointHookClass(conf);
+ try {
+ return ckpClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("createVertexPartitioner: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("createVertexPartitioner: Illegally accessed", e);
+ }
+ }
+
+ /**
* Get the user's subclassed vertex partitioner class.
*
* @param conf
@@ -475,6 +494,20 @@
}
/**
+ * Get the user's subclassed checkpoint hook class.
+ *
+ * @param conf
+ * Configuration to check
+ * @return the user-defined checkpoint hook class
+ */
+ @SuppressWarnings("unchecked")
+ public static <V extends ICheckpointHook> Class<V> getCheckpointHookClass(Configuration conf) {
+ if (conf == null)
+ conf = defaultConf;
+ return (Class<V>) conf.getClass(PregelixJob.CKP_CLASS, DefaultCheckpointHook.class, ICheckpointHook.class);
+ }
+
+ /**
* Get the job configuration parameter whether the vertex states will increase dynamically
*
* @param conf
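
A sketch of how the new accessors resolve at runtime, assuming an otherwise empty configuration (DefaultCheckpointHook, added below, is the fallback):

```java
Configuration conf = new Configuration();
// pregelix.checkpointHook is unset, so the default class is returned
Class<? extends ICheckpointHook> clazz = BspUtils.getCheckpointHookClass(conf);
ICheckpointHook hook = BspUtils.createCheckpointHook(conf);
boolean ckp = hook.checkpoint(3); // false: the default hook never checkpoints
```
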
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
new file mode 100644
index 0000000..6a4a660
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
+
+/**
+ * A conservative checkpoint hook that checkpoints every 5 supersteps
+ *
+ * @author yingyib
+ */
+public class ConservativeCheckpointHook implements ICheckpointHook {
+
+ @Override
+ public boolean checkpoint(int superstep) {
+ return superstep % 5 == 0;
+ }
+
+}
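
A quick sanity check of the cadence (a sketch):

```java
ICheckpointHook hook = new ConservativeCheckpointHook();
hook.checkpoint(4);  // false
hook.checkpoint(5);  // true
hook.checkpoint(10); // true: every superstep divisible by 5
```
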
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java
similarity index 65%
copy from hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
copy to pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java
index cde5022..c37c4ab 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java
@@ -12,17 +12,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package edu.uci.ics.pregelix.api.util;
-package edu.uci.ics.hyracks.storage.am.btree.exceptions;
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
-public class BTreeDuplicateKeyException extends BTreeException {
- private static final long serialVersionUID = 1L;
+/**
+ * The default checkpoint hook which never does checkpointing.
+ *
+ * @author yingyib
+ */
+public class DefaultCheckpointHook implements ICheckpointHook {
- public BTreeDuplicateKeyException(Exception e) {
- super(e);
+ @Override
+ public boolean checkpoint(int superstep) {
+ return false;
}
- public BTreeDuplicateKeyException(String message) {
- super(message);
- }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
index 7d5f627..c7febc1 100755
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
@@ -15,11 +15,8 @@
package edu.uci.ics.pregelix.api.util;
import java.io.InputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
public class ResetableByteArrayInputStream extends InputStream {
- private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
private byte[] data;
private int position;
@@ -36,19 +33,12 @@
public int read() {
int remaining = data.length - position;
int value = remaining > 0 ? (data[position++] & 0xff) : -1;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
- }
return value;
}
@Override
public int read(byte[] bytes, int offset, int length) {
int remaining = data.length - position;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
- + length + " position: " + position);
- }
if (remaining == 0) {
return -1;
}
@@ -57,4 +47,9 @@
position += l;
return l;
}
-}
\ No newline at end of file
+
+ @Override
+ public int available() {
+ return data.length - position;
+ }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
index 25b07ff..a4336a3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
@@ -40,47 +40,4 @@
object.readFields(input);
}
- public static long readVLong(DataInput in) throws IOException {
- int vLen = 0;
- long value = 0L;
- while (true) {
- byte b = (byte) in.readByte();
- ++vLen;
- value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
- if ((b & 0x80) == 0) {
- break;
- }
- }
- return value;
- }
-
- public static void writeVLong(DataOutput out, long value) throws IOException {
- long data = value;
- do {
- byte b = (byte) (data & 0x7f);
- data >>= 7;
- if (data != 0) {
- b |= 0x80;
- }
- out.write(b);
- } while (data != 0);
- }
-
- public static long readVLong(byte[] data, int start, int length) {
- int vLen = 0;
- long value = 0L;
- while (true) {
- byte b = (byte) data[start];
- ++vLen;
- value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
- if ((b & 0x80) == 0) {
- break;
- }
- ++start;
- }
- if (vLen != length)
- throw new IllegalStateException("length mismatch -- vLen:" + vLen + " length:" + length);
- return value;
- }
-
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
index bc6c0cf..c72f392 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
@@ -15,6 +15,8 @@
package edu.uci.ics.pregelix.core.base;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -29,6 +31,11 @@
public void runJob(PregelixJob job, String ipAddress, int port) throws HyracksException;
+ public void runJobs(List<PregelixJob> jobs, String ipAddress, int port) throws HyracksException;
+
public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException;
+
+ public void runJobs(List<PregelixJob> jobs, Plan planChoice, String ipAddress, int port, boolean profiling)
+ throws HyracksException;
}
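
A sketch of driving a multi-job pipeline through the new entry point; the vertex class, jobs, host, and port are placeholders:

```java
List<PregelixJob> pipeline = new ArrayList<PregelixJob>();
pipeline.add(firstJob);  // placeholder: e.g., an initial ranking pass
pipeline.add(secondJob); // placeholder: a pass consuming the first job's output
IDriver driver = new Driver(MyVertex.class); // Driver implements IDriver (see below)
driver.runJobs(pipeline, "127.0.0.1", 3099); // defaults to Plan.OUTER_JOIN, no profiling
```
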
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
index 2d58902..6bb0dea 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
@@ -26,4 +26,10 @@
public JobSpecification generateJob(int iteration) throws HyracksException;
+ public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException;
+
+ public JobSpecification[] generateLoadingCheckpoint(int lastCheckpointedIteration) throws HyracksException;
+
+ public JobSpecification generateClearState() throws HyracksException;
+
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 2d4064b..3e6e9a5 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -17,15 +17,20 @@
import java.io.File;
import java.io.FilenameFilter;
+import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -35,7 +40,9 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.base.IDriver;
import edu.uci.ics.pregelix.core.jobgen.JobGen;
import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
@@ -48,11 +55,9 @@
@SuppressWarnings("rawtypes")
public class Driver implements IDriver {
private static final Log LOG = LogFactory.getLog(Driver.class);
- private JobGen jobGen;
- private boolean profiling;
-
private IHyracksClientConnection hcc;
private Class exampleClass;
+ private boolean profiling = false;
public Driver(Class exampleClass) {
this.exampleClass = exampleClass;
@@ -64,93 +69,236 @@
}
@Override
+ public void runJobs(List<PregelixJob> jobs, String ipAddress, int port) throws HyracksException {
+ runJobs(jobs, Plan.OUTER_JOIN, ipAddress, port, false);
+ }
+
+ @Override
public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException {
+ runJobs(Collections.singletonList(job), planChoice, ipAddress, port, profiling);
+ }
+
+ @Override
+ public void runJobs(List<PregelixJob> jobs, Plan planChoice, String ipAddress, int port, boolean profiling)
+ throws HyracksException {
try {
- /** add hadoop configurations */
- URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
- if (hadoopCore != null) {
- job.getConfiguration().addResource(hadoopCore);
+ if (jobs.size() <= 0) {
+ throw new HyracksException("Please submit at least one job for execution!");
}
- URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
- if (hadoopMapRed != null) {
- job.getConfiguration().addResource(hadoopMapRed);
- }
- URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
- if (hadoopHdfs != null) {
- job.getConfiguration().addResource(hadoopHdfs);
- }
- ClusterConfig.loadClusterConfig(ipAddress, port);
-
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
-
this.profiling = profiling;
+ PregelixJob currentJob = jobs.get(0);
+ PregelixJob lastJob = currentJob;
+ JobGen jobGen = null;
- switch (planChoice) {
- case INNER_JOIN:
- jobGen = new JobGenInnerJoin(job);
- break;
- case OUTER_JOIN:
- jobGen = new JobGenOuterJoin(job);
- break;
- case OUTER_JOIN_SORT:
- jobGen = new JobGenOuterJoinSort(job);
- break;
- case OUTER_JOIN_SINGLE_SORT:
- jobGen = new JobGenOuterJoinSingleSort(job);
- break;
- default:
- jobGen = new JobGenInnerJoin(job);
- }
+ /** prepare job -- deploy jars */
+ DeploymentId deploymentId = prepareJobs(ipAddress, port);
+ LOG.info("job started");
- if (hcc == null)
- hcc = new HyracksConnection(ipAddress, port);
+ IntWritable lastSnapshotJobIndex = new IntWritable(0);
+ IntWritable lastSnapshotSuperstep = new IntWritable(0);
+ boolean failed = false;
+ int retryCount = 0;
+ int maxRetryCount = 3;
- URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
- List<File> jars = new ArrayList<File>();
- URL[] urls = classLoader.getURLs();
- for (URL url : urls)
- if (url.toString().endsWith(".jar"))
- jars.add(new File(url.getPath()));
- DeploymentId deploymentId = installApplication(jars);
-
- start = System.currentTimeMillis();
- FileSystem dfs = FileSystem.get(job.getConfiguration());
- dfs.delete(FileOutputFormat.getOutputPath(job), true);
- runCreate(deploymentId, jobGen);
- runDataLoad(deploymentId, jobGen);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("data loading finished " + time + "ms");
- int i = 1;
- boolean terminate = false;
do {
- start = System.currentTimeMillis();
- runLoopBodyIteration(deploymentId, jobGen, i);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("iteration " + i + " finished " + time + "ms");
- terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
- || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
- i++;
- } while (!terminate);
+ try {
+ for (int i = lastSnapshotJobIndex.get(); i < jobs.size(); i++) {
+ lastJob = currentJob;
+ currentJob = jobs.get(i);
- start = System.currentTimeMillis();
- runHDFSWRite(deploymentId, jobGen);
- runCleanup(deploymentId, jobGen);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("result writing finished " + time + "ms");
- hcc.unDeployBinary(deploymentId);
+ /** add hadoop configurations */
+ addHadoopConfiguration(currentJob, ipAddress, port);
+ ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
+
+ /** load the data */
+ if (i == 0 || !compatible(lastJob, currentJob)) {
+ if (i != 0) {
+ finishJobs(jobGen, deploymentId);
+ /** invalidate/clear checkpoint */
+ lastSnapshotJobIndex.set(0);
+ lastSnapshotSuperstep.set(0);
+ }
+ jobGen = selectJobGen(planChoice, currentJob);
+ loadData(currentJob, jobGen, deploymentId);
+ } else {
+ jobGen.reset(currentJob);
+ }
+
+ /** run loop-body jobs */
+ runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
+ ckpHook, failed);
+ runClearState(deploymentId, jobGen);
+ failed = false;
+ }
+
+ /** finish the jobs */
+ finishJobs(jobGen, deploymentId);
+ /** clear checkpoints if any */
+ jobGen.clearCheckpoints();
+ hcc.unDeployBinary(deploymentId);
+ } catch (IOException ioe) {
+ /** disk failures */
+ //restart from snapshot
+ failed = true;
+ retryCount++;
+ ioe.printStackTrace();
+ }
+ } while (failed && retryCount < maxRetryCount);
LOG.info("job finished");
} catch (Exception e) {
+ e.printStackTrace();
throw new HyracksException(e);
}
}
+ private boolean compatible(PregelixJob lastJob, PregelixJob currentJob) {
+ Class lastVertexIdClass = BspUtils.getVertexIndexClass(lastJob.getConfiguration());
+ Class lastVertexValueClass = BspUtils.getVertexValueClass(lastJob.getConfiguration());
+ Class lastEdgeValueClass = BspUtils.getEdgeValueClass(lastJob.getConfiguration());
+ Path lastOutputPath = FileOutputFormat.getOutputPath(lastJob);
+
+ Class currentVertexIdClass = BspUtils.getVertexIndexClass(currentJob.getConfiguration());
+ Class currentVertexValueClass = BspUtils.getVertexValueClass(currentJob.getConfiguration());
+ Class currentEdgeValueClass = BspUtils.getEdgeValueClass(currentJob.getConfiguration());
+ Path[] currentInputPaths = FileInputFormat.getInputPaths(currentJob);
+
+ return lastVertexIdClass.equals(currentVertexIdClass)
+ && lastVertexValueClass.equals(currentVertexValueClass)
+ && lastEdgeValueClass.equals(currentEdegeValueClass)
+ && (currentInputPaths.length == 0 || (currentInputPaths.length == 1 && lastOutputPath
+ .equals(currentInputPaths[0])));
+ }
+
+ private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
+ JobGen jobGen;
+ switch (planChoice) {
+ case INNER_JOIN:
+ jobGen = new JobGenInnerJoin(currentJob);
+ break;
+ case OUTER_JOIN:
+ jobGen = new JobGenOuterJoin(currentJob);
+ break;
+ case OUTER_JOIN_SORT:
+ jobGen = new JobGenOuterJoinSort(currentJob);
+ break;
+ case OUTER_JOIN_SINGLE_SORT:
+ jobGen = new JobGenOuterJoinSingleSort(currentJob);
+ break;
+ default:
+ jobGen = new JobGenInnerJoin(currentJob);
+ }
+ return jobGen;
+ }
+
+ private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws Exception {
+ long start;
+ long end;
+ long time;
+ start = System.currentTimeMillis();
+ FileSystem dfs = FileSystem.get(currentJob.getConfiguration());
+ Path outputPath = FileOutputFormat.getOutputPath(currentJob);
+ if (outputPath != null) {
+ dfs.delete(outputPath, true);
+ }
+ runCreate(deploymentId, jobGen);
+ runDataLoad(deploymentId, jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("data loading finished " + time + "ms");
+ return time;
+ }
+
+ private void finishJobs(JobGen jobGen, DeploymentId deploymentId) throws Exception {
+ long start;
+ long end;
+ long time;
+ start = System.currentTimeMillis();
+ runHDFSWRite(deploymentId, jobGen);
+ runCleanup(deploymentId, jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("result writing finished " + time + "ms");
+ }
+
+ private DeploymentId prepareJobs(String ipAddress, int port) throws Exception {
+ if (hcc == null)
+ hcc = new HyracksConnection(ipAddress, port);
+
+ URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
+ List<File> jars = new ArrayList<File>();
+ URL[] urls = classLoader.getURLs();
+ for (URL url : urls)
+ if (url.toString().endsWith(".jar"))
+ jars.add(new File(url.getPath()));
+ DeploymentId deploymentId = installApplication(jars);
+ return deploymentId;
+ }
+
+ private void addHadoopConfiguration(PregelixJob job, String ipAddress, int port) throws HyracksException {
+ URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+ if (hadoopCore != null) {
+ job.getConfiguration().addResource(hadoopCore);
+ }
+ URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+ if (hadoopMapRed != null) {
+ job.getConfiguration().addResource(hadoopMapRed);
+ }
+ URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+ if (hadoopHdfs != null) {
+ job.getConfiguration().addResource(hadoopHdfs);
+ }
+ ClusterConfig.loadClusterConfig(ipAddress, port);
+ }
+
+ private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int currentJobIndex,
+ IntWritable snapshotJobIndex, IntWritable snapshotSuperstep, ICheckpointHook ckpHook, boolean doRecovery)
+ throws Exception {
+ if (doRecovery) {
+ /** reload the checkpoint */
+ runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
+ }
+ int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
+ boolean terminate = false;
+ long start, end, time;
+ do {
+ start = System.currentTimeMillis();
+ runLoopBodyIteration(deploymentId, jobGen, i);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info(job + ": iteration " + i + " finished " + time + "ms");
+ terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
+ || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
+ if (ckpHook.checkpoint(i)) {
+ runCheckpoint(deploymentId, jobGen, i);
+ snapshotSuperstep.set(i);
+ snapshotJobIndex.set(currentJobIndex);
+ }
+ i++;
+ } while (!terminate);
+ }
+
+ private void runCheckpoint(DeploymentId deploymentId, JobGen jobGen, int lastSuccessfulIteration) throws Exception {
+ try {
+ JobSpecification[] ckpJobs = jobGen.generateCheckpointing(lastSuccessfulIteration);
+ runJobArray(deploymentId, ckpJobs);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ private void runLoadCheckpoint(DeploymentId deploymentId, JobGen jobGen, int checkPointedIteration)
+ throws Exception {
+ try {
+ JobSpecification[] ckpJobs = jobGen.generateLoadingCheckpoint(checkPointedIteration);
+ runJobArray(deploymentId, ckpJobs);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
private void runCreate(DeploymentId deploymentId, JobGen jobGen) throws Exception {
try {
JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
@@ -196,6 +344,15 @@
}
}
+ private void runClearState(DeploymentId deploymentId, JobGen jobGen) throws Exception {
+ try {
+ JobSpecification clear = jobGen.generateClearState();
+ execute(deploymentId, clear);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
private void runJobArray(DeploymentId deploymentId, JobSpecification[] jobs) throws Exception {
for (JobSpecification job : jobs) {
execute(deploymentId, job);
@@ -204,6 +361,7 @@
private void execute(DeploymentId deploymentId, JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
+ job.setMaxReattempts(0);
JobId jobId = hcc.startJob(deploymentId, job,
profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 478ac07..d37c6fd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -15,8 +15,14 @@
package edu.uci.ics.pregelix.core.jobgen;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
@@ -24,10 +30,15 @@
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -50,6 +61,10 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -67,9 +82,12 @@
import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.ReflectionUtils;
@@ -79,7 +97,14 @@
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
+import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -99,8 +124,8 @@
protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
protected static final int tableSize = 10485767;
protected static final String PRIMARY_INDEX = "primary";
- protected final Configuration conf;
- protected final PregelixJob giraphJob;
+ protected Configuration conf;
+ protected PregelixJob pregelixJob;
protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
@@ -110,12 +135,19 @@
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+ private String vertexCheckpointPath;
+
public JobGen(PregelixJob job) {
- this.conf = job.getConfiguration();
- this.giraphJob = job;
- this.initJobConfiguration();
+ init(job);
+ }
+
+ private void init(PregelixJob job) {
+ conf = job.getConfiguration();
+ pregelixJob = job;
+ initJobConfiguration();
job.setJobId(jobId);
+ vertexCheckpointPath = "/tmp/ckpoint/" + jobId + "/vertex";
// set the frame size to be the one user specified if the user did
// specify.
int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
@@ -128,6 +160,10 @@
}
}
+ public void reset(PregelixJob job) {
+ init(job);
+ }
+
@SuppressWarnings({ "rawtypes", "unchecked" })
private void initJobConfiguration() {
Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
@@ -187,74 +223,6 @@
return spec;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateLoadingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
-
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
- LOGGER.info("number of splits: " + splits.size());
- for (InputSplit split : splits)
- LOGGER.info(split.toString());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
- readSchedule, confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner);
-
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
- nkmFactory, comparatorFactories, recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, sorter);
-
- /**
- * construct tree bulk load operator
- */
- int[] fieldPermutation = new int[2];
- fieldPermutation[0] = 0;
- fieldPermutation[1] = 1;
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
- sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, false,
- getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
-
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
- spec.setFrameSize(frameSize);
- return spec;
- }
-
@Override
public JobSpecification generateJob(int iteration) throws HyracksException {
if (iteration <= 0)
@@ -280,7 +248,7 @@
VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
List<InputSplit> splits = new ArrayList<InputSplit>();
try {
- splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+ splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -398,8 +366,184 @@
return spec;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
public JobSpecification scanIndexWriteGraph() throws HyracksException {
+ JobSpecification spec = scanIndexWriteToHDFS(conf);
+ return spec;
+ }
+
+ @Override
+ public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
+ try {
+ PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
+ tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+ FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath));
+ tmpJob.setOutputKeyClass(NullWritable.class);
+ tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
+ JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+ JobSpecification[] stateCkpSpecs = generateStateCheckpointing(lastSuccessfulIteration);
+ JobSpecification[] specs = new JobSpecification[1 + stateCkpSpecs.length];
+ specs[0] = vertexCkpSpec;
+ for (int i = 1; i < specs.length; i++) {
+ specs[i] = stateCkpSpecs[i - 1];
+ }
+ return specs;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public JobSpecification generateLoadingJob() throws HyracksException {
+ JobSpecification spec = loadHDFSData(conf);
+ return spec;
+ }
+
+ public void clearCheckpoints() throws IOException {
+ FileSystem dfs = FileSystem.get(conf);
+ // clear the checkpoint directory
+ dfs.delete(new Path("/tmp/ckpoint/" + jobId), true);
+ }
+
+ @Override
+ public JobSpecification[] generateLoadingCheckpoint(int lastCheckpointedIteration) throws HyracksException {
+ try {
+ PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
+ tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+ FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath));
+ JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
+ JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
+ JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
+ specs[0] = vertexLoadSpec;
+ for (int i = 1; i < specs.length; i++) {
+ specs[i] = stateLoadSpecs[i - 1];
+ }
+ return specs;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ /***
+ * generate a "clear state" job
+ */
+ @Override
+ public JobSpecification generateClearState() throws HyracksException {
+ JobSpecification spec = new JobSpecification();
+ ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
+ ClusterConfig.setLocationConstraint(spec, clearState);
+ spec.addRoot(clearState);
+ return spec;
+ }
+
+ /***
+ * drop the index with the given name
+ *
+ * @return JobSpecification
+ * @throws HyracksException
+ */
+ protected JobSpecification dropIndex(String indexName) throws HyracksException {
+ JobSpecification spec = new JobSpecification();
+
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+ IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
+ lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
+
+ ClusterConfig.setLocationConstraint(spec, drop);
+ spec.addRoot(drop);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected ITuplePartitionComputerFactory getVertexPartitionComputerFactory() {
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ Class<? extends VertexPartitioner> partitionerClazz = BspUtils.getVertexPartitionerClass(conf);
+ if (partitionerClazz != null) {
+ return new VertexPartitionComputerFactory(confFactory);
+ } else {
+ return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
+ BspUtils.getVertexIndexClass(conf)));
+ }
+ }
+
+ protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
+ if (BspUtils.useLSM(conf)) {
+ return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
+ 3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
+ NoOpIOOperationCallback.INSTANCE, 0.01);
+ } else {
+ return new BTreeDataflowHelperFactory();
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private JobSpecification loadHDFSData(Configuration conf) throws HyracksException, HyracksDataException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+ /**
+ * the graph file scan operator and use count constraint first, will use
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+ readSchedule, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
+ nkmFactory, comparatorFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sorter);
+
+ /**
+ * construct tree bulk load operator
+ */
+ int[] fieldPermutation = new int[2];
+ fieldPermutation[0] = 0;
+ fieldPermutation[1] = 1;
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
+ getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private JobSpecification scanIndexWriteToHDFS(Configuration conf) throws HyracksDataException, HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
JobSpecification spec = new JobSpecification();
@@ -456,45 +600,115 @@
return spec;
}
- /***
- * drop the sindex
- *
- * @return JobSpecification
- * @throws HyracksException
- */
- protected JobSpecification dropIndex(String indexName) throws HyracksException {
- JobSpecification spec = new JobSpecification();
-
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
- IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
- lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
-
- ClusterConfig.setLocationConstraint(spec, drop);
- spec.addRoot(drop);
- spec.setFrameSize(frameSize);
- return spec;
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- protected ITuplePartitionComputerFactory getVertexPartitionComputerFactory() {
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- Class<? extends VertexPartitioner> partitionerClazz = BspUtils.getVertexPartitionerClass(conf);
- if (partitionerClazz != null) {
- return new VertexPartitionComputerFactory(confFactory);
- } else {
- return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
- BspUtils.getVertexIndexClass(conf)));
+ protected PregelixJob createCloneJob(String newJobName, PregelixJob oldJob) throws HyracksException {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutput dos = new DataOutputStream(bos);
+ oldJob.getConfiguration().write(dos);
+ PregelixJob newJob = new PregelixJob(newJobName);
+ DataInput dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ newJob.getConfiguration().readFields(dis);
+ return newJob;
+ } catch (IOException e) {
+ throw new HyracksException(e);
}
}
- protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
- if (BspUtils.useLSM(conf)) {
- return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
- 3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
- NoOpIOOperationCallback.INSTANCE, 0.01);
- } else {
- return new BTreeDataflowHelperFactory();
- }
+ /** generate plan specific state checkpointing */
+ protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * source aggregate
+ */
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MsgList.class.getName());
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+ PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
+ tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+ FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
+ tmpJob.setOutputKeyClass(vertexIdClass);
+ tmpJob.setOutputValueClass(MsgList.class);
+
+ ITupleWriterFactory writerFactory = new KeyValueWriterFactory(new ConfFactory(tmpJob));
+ HDFSWriteOperatorDescriptor hdfsWriter = new HDFSWriteOperatorDescriptor(spec, tmpJob, writerFactory);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, materializeRead, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, hdfsWriter, 0);
+ spec.setFrameSize(frameSize);
+ return new JobSpecification[] { spec };
+ }
+
+ /** load plan specific state checkpoints */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
+ throws HyracksException {
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+ PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
+ tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+ try {
+ FileInputFormat.setInputPaths(tmpJob, checkpointPath);
+ } catch (IOException e) {
+ throw new HyracksException(e);
+ }
+ Configuration conf = job.getConfiguration();
+ Class vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /***
+ * HDFS read operator
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), MsgList.class.getName());
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
+ readSchedule, new KeyValueParserFactory());
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec,
+ recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, materialize);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink);
+
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
+ materialize, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, emptySink, 0);
+ spec.setFrameSize(frameSize);
+ return new JobSpecification[] { spec };
}
/** generate non-first iteration job */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index c144ddd..58384b2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -15,28 +15,51 @@
package edu.uci.ics.pregelix.core.jobgen;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
@@ -48,6 +71,8 @@
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
+import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -69,6 +94,7 @@
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
public class JobGenInnerJoin extends JobGen {
+ private static final Logger LOGGER = Logger.getLogger(JobGenInnerJoin.class.getName());
public JobGenInnerJoin(PregelixJob job) {
super(job);
@@ -496,6 +522,171 @@
return spec;
}
+ /** generate plan specific state checkpointing */
+ @Override
+ protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
+ JobSpecification[] msgCkpSpecs = super.generateStateCheckpointing(lastSuccessfulIteration);
+ PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
+ tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+
+ /** generate secondary index checkpoint */
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+ FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
+ tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
+ tmpJob.setOutputValueClass(MsgList.class);
+ JobSpecification secondaryBTreeCkp = generateSecondaryBTreeCheckpoint(lastSuccessfulIteration, tmpJob);
+
+ JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
+ for (int i = 0; i < msgCkpSpecs.length; i++) {
+ specs[i] = msgCkpSpecs[i];
+ }
+ specs[specs.length - 1] = secondaryBTreeCkp;
+ return specs;
+ }
+
+ /**
+ * generate plan specific checkpoint loading
+ */
+ @Override
+ protected JobSpecification[] generateStateCheckpointLoading(int lastSuccessfulIteration, PregelixJob job)
+ throws HyracksException {
+ JobSpecification[] msgCkpSpecs = super.generateStateCheckpointLoading(lastSuccessfulIteration, job);
+ PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
+ tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+
+ /** generate secondary index checkpoint load */
+ JobSpecification secondaryBTreeCkpLoad = generateSecondaryBTreeCheckpointLoad(lastSuccessfulIteration, tmpJob);
+ JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
+ for (int i = 0; i < msgCkpSpecs.length; i++) {
+ specs[i] = msgCkpSpecs[i];
+ }
+ specs[specs.length - 1] = secondaryBTreeCkpLoad;
+ return specs;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private JobSpecification generateSecondaryBTreeCheckpointLoad(int lastSuccessfulIteration, PregelixJob job)
+ throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
+ JobSpecification spec = new JobSpecification();
+
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+ PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
+ tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+ try {
+ FileInputFormat.setInputPaths(tmpJob, checkpointPath);
+ } catch (IOException e) {
+ throw new HyracksException(e);
+ }
+ Configuration conf = job.getConfiguration();
+
+ /***
+ * construct HDFS read operator
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), MsgList.class.getName());
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
+ readSchedule, new KeyValueParserFactory());
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct bulk-load index operator
+ */
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ int fieldPermutation[] = new int[] { 0, 1 };
+ IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+ indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration + 1, WritableComparator
+ .get(vertexIdClass).getClass());
+ String writeFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
+ IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+ TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
+ indexCmpFactories, fieldPermutation, new int[] { 0 }, DEFAULT_BTREE_FILL_FACTOR,
+ getIndexDataflowHelperFactory());
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
+ btreeBulkLoad, 0);
+ spec.setFrameSize(frameSize);
+
+ return spec;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private JobSpecification generateSecondaryBTreeCheckpoint(int lastSuccessfulIteration, PregelixJob job)
+ throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
+ Class<? extends Writable> msgListClass = MsgList.class;
+ String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
+ IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+ JobSpecification spec = new JobSpecification();
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct btree search operator
+ */
+ ConfFactory confFactory = new ConfFactory(job);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), msgListClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
+ comparatorFactories, null, null, null, true, true, getIndexDataflowHelperFactory(), false,
+ NoOpOperationCallbackFactory.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct write file operator
+ */
+ HDFSWriteOperatorDescriptor writer = new HDFSWriteOperatorDescriptor(spec, job, new KeyValueWriterFactory(
+ confFactory));
+ ClusterConfig.setLocationConstraint(spec, writer);
+
+ /**
+ * connect operator descriptors
+ */
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
+
@Override
public JobSpecification[] generateCleanup() throws HyracksException {
JobSpecification[] cleanups = new JobSpecification[3];
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 49309ec..ea6cc8a 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.TreeMap;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -197,7 +198,8 @@
public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
try {
IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+ Map<String, NodeControllerInfo> ncNameToNcInfos = new TreeMap<String, NodeControllerInfo>();
+ ncNameToNcInfos.putAll(hcc.getNodeControllerInfos());
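+ // a TreeMap keeps the node controllers sorted by name, so the NC ordering
+ // (and hence the partition-to-node assignment) is deterministic across runs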
NCs = new String[ncNameToNcInfos.size()];
ipToNcMapping = new HashMap<String, List<String>>();
int i = 0;
@@ -226,4 +228,16 @@
public static Scheduler getHdfsScheduler() {
return hdfsScheduler;
}
+
+ public static String[] getLocationConstraint() throws HyracksException {
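+ // one location entry per (node controller, store directory) pair, i.e., one per data partition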
+ int count = 0;
+ String[] locations = new String[NCs.length * stores.length];
+ for (String nc : NCs) {
+ for (int i = 0; i < stores.length; i++) {
+ locations[count] = nc;
+ count++;
+ }
+ }
+ return locations;
+ }
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index 8709301..392f728 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -34,7 +34,7 @@
int srcLen = fieldEndOffsets[1] - fieldEndOffsets[0]; // the updated vertex size
int frSize = frameTuple.getFieldLength(1); // the vertex binary size in the leaf page
if (srcLen <= frSize) {
- //doing in-place update if possible, save the "real update" overhead
+ //do an in-place update when the updated vertex is no larger than the original, saving the "real update" overhead
System.arraycopy(cloneUpdateTb.getByteArray(), srcStart, frameTuple.getFieldData(1),
frameTuple.getFieldStart(1), srcLen);
cloneUpdateTb.reset();
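+ // e.g., a vertex image that shrank from 24 to 16 bytes takes this in-place path;
+ // one that outgrew its original slot must instead go through the full "real update"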
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
index b697466..5be9ffc 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
@@ -15,11 +15,8 @@
package edu.uci.ics.pregelix.dataflow.util;
import java.io.InputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
public class ResetableByteArrayInputStream extends InputStream {
- private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
private byte[] data;
private int position;
@@ -36,19 +33,12 @@
public int read() {
int remaining = data.length - position;
int value = remaining > 0 ? (data[position++] & 0xff) : -1;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
- }
return value;
}
@Override
public int read(byte[] bytes, int offset, int length) {
int remaining = data.length - position;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
- + length + " position: " + position);
- }
if (remaining == 0) {
return -1;
}
@@ -57,4 +47,9 @@
position += l;
return l;
}
+
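+ /**
+ * the number of unread bytes left in the backing array; callers such as
+ * TupleDeserializer sample this before and after a field read to bound-check
+ * how many bytes a user's readFields() actually consumed
+ */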
+ @Override
+ public int available() {
+ return data.length - position;
+ }
}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
index 150bd8b..2fa1a4b 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -17,19 +17,15 @@
import java.io.DataInputStream;
import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameConstants;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
public class TupleDeserializer {
- private static final Logger LOGGER = Logger.getLogger(TupleDeserializer.class.getName());
-
+ private static final String ERROR_MSG = "Out-of-bounds read in your Writable implementations of the types for vertex id, vertex value, edge value or message --- check your readFields and write implementations";
private Object[] record;
private RecordDescriptor recordDescriptor;
private ResetableByteArrayInputStream bbis;
@@ -43,132 +39,120 @@
}
public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException {
- for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbis.setByteArray(data, offset);
+ try {
+ for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ int len = tupleRef.getFieldLength(i);
+ bbis.setByteArray(data, offset);
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
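+ // sanity check: a well-behaved readFields() never consumes more than the
+ // field's serialized length; if it does, the user's Writable is broken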
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > len) {
+ throw new IllegalStateException(ERROR_MSG);
}
+
+ record[i] = instance;
}
+ return record;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
}
- return record;
}
public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right)
throws HyracksDataException {
- byte[] data = left.getBuffer().array();
- int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
- int leftFieldCount = left.getFieldCount();
- int fStart = tStart;
- for (int i = 0; i < leftFieldCount; ++i) {
- /**
- * reset the input
- */
- fStart = tStart + left.getFieldStartOffset(tIndex, i);
- bbis.setByteArray(data, fStart);
+ try {
+ /** skip vertex id field in deserialization */
+ byte[] data = left.getBuffer().array();
+ int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
+ int leftFieldCount = left.getFieldCount();
+ int fStart = tStart;
+ for (int i = 1; i < leftFieldCount; ++i) {
+ /**
+ * reset the input
+ */
+ fStart = tStart + left.getFieldStartOffset(tIndex, i);
+ int fieldLength = left.getFieldLength(tIndex, i);
+ bbis.setByteArray(data, fStart);
- /**
- * do deserialization
- */
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- for (int i = leftFieldCount; i < record.length; ++i) {
- byte[] rightData = right.getFieldData(i - leftFieldCount);
- int rightOffset = right.getFieldStart(i - leftFieldCount);
- bbis.setByteArray(rightData, rightOffset);
+ /**
+ * do deserialization
+ */
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > fieldLength) {
+ throw new IllegalStateException(ERROR_MSG);
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
}
+ record[i] = instance;
}
+ /** skip vertex id field in deserialization */
+ for (int i = leftFieldCount + 1; i < record.length; ++i) {
+ byte[] rightData = right.getFieldData(i - leftFieldCount);
+ int rightOffset = right.getFieldStart(i - leftFieldCount);
+ int len = right.getFieldLength(i - leftFieldCount);
+ bbis.setByteArray(rightData, rightOffset);
+
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > len) {
+ throw new IllegalStateException(ERROR_MSG);
+ }
+ record[i] = instance;
+ }
+ return record;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
}
- return record;
}
public Object[] deserializeRecord(ArrayTupleBuilder tb, ITupleReference right) throws HyracksDataException {
- byte[] data = tb.getByteArray();
- int[] offset = tb.getFieldEndOffsets();
- int start = 0;
- for (int i = 0; i < offset.length; ++i) {
- /**
- * reset the input
- */
- bbis.setByteArray(data, start);
- start = offset[i];
+ try {
+ byte[] data = tb.getByteArray();
+ int[] offset = tb.getFieldEndOffsets();
+ int start = 0;
+ /** skip vertex id fields in deserialization */
+ for (int i = 1; i < offset.length; ++i) {
+ /**
+ * reset the input
+ */
+ start = offset[i - 1];
+ bbis.setByteArray(data, start);
+ int fieldLength = offset[i] - offset[i - 1]; // i starts at 1, so offset[i - 1] is always valid
- /**
- * do deserialization
- */
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
+ /**
+ * do deserialization
+ */
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > fieldLength) {
+ throw new IllegalStateException(ERROR_MSG);
}
+ record[i] = instance;
}
- }
- for (int i = offset.length; i < record.length; ++i) {
- byte[] rightData = right.getFieldData(i - offset.length);
- int rightOffset = right.getFieldStart(i - offset.length);
- bbis.setByteArray(rightData, rightOffset);
+ /** skip vertex id fields in deserialization */
+ for (int i = offset.length + 1; i < record.length; ++i) {
+ byte[] rightData = right.getFieldData(i - offset.length);
+ int rightOffset = right.getFieldStart(i - offset.length);
+ bbis.setByteArray(rightData, rightOffset);
+ int fieldLength = right.getFieldLength(i - offset.length);
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > fieldLength) {
+ throw new IllegalStateException(ERROR_MSG);
}
+ record[i] = instance;
}
+ return record;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
}
- return record;
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
index ea1e02e..4421695 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
/**
* The buffer to hold updates.
@@ -96,7 +97,12 @@
fta.reset(buffer);
for (int j = 0; j < fta.getTupleCount(); j++) {
tuple.reset(fta, j);
- bta.update(tuple);
+ try {
+ bta.update(tuple);
+ } catch (TreeIndexNonExistentKeyException e) {
+ // the key does not exist in the index yet: fall back to insert (upsert semantics)
+ bta.insert(tuple);
+ }
}
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ClearStateOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ClearStateOperatorDescriptor.java
new file mode 100644
index 0000000..d86557b
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ClearStateOperatorDescriptor.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+/**
+ * Clear the state of the RuntimeContext on one slave
+ *
+ * @author yingyib
+ */
+public class ClearStateOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private String jobId;
+
+ public ClearStateOperatorDescriptor(JobSpecification spec, String jobId) {
+ super(spec, 0, 0);
+ this.jobId = jobId;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new IOperatorNodePushable() {
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ RuntimeContext context = (RuntimeContext) ctx.getJobletContext().getApplicationContext()
+ .getApplicationObject();
+ context.clearState(jobId);
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+
+ }
+
+ @Override
+ public int getInputArity() {
+ return 0;
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+ throws HyracksDataException {
+
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return null;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return "Clear State Operator";
+ }
+
+ };
+ }
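+
+ /*
+ * A minimal usage sketch (hypothetical wiring; variable names are illustrative):
+ *
+ * JobSpecification spec = new JobSpecification();
+ * ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
+ * ClusterConfig.setLocationConstraint(spec, clearState);
+ * // running this spec drops the cached iteration state for jobId on every slave
+ */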
+
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueParserFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueParserFactory.java
new file mode 100644
index 0000000..a4a53e1
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueParserFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+/**
+ * @author yingyib
+ */
+public class KeyValueParserFactory<K extends Writable, V extends Writable> implements IKeyValueParserFactory<K, V> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException {
+ final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ final DataOutput dos = tb.getDataOutput();
+ final ByteBuffer buffer = ctx.allocateFrame();
+ final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(buffer, true);
+
+ return new IKeyValueParser<K, V>() {
+
+ @Override
+ public void open(IFrameWriter writer) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void parse(K key, V value, IFrameWriter writer, String fileString) throws HyracksDataException {
+ try {
+ tb.reset();
+ key.write(dos);
+ tb.addFieldEndOffset();
+ value.write(dos);
+ tb.addFieldEndOffset();
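+ // append the tuple to the current frame; if the frame is full, flush it
+ // downstream, reset, and retry on the empty frame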
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(buffer, writer);
+ appender.reset(buffer, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new HyracksDataException("tuple cannot be appended into the frame");
+ }
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.flushFrame(buffer, writer);
+ }
+
+ };
+ }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueWriterFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueWriterFactory.java
new file mode 100644
index 0000000..fd407be
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueWriterFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
+
+/**
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class KeyValueWriterFactory implements ITupleWriterFactory {
+ private static final long serialVersionUID = 1L;
+ private ConfFactory confFactory;
+
+ public KeyValueWriterFactory(ConfFactory confFactory) {
+ this.confFactory = confFactory;
+ }
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, final int partition, final int nPartition)
+ throws HyracksDataException {
+ return new ITupleWriter() {
+ private SequenceFileOutputFormat sequenceOutputFormat = new SequenceFileOutputFormat();
+ private Writable key;
+ private Writable value;
+ private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+ private DataInput dis = new DataInputStream(bis);
+ private RecordWriter recordWriter;
+ private ContextFactory ctxFactory = new ContextFactory();
+ private TaskAttemptContext context;
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ try {
+ Job job = confFactory.getConf();
+ context = ctxFactory.createContext(job.getConfiguration(), partition);
+ recordWriter = sequenceOutputFormat.getRecordWriter(context);
+ Class<?> keyClass = context.getOutputKeyClass();
+ Class<?> valClass = context.getOutputValueClass();
+ key = (Writable) ReflectionUtils.createInstance(keyClass);
+ value = (Writable) ReflectionUtils.createInstance(valClass);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ try {
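+ // rehydrate the key and value Writables from the tuple's two binary fields,
+ // then hand them to the wrapped Hadoop SequenceFile record writer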
+ byte[] data = tuple.getFieldData(0);
+ int fieldStart = tuple.getFieldStart(0);
+ bis.setByteArray(data, fieldStart);
+ key.readFields(dis);
+ data = tuple.getFieldData(1);
+ fieldStart = tuple.getFieldStart(1);
+ bis.setByteArray(data, fieldStart);
+ value.readFields(dis);
+ recordWriter.write(key, value);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ try {
+ recordWriter.close(context);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ };
+ }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
index 48ed806..00dcbd1 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
@@ -68,6 +68,8 @@
@Override
public void open() throws HyracksDataException {
+ /** remove last iteration's state */
+ IterationUtils.removeIterationState(ctx, partition);
state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
partition));
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index e25a46a..496d066 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -56,11 +56,12 @@
private final IBufferCache bufferCache;
private final IVirtualBufferCache vBufferCache;
private final IFileMapManager fileMapManager;
- private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
- private final Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
- private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
private final IOManager ioManager;
private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+ private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
+ private final Map<String, Long> jobIdToSuperStep = new ConcurrentHashMap<String, Long>();
+ private final Map<String, Boolean> jobIdToMove = new ConcurrentHashMap<String, Boolean>();
+
private final ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r);
@@ -100,6 +101,18 @@
System.gc();
}
+ public void clearState(String jobId) throws HyracksDataException {
+ for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet()) {
+ for (FileReference fileRef : entry.getValue()) {
+ fileRef.delete();
+ }
+ }
+
+ iterationToFiles.clear();
+ appStateMap.clear();
+ jobIdToMove.remove(jobId);
+ jobIdToSuperStep.remove(jobId);
+ System.gc();
+ }
+
public ILocalResourceRepository getLocalResourceRepository() {
return localResourceRepository;
}
@@ -132,14 +145,14 @@
return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
}
- public synchronized void setVertexProperties(String giraphJobId, long numVertices, long numEdges) {
- Boolean toMove = giraphJobIdToMove.get(giraphJobId);
+ public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges) {
+ Boolean toMove = jobIdToMove.get(jobId);
if (toMove == null || toMove == true) {
- if (giraphJobIdToSuperStep.get(giraphJobId) == null) {
- giraphJobIdToSuperStep.put(giraphJobId, 0L);
+ if (jobIdToSuperStep.get(jobId) == null) {
+ jobIdToSuperStep.put(jobId, 0L);
}
- long superStep = giraphJobIdToSuperStep.get(giraphJobId);
+ long superStep = jobIdToSuperStep.get(jobId);
List<FileReference> files = iterationToFiles.remove(superStep - 1);
if (files != null) {
for (FileReference fileRef : files)
@@ -149,15 +162,15 @@
Vertex.setSuperstep(++superStep);
Vertex.setNumVertices(numVertices);
Vertex.setNumEdges(numEdges);
- giraphJobIdToSuperStep.put(giraphJobId, superStep);
- giraphJobIdToMove.put(giraphJobId, false);
+ jobIdToSuperStep.put(jobId, superStep);
+ jobIdToMove.put(jobId, false);
LOGGER.info("start iteration " + Vertex.getSuperstep());
}
System.gc();
}
public synchronized void endSuperStep(String giraphJobId) {
- giraphJobIdToMove.put(giraphJobId, true);
+ jobIdToMove.put(giraphJobId, true);
LOGGER.info("end iteration " + Vertex.getSuperstep());
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 75f8ed8..603a464 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -52,6 +52,11 @@
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
Map<StateKey, IStateObject> map = context.getAppStateStore();
IStateObject state = map.get(new StateKey(lastId, partition));
+ while (state == null) {
+ /** in case the last job is a checkpointing job */
+ lastId = new JobId(lastId.getId() - 1);
+ state = map.get(new StateKey(lastId, partition));
+ }
return state;
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index 07d2d57..a280c45 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -139,6 +139,7 @@
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setDynamicVertexValueSize(true);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java
new file mode 100644
index 0000000..d2464c1
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.FloatWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * @author yingyib
+ */
+public class FailureVertex extends Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+ @Override
+ public void compute(Iterator<VLongWritable> msgIterator) throws Exception {
+ if (getVertexId().get() == 10) {
+ throw new IllegalStateException("This job is going to fail");
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index f60387a..f99321a 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -16,6 +16,7 @@
package edu.uci.ics.pregelix.example.client;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -74,6 +75,13 @@
driver.runJob(job, options.planChoice, options.ipAddress, options.port, Boolean.parseBoolean(options.profiling));
}
+ public static void run(String[] args, List<PregelixJob> jobs) throws Exception {
+ Options options = prepareJobs(args, jobs);
+ Driver driver = new Driver(Client.class);
+ driver.runJobs(jobs, options.planChoice, options.ipAddress, options.port,
+ Boolean.parseBoolean(options.profiling));
+ }
+
private static Options prepareJob(String[] args, PregelixJob job) throws CmdLineException, IOException {
Options options = new Options();
CmdLineParser parser = new CmdLineParser(options);
@@ -84,6 +92,32 @@
for (int i = 1; i < inputs.length; i++)
FileInputFormat.addInputPaths(job, inputs[i]);
FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+ setJobSpecificSettings(job, options);
+ return options;
+ }
+
+ private static Options prepareJobs(String[] args, List<PregelixJob> jobs) throws CmdLineException, IOException {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+
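+ // only the first job consumes the external input and only the last job writes the
+ // final output; the jobs in between are expected to chain through internal state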
+ for (int j = 0; j < jobs.size(); j++) {
+ PregelixJob job = jobs.get(j);
+ String[] inputs = options.inputPaths.split(";");
+ if (j == 0) {
+ FileInputFormat.setInputPaths(job, inputs[0]);
+ for (int i = 1; i < inputs.length; i++)
+ FileInputFormat.addInputPaths(job, inputs[i]);
+ }
+ if (j == jobs.size() - 1) {
+ FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+ }
+ setJobSpecificSettings(job, options);
+ }
+ return options;
+ }
+
+ private static void setJobSpecificSettings(PregelixJob job, Options options) {
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, options.sourceId);
@@ -91,7 +125,6 @@
job.getConfiguration().setLong(ReachabilityVertex.DEST_ID, options.destId);
if (options.numIteration > 0)
job.getConfiguration().setLong(PageRankVertex.ITERATIONS, options.numIteration);
- return options;
}
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
index 7d824ea..90065c2 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
@@ -14,8 +14,13 @@
*/
package edu.uci.ics.pregelix.example.data;
+import java.io.DataInput;
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.WritableUtils;
+
import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
/**
* @author yingyib
@@ -26,34 +31,42 @@
private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
private static final int NEGATIVE_LONG_MASK = (0 << 30);
+ private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+ private DataInput dis = new DataInputStream(bis);
+
@Override
public int getNormalizedKey(byte[] bytes, int start, int length) {
- long value = SerDeUtils.readVLong(bytes, start, length);
- int highValue = (int) (value >> 32);
- if (highValue > 0) {
- /**
- * larger than Integer.MAX
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= POSTIVE_LONG_MASK;
- return highNmk;
- } else if (highValue == 0) {
- /**
- * smaller than Integer.MAX but >=0
- */
- int lowNmk = (int) value;
- lowNmk >>= 2;
- lowNmk |= NON_NEGATIVE_INT_MASK;
- return lowNmk;
- } else {
- /**
- * less than 0; TODO: have not optimized for that
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= NEGATIVE_LONG_MASK;
- return highNmk;
+ try {
+ bis.setByteArray(bytes, start);
+ long value = WritableUtils.readVLong(dis);
+ int highValue = (int) (value >> 32);
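+ /**
+ * the top two bits of the normalized key encode the value's range class
+ * (negative long < fits-in-non-negative-int < larger-than-int positive);
+ * the remaining bits order values within each class
+ */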
+ if (highValue > 0) {
+ /**
+ * larger than Integer.MAX
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= POSTIVE_LONG_MASK;
+ return highNmk;
+ } else if (highValue == 0) {
+ /**
+ * smaller than Integer.MAX but >=0
+ */
+ int lowNmk = (int) value;
+ lowNmk >>= 2;
+ lowNmk |= NON_NEGATIVE_INT_MASK;
+ return lowNmk;
+ } else {
+ /**
+ * less than 0; TODO: have not optimized for that
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= NEGATIVE_LONG_MASK;
+ return highNmk;
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
}
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
index ec1109f..1c5f629 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
@@ -16,14 +16,14 @@
package edu.uci.ics.pregelix.example.io;
import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+import java.io.DataInputStream;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
import edu.uci.ics.pregelix.api.io.WritableSizable;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
/**
* A WritableComparable for longs in a variable-length format. Such values take
@@ -32,17 +32,7 @@
* @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
*/
@SuppressWarnings("rawtypes")
-public class VLongWritable implements WritableComparable, WritableSizable {
- private static long ONE_BYTE_MAX = 2 ^ 7 - 1;
- private static long TWO_BYTE_MAX = 2 ^ 14 - 1;
- private static long THREE_BYTE_MAX = 2 ^ 21 - 1;
- private static long FOUR_BYTE_MAX = 2 ^ 28 - 1;
- private static long FIVE_BYTE_MAX = 2 ^ 35 - 1;;
- private static long SIX_BYTE_MAX = 2 ^ 42 - 1;;
- private static long SEVEN_BYTE_MAX = 2 ^ 49 - 1;;
- private static long EIGHT_BYTE_MAX = 2 ^ 54 - 1;;
-
- private long value;
+public class VLongWritable extends org.apache.hadoop.io.VLongWritable implements WritableSizable {
public VLongWritable() {
}
@@ -52,78 +42,46 @@
}
public int sizeInBytes() {
- if (value >= 0 && value <= ONE_BYTE_MAX) {
+ long i = get();
+ if (i >= -112 && i <= 127) {
return 1;
- } else if (value > ONE_BYTE_MAX && value <= TWO_BYTE_MAX) {
- return 2;
- } else if (value > TWO_BYTE_MAX && value <= THREE_BYTE_MAX) {
- return 3;
- } else if (value > THREE_BYTE_MAX && value <= FOUR_BYTE_MAX) {
- return 4;
- } else if (value > FOUR_BYTE_MAX && value <= FIVE_BYTE_MAX) {
- return 5;
- } else if (value > FIVE_BYTE_MAX && value <= SIX_BYTE_MAX) {
- return 6;
- } else if (value > SIX_BYTE_MAX && value <= SEVEN_BYTE_MAX) {
- return 7;
- } else if (value > SEVEN_BYTE_MAX && value <= EIGHT_BYTE_MAX) {
- return 8;
- } else {
- return 9;
}
- }
- /** Set the value of this LongWritable. */
- public void set(long value) {
- this.value = value;
- }
+ int len = -112;
+ if (i < 0) {
+ i ^= -1L; // take one's complement
+ len = -120;
+ }
- /** Return the value of this LongWritable. */
- public long get() {
- return value;
- }
+ long tmp = i;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
- public void readFields(DataInput in) throws IOException {
- value = SerDeUtils.readVLong(in);
- }
-
- public void write(DataOutput out) throws IOException {
- SerDeUtils.writeVLong(out, value);
- }
-
- /** Returns true iff <code>o</code> is a VLongWritable with the same value. */
- public boolean equals(Object o) {
- if (!(o instanceof VLongWritable))
- return false;
- VLongWritable other = (VLongWritable) o;
- return this.value == other.value;
- }
-
- public int hashCode() {
- return (int) value;
- }
-
- /** Compares two VLongWritables. */
- public int compareTo(Object o) {
- long thisValue = this.value;
- long thatValue = ((VLongWritable) o).value;
- return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
- }
-
- public String toString() {
- return Long.toString(value);
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+ return len + 1;
}
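+
+ // worked example: get() == 128 falls outside [-112, 127], so the vlong encoding
+ // uses one marker byte plus one payload byte; the loop above yields len == 1
+ // and sizeInBytes() returns 2, matching what WritableUtils.writeVLong emits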
/** A Comparator optimized for LongWritable. */
public static class Comparator extends WritableComparator {
+ private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+ private DataInput dis = new DataInputStream(bis);
+
public Comparator() {
super(VLongWritable.class);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- long thisValue = SerDeUtils.readVLong(b1, s1, l1);
- long thatValue = SerDeUtils.readVLong(b2, s2, l2);
- return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+ try {
+ bis.setByteArray(b1, s1);
+ long thisValue = WritableUtils.readVLong(dis);
+ bis.setByteArray(b2, s2);
+ long thatValue = WritableUtils.readVLong(dis);
+ return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
}
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 13cec61..f6857fe 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
@@ -76,6 +77,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -90,6 +92,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -116,6 +119,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -126,6 +130,7 @@
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setDynamicVertexValueSize(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -140,6 +145,7 @@
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+ job.setDynamicVertexValueSize(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
index 196b114..638011b 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
@@ -41,10 +41,13 @@
public void testVLong() throws Exception {
Random rand = new Random(System.currentTimeMillis());
MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ msgList.add(new VLongWritable(Long.MAX_VALUE));
+ msgList.add(new VLongWritable(Long.MIN_VALUE));
+ msgList.add(new VLongWritable(-1));
for (int i = 0; i < 1000000; i++) {
msgList.add(new VLongWritable(Math.abs(rand.nextLong())));
}
- verifySizeEstimation(msgList);
+ verifyExactSizeEstimation(msgList);
}
@Test
@@ -96,7 +99,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testNull() throws Exception {
MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
@@ -105,7 +108,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testVInt() throws Exception {
Random rand = new Random(System.currentTimeMillis());
@@ -115,7 +118,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testInt() throws Exception {
Random rand = new Random(System.currentTimeMillis());
@@ -148,4 +151,26 @@
}
}
+ private void verifyExactSizeEstimation(MsgList<WritableSizable> msgList) throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutput dos = new DataOutputStream(bos);
+ int accumulatedSize = 5;
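+ // the initial 5 bytes account for the assumed fixed overhead of the serialized MsgList header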
+ for (int i = 0; i < msgList.size(); i++) {
+ bos.reset();
+ WritableSizable value = msgList.get(i);
+ value.write(dos);
+ if (value.sizeInBytes() != bos.size()) {
+ throw new Exception(value + " estimated size (" + value.sizeInBytes()
+ + ") does not match the actual size (" + bos.size() + ")");
+ }
+ accumulatedSize += value.sizeInBytes();
+ }
+ bos.reset();
+ msgList.write(dos);
+ if (accumulatedSize < bos.size()) {
+ throw new Exception("Estimated list size (" + accumulatedSize + ") is smaller than the actual size"
+ + bos.size());
+ }
+ }
+
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
new file mode 100644
index 0000000..5a2636a
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.test;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
+import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.FailureVertex;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+
+/**
+ * This test case verifies that error messages from a failed vertex computation propagate back to the client.
+ *
+ * @author yingyib
+ */
+public class FailureVertexTest {
+
+ private static String INPUT_PATH = "data/webmapcomplex";
+ private static String OUTPUT_PATH = "actual/resultcomplex";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+ try {
+ PregelixJob job = new PregelixJob(FailureVertex.class.getSimpleName());
+ job.setVertexClass(FailureVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+ job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setDynamicVertexValueSize(true);
+
+ FileInputFormat.setInputPaths(job, INPUT_PATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+
+ Driver driver = new Driver(FailureVertex.class);
+ testCluster.setUp();
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+ } catch (Exception e) {
+ Assert.assertTrue(e.toString().contains("This job is going to fail"));
+ } finally {
+ testCluster.tearDown();
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
new file mode 100644
index 0000000..d2995f1
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class JobConcatenationTest {
+
+ private static String INPUTPATH = "data/webmap";
+ private static String OUTPUTPAH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+
+ try {
+ List<PregelixJob> jobs = new ArrayList<PregelixJob>();
+ PregelixJob job1 = new PregelixJob(PageRankVertex.class.getName());
+ job1.setVertexClass(PageRankVertex.class);
+ job1.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job1.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job1.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileInputFormat.setInputPaths(job1, INPUTPATH);
+ job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job1.setCheckpointHook(ConservativeCheckpointHook.class);
+
+ PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
+ job2.setVertexClass(PageRankVertex.class);
+ job2.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job2.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job2.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPATH));
+ job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job2.setCheckpointHook(ConservativeCheckpointHook.class);
+
+ jobs.add(job1);
+ jobs.add(job2);
+
+ testCluster.setUp();
+ Driver driver = new Driver(PageRankVertex.class);
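+ // runJobs(...) submits the whole list to the embedded cluster; the jobs
+ // are expected to execute in list order.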
+ driver.runJobs(jobs, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPATH));
+ } finally {
+ testCluster.tearDown();
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
new file mode 100644
index 0000000..d0cf654
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.test;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+@SuppressWarnings("deprecation")
+public class TestCluster {
+ private static final Logger LOGGER = Logger.getLogger(TestCluster.class.getName());
+
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+ private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+
+ private static final String DATA_PATH = "data/webmap/webmap_link.txt";
+ private static final String HDFS_PATH = "/webmap/";
+
+ private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
+ private static final String HDFS_PATH2 = "/webmapcomplex/";
+
+ private static final String DATA_PATH3 = "data/clique/clique.txt";
+ private static final String HDFS_PATH3 = "/clique/";
+
+ private static final String DATA_PATH4 = "data/clique2/clique.txt";
+ private static final String HDFS_PATH4 = "/clique2/";
+
+ private static final String DATA_PATH5 = "data/clique3/clique.txt";
+ private static final String HDFS_PATH5 = "/clique3/";
+
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
+
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+
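+ /**
+ * Boot the test environment: point ClusterConfig at the test store and
+ * cluster properties, start the Hyracks mini-cluster, reset the
+ * actual-result directory, and finally bring up the HDFS mini-cluster.
+ */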
+ public void setUp() throws Exception {
+ ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+ ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+ cleanupStores();
+ PregelixHyracksIntegrationUtil.init();
+ LOGGER.info("Hyracks mini-cluster started");
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+ }
+
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
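+ /**
+ * Start a MiniDFSCluster with two datanodes, stage the five sample datasets
+ * (webmap, webmapcomplex, clique, clique2, clique3) into HDFS, and dump the
+ * merged Hadoop configuration to conf.xml under the actual-result directory.
+ */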
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(DATA_PATH);
+ Path dest = new Path(HDFS_PATH);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ src = new Path(DATA_PATH2);
+ dest = new Path(HDFS_PATH2);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ src = new Path(DATA_PATH3);
+ dest = new Path(HDFS_PATH3);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ src = new Path(DATA_PATH4);
+ dest = new Path(HDFS_PATH4);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ src = new Path(DATA_PATH5);
+ dest = new Path(HDFS_PATH5);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
+
+ /**
+ * Shut down the HDFS mini-cluster.
+ */
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
+
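+ /**
+ * Shut down the Hyracks mini-cluster first, then the HDFS mini-cluster.
+ */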
+ public void tearDown() throws Exception {
+ PregelixHyracksIntegrationUtil.deinit();
+ LOGGER.info("Hyracks mini-cluster shut down");
+ cleanupHDFS();
+ }
+
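+ /**
+ * Read the given file into a list of lines; presumably used by callers to
+ * load ignore lists when comparing results (no caller appears in this class).
+ */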
+ protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+ String s = null;
+ List<String> ignores = new ArrayList<String>();
+ while ((s = reader.readLine()) != null) {
+ ignores.add(s);
+ }
+ reader.close();
+ return ignores;
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index decbde8..df72d9b 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -127,6 +127,7 @@
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index cca66bb..b0bf024 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -128,6 +128,7 @@
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index 9e1e0b0..b50b02a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -84,6 +84,7 @@
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index c4366d7..217fbba 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -84,6 +84,7 @@
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index ac0d508..636b055 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -84,6 +84,7 @@
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index d46457c..ab564fa 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -205,7 +205,11 @@
*/
if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
terminate = false;
- aggregator.step(vertex);
+
+ if (msgContentList.segmentEnd()) {
+ /* step the aggregator only on the last message-list segment, so each vertex is aggregated exactly once */
+ aggregator.step(vertex);
+ }
}
@Override
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index eba75c9..b4e1dd8 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -179,10 +179,7 @@
vertex.setOutputWriters(writers);
vertex.setOutputAppenders(appenders);
vertex.setOutputTupleBuilders(tbs);
-
- if (!msgIterator.hasNext() && vertex.isHalted()) {
- return;
- }
+
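+ // with the early return removed above, halted vertices are reactivated in
+ // the start superstep even when they have no pending messages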
if (vertex.isHalted()) {
vertex.activate();
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 3d52a45..acd766e 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -180,7 +180,7 @@
throw new HyracksDataException(e);
}
}
- return size;
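+ // doubling the estimate presumably leaves headroom so the emitted tuple
+ // still fits in the output frame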
+ return size * 2;
}
private void emitResultTuple(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
@@ -224,4 +224,4 @@
};
}
-}
\ No newline at end of file
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
index 4eaa21c..b7689de 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
@@ -32,6 +32,7 @@
@Override
public void writeNull(DataOutput out) throws HyracksDataException {
try {
+ out.writeByte(3); // 3 = start|end segment flags: an empty, single-segment message list
out.writeInt(0);
} catch (IOException e) {
throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
index 99bcac5..cd2012a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -26,11 +26,11 @@
public class PreSuperStepRuntimeHookFactory implements IRuntimeHookFactory {
private static final long serialVersionUID = 1L;
private final IConfigurationFactory confFactory;
- private final String giraphJobId;
+ private final String jobId;
- public PreSuperStepRuntimeHookFactory(String giraphJobId, IConfigurationFactory confFactory) {
+ public PreSuperStepRuntimeHookFactory(String jobId, IConfigurationFactory confFactory) {
this.confFactory = confFactory;
- this.giraphJobId = giraphJobId;
+ this.jobId = jobId;
}
@Override
@@ -40,7 +40,7 @@
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
Configuration conf = confFactory.createConfiguration(ctx);
- IterationUtils.setProperties(giraphJobId, ctx, conf);
+ IterationUtils.setProperties(jobId, ctx, conf);
}
};