Merge branch 'master' into genomix/fullstack_genomix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index b8bdf3e..ba51eff 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -27,7 +27,6 @@
INNERJOIN,
LEFTOUTERJOIN,
LIMIT,
- DIE,
NESTEDTUPLESOURCE,
ORDER,
PROJECT,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 0efb5ff..c3a59ea 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -33,7 +33,6 @@
SPLIT,
STABLE_SORT,
STREAM_LIMIT,
- STREAM_DIE,
STREAM_SELECT,
STREAM_PROJECT,
STRING_STREAM_SCRIPT,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index 2f53f9b..0c105d0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -20,10 +20,11 @@
// private ArrayList<AggregateFunctionCallExpression> expressions;
// TODO type safe list of expressions
private List<Mutable<ILogicalExpression>> mergeExpressions;
- private LogicalVariable partitioningVariable;
+ private boolean global;
public AggregateOperator(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) {
super(variables, expressions);
+ global = true;
}
@Override
@@ -69,12 +70,12 @@
return mergeExpressions;
}
- public void setPartitioningVariable(LogicalVariable partitioningVariable) {
- this.partitioningVariable = partitioningVariable;
+ public void setGlobal(boolean global) {
+ this.global = global;
}
- public LogicalVariable getPartitioningVariable() {
- return partitioningVariable;
+ public boolean isGlobal() {
+ return global;
}
@Override
@@ -90,4 +91,5 @@
}
return env;
}
+
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
deleted file mode 100644
index 03bfcba..0000000
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.ArrayList;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-
-public class DieOperator extends AbstractLogicalOperator {
-
- private final Mutable<ILogicalExpression> afterObjects; // mandatory
-
- public DieOperator(ILogicalExpression maxObjectsExpr) {
- this.afterObjects = new MutableObject<ILogicalExpression>(maxObjectsExpr);
- }
-
- public Mutable<ILogicalExpression> getAfterObjects() {
- return afterObjects;
- }
-
- @Override
- public void recomputeSchema() {
- schema = new ArrayList<LogicalVariable>();
- schema.addAll(inputs.get(0).getValue().getSchema());
- }
-
- @Override
- public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
- return visitor.visitDieOperator(this, arg);
- }
-
- @Override
- public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
- boolean b = false;
- if (visitor.transform(afterObjects)) {
- b = true;
- }
- return b;
- }
-
- @Override
- public LogicalOperatorTag getOperatorTag() {
- return LogicalOperatorTag.DIE;
- }
-
- @Override
- public VariablePropagationPolicy getVariablePropagationPolicy() {
- return VariablePropagationPolicy.ALL;
- }
-
- @Override
- public boolean isMap() {
- return true;
- }
-
- @Override
- public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
- return createPropagatingAllInputsTypeEnvironment(ctx);
- }
-
- @Override
- public boolean requiresVariableReferenceExpressions() {
- return false;
- }
-}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 1b4be1e..527a11c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -45,7 +45,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -357,12 +356,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, IOptimizationContext ctx) throws AlgebricksException {
- propagateFDsAndEquivClasses(op, ctx);
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, IOptimizationContext ctx)
throws AlgebricksException {
AbstractLogicalOperator op1 = (AbstractLogicalOperator) op.getDataSourceReference().getValue();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index ac6d887..745b8c6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -36,7 +36,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -180,16 +179,6 @@
}
@Override
- public Boolean visitDieOperator(DieOperator op, ILogicalOperator arg) throws AlgebricksException {
- AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
- if (aop.getOperatorTag() != LogicalOperatorTag.DIE)
- return Boolean.FALSE;
- DieOperator dieOpArg = (DieOperator) copyAndSubstituteVar(op, arg);
- boolean isomorphic = op.getAfterObjects().getValue().equals(dieOpArg.getAfterObjects().getValue());
- return isomorphic;
- }
-
- @Override
public Boolean visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
if (aop.getOperatorTag() != LogicalOperatorTag.INNERJOIN)
@@ -643,11 +632,6 @@
}
@Override
- public ILogicalOperator visitDieOperator(DieOperator op, Void arg) throws AlgebricksException {
- return new DieOperator(deepCopyExpressionRef(op.getAfterObjects()).getValue());
- }
-
- @Override
public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
return new InnerJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op
.getInputs().get(1));
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index b9544c7..ac8cabe 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -35,7 +35,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -109,12 +108,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, ILogicalOperator arg) throws AlgebricksException {
- mapVariablesStandard(op, arg);
- return null;
- }
-
- @Override
public Void visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 8f1d686..e4fd78e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -142,12 +141,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, IOptimizationContext arg) throws AlgebricksException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, IOptimizationContext arg)
throws AlgebricksException {
// TODO Auto-generated method stub
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 31adcba..91ffe4b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -30,7 +30,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -136,11 +135,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, Void arg) throws AlgebricksException {
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index cd0cee3..0de9652 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -29,7 +29,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -152,12 +151,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, Void arg) throws AlgebricksException {
- standardLayout(op);
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
VariableUtilities.getLiveVariables(op.getSourceOperator(), schemaVariables);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 69fb3f8..bb9b600 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -31,7 +31,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -185,14 +184,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, Pair<LogicalVariable, LogicalVariable> pair)
- throws AlgebricksException {
- op.getAfterObjects().getValue().substituteVar(pair.first, pair.second);
- substVarTypes(op, pair);
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 5361a19..42f7e20 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -30,7 +30,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -78,9 +77,6 @@
for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {
exprRef.getValue().getUsedVariables(usedVariables);
}
- if (op.getPartitioningVariable() != null) {
- usedVariables.add(op.getPartitioningVariable());
- }
return null;
}
@@ -198,12 +194,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, Void arg) {
- op.getAfterObjects().getValue().getUsedVariables(usedVariables);
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) {
// does not use any variable
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 81dc2c2..f47c2ec 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -16,12 +16,10 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -31,15 +29,14 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
@@ -58,25 +55,29 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- IPhysicalPropertiesVector childProps = op2.getDeliveredPhysicalProperties();
- deliveredProperties = new StructuralPropertiesVector(childProps.getPartitioningProperty(),
- new ArrayList<ILocalStructuralProperty>(0));
+ AggregateOperator aggOp = (AggregateOperator) op;
+ ILogicalOperator op2 = op.getInputs().get(0).getValue();
+ if (aggOp.getExecutionMode() != AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+ deliveredProperties = new StructuralPropertiesVector(op2.getDeliveredPhysicalProperties()
+ .getPartitioningProperty(), new ArrayList<ILocalStructuralProperty>());
+ } else {
+ deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED,
+ new ArrayList<ILocalStructuralProperty>());
+ }
}
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
AggregateOperator aggOp = (AggregateOperator) op;
- if (aggOp.getExecutionMode() == ExecutionMode.PARTITIONED && aggOp.getPartitioningVariable() != null) {
- StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
- Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
- partitioningVariables.add(aggOp.getPartitioningVariable());
- pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null), null);
+ StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+ if (aggOp.isGlobal() && aggOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+ pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
} else {
return emptyUnaryRequirements();
}
+
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 302d4d2..9a96319 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -100,10 +100,9 @@
context, columns);
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
- resultOp.getDataSink(), columns, pf, inputDesc, false, spec);
+ resultOp.getDataSink(), columns, pf, inputDesc, true, spec);
builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
- builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
ILogicalOperator src = resultOp.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, resultOp, 0);
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java
deleted file mode 100644
index 73b9b79..0000000
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamDieRuntimeFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class StreamDiePOperator extends AbstractPhysicalOperator {
-
- public StreamDiePOperator() {
- }
-
- @Override
- public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.STREAM_DIE;
- }
-
- @Override
- public boolean isMicroOperator() {
- return true;
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- ILogicalOperator op2 = op.getInputs().get(0).getValue();
- deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
- StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
- pv[0] = (StructuralPropertiesVector) reqdByParent;
- return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
- }
-
- @Override
- public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- DieOperator die = (DieOperator) op;
- IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
- IVariableTypeEnvironment env = context.getTypeEnvironment(op);
- IScalarEvaluatorFactory afterObjectsFact = expressionRuntimeProvider.createEvaluatorFactory(die
- .getAfterObjects().getValue(), env, inputSchemas, context);
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
- StreamDieRuntimeFactory runtime = new StreamDieRuntimeFactory(afterObjectsFact, null,
- context.getBinaryIntegerInspectorFactory());
- builder.contributeMicroOperator(die, runtime, recDesc);
- ILogicalOperator src = die.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, die, 0);
- }
-
-}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index 11e24d7..531b300 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -37,10 +38,8 @@
public class StreamLimitPOperator extends AbstractPhysicalOperator {
- private boolean global;
+ public StreamLimitPOperator() {
- public StreamLimitPOperator(boolean global) {
- this.global = global;
}
@Override
@@ -55,14 +54,22 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
ILogicalOperator op2 = op.getInputs().get(0).getValue();
- deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+ //partitioning property: unpartitioned; local property: whatever from the child
+ deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, op2
+ .getDeliveredPhysicalProperties().getLocalProperties());
+ } else {
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ }
}
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
- if (global) {
+ AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
+ if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
@@ -83,7 +90,8 @@
ILogicalExpression offsetExpr = limit.getOffset().getValue();
IScalarEvaluatorFactory offsetFact = (offsetExpr == null) ? null : expressionRuntimeProvider
.createEvaluatorFactory(offsetExpr, env, inputSchemas, context);
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+ context);
StreamLimitRuntimeFactory runtime = new StreamLimitRuntimeFactory(maxObjectsFact, offsetFact, null,
context.getBinaryIntegerInspectorFactory());
builder.contributeMicroOperator(limit, runtime, recDesc);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index 54c0505..7861dc0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -31,6 +31,8 @@
public class StreamProjectPOperator extends AbstractPropagatePropertiesForUsedVariablesPOperator {
+ private boolean flushFramesRapidly;
+
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.STREAM_PROJECT;
@@ -61,8 +63,9 @@
}
projectionList[i++] = pos;
}
- StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList);
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList, flushFramesRapidly);
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+ context);
builder.contributeMicroOperator(project, runtime, recDesc);
ILogicalOperator src = project.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, project, 0);
@@ -74,4 +77,8 @@
computeDeliveredPropertiesForUsedVariables(p, p.getVariables());
}
+ public void setRapidFrameFlush(boolean flushFramesRapidly) {
+ this.flushFramesRapidly = flushFramesRapidly;
+ }
+
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index fc0c433..4b7597c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -28,7 +28,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -256,13 +255,6 @@
}
@Override
- public String visitDieOperator(DieOperator op, Integer indent) {
- StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("die after " + op.getAfterObjects().getValue());
- return buffer.toString();
- }
-
- @Override
public String visitExchangeOperator(ExchangeOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("exchange ");
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
similarity index 68%
rename from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
rename to algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
index 8f5ed64..ca15346 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
@@ -3,19 +3,25 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.api.dataset;
+package edu.uci.ics.hyracks.algebricks.core.algebra.properties;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+public class ResultSetDomain implements INodeDomain {
+ @Override
+ public boolean sameAs(INodeDomain domain) {
+ return true;
+ }
-public interface IDatasetPartitionReader {
- public void writeTo(IFrameWriter writer);
+ @Override
+ public Integer cardinality() {
+ return 0;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 47a979c..35b36dc 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -96,14 +97,21 @@
break;
}
default: {
+ boolean forceUnpartitioned = false;
if (op.getOperatorTag() == LogicalOperatorTag.LIMIT) {
LimitOperator opLim = (LimitOperator) op;
if (opLim.isTopmostLimitOp()) {
- if (opLim.getExecutionMode() != AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
- opLim.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
- change = true;
- }
- break;
+ opLim.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ change = true;
+ forceUnpartitioned = true;
+ }
+ }
+ if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ AggregateOperator aggOp = (AggregateOperator) op;
+ if (aggOp.isGlobal()) {
+ op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ change = true;
+ forceUnpartitioned = true;
}
}
@@ -112,6 +120,8 @@
AbstractLogicalOperator inputOp = (AbstractLogicalOperator) i.getValue();
switch (inputOp.getExecutionMode()) {
case PARTITIONED: {
+ if (forceUnpartitioned)
+ break;
op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
change = true;
exit = true;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 23dac2a..32b049e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -18,7 +18,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -106,5 +105,4 @@
public R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T tag) throws AlgebricksException;
- public R visitDieOperator(DieOperator op, T arg) throws AlgebricksException;
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
index 08271c1..013ddda 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -16,12 +16,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -110,16 +108,10 @@
} else {
// The local aggregate operator is fed by the input of the original aggregate operator.
pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(initAgg.getInputs().get(0).getValue()));
- // Set the partitioning variable in the local agg to ensure it is not projected away.
- context.computeAndSetTypeEnvironmentForOperator(pushedAgg);
- LogicalVariable trueVar = context.newVar();
// Reintroduce assign op for the global agg partitioning var.
- AssignOperator trueAssignOp = new AssignOperator(trueVar, new MutableObject<ILogicalExpression>(
- ConstantExpression.TRUE));
- trueAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(pushedAgg));
- context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
- initAgg.setPartitioningVariable(trueVar);
- initAgg.getInputs().get(0).setValue(trueAssignOp);
+ initAgg.getInputs().get(0).setValue(pushedAgg);
+ pushedAgg.setGlobal(false);
+ context.computeAndSetTypeEnvironmentForOperator(pushedAgg);
}
return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
} else {
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index 3260ca0..7bc150a 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -51,6 +51,7 @@
orderBreakingOps.add(LogicalOperatorTag.INNERJOIN);
orderBreakingOps.add(LogicalOperatorTag.LEFTOUTERJOIN);
orderBreakingOps.add(LogicalOperatorTag.UNIONALL);
+ orderBreakingOps.add(LogicalOperatorTag.AGGREGATE);
}
@Override
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
index c3d935c..c75db57 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
@@ -29,7 +29,7 @@
return false;
}
AggregateOperator aggOp = (AggregateOperator) op;
- if (aggOp.getExecutionMode() != ExecutionMode.PARTITIONED || aggOp.getPartitioningVariable() == null) {
+ if (!aggOp.isGlobal() || aggOp.getExecutionMode() == ExecutionMode.LOCAL) {
return false;
}
Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushDieUpRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushDieUpRule.java
deleted file mode 100644
index c4dd78d..0000000
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushDieUpRule.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package edu.uci.ics.hyracks.algebricks.rewriter.rules;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class PushDieUpRule implements IAlgebraicRewriteRule {
-
- @Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
- return false;
- }
-
- @Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
- AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue();
- if (op0.getInputs().size() == 0)
- return false;
- AbstractLogicalOperator op1 = (AbstractLogicalOperator) op0.getInputs().get(0).getValue();
-
- if (op1.getInputs().size() == 0)
- return false;
- LogicalOperatorTag tag = op1.getOperatorTag();
- if (tag == LogicalOperatorTag.SINK || tag == LogicalOperatorTag.WRITE
- || tag == LogicalOperatorTag.INSERT_DELETE || tag == LogicalOperatorTag.WRITE_RESULT)
- return false;
-
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
- if (op2.getOperatorTag() == LogicalOperatorTag.DIE) {
- op0.getInputs().get(0).setValue(op2);
- op1.getInputs().clear();
- for (Mutable<ILogicalOperator> ref : op2.getInputs())
- op1.getInputs().add(ref);
- op2.getInputs().clear();
- op2.getInputs().add(new MutableObject<ILogicalOperator>(op1));
-
- context.computeAndSetTypeEnvironmentForOperator(op0);
- context.computeAndSetTypeEnvironmentForOperator(op1);
- context.computeAndSetTypeEnvironmentForOperator(op2);
- return true;
- } else {
- return false;
- }
- }
-}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
index acfdbd3..e5f60bc 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
@@ -106,7 +106,7 @@
opLim.getMaxObjects(), opLim.getOffset());
clone2 = new LimitOperator(maxPlusOffset, false);
}
- clone2.setPhysicalOperator(new StreamLimitPOperator(false));
+ clone2.setPhysicalOperator(new StreamLimitPOperator());
clone2.getInputs().add(new MutableObject<ILogicalOperator>(op2));
clone2.setExecutionMode(op2.getExecutionMode());
clone2.recomputeSchema();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 60a4fbb..9c8ad46 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -21,7 +21,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -53,7 +52,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamDiePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamSelectPOperator;
@@ -173,9 +171,7 @@
break;
}
case LIMIT: {
- LimitOperator opLim = (LimitOperator) op;
- op.setPhysicalOperator(new StreamLimitPOperator(opLim.isTopmostLimitOp()
- && opLim.getExecutionMode() == ExecutionMode.PARTITIONED));
+ op.setPhysicalOperator(new StreamLimitPOperator());
break;
}
case NESTEDTUPLESOURCE: {
@@ -280,10 +276,6 @@
op.setPhysicalOperator(new SinkPOperator());
break;
}
- case DIE: {
- op.setPhysicalOperator(new StreamDiePOperator());
- break;
- }
}
}
if (op.hasNestedPlans()) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index cd9931b..79e964b 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -54,6 +54,11 @@
}
protected void appendProjectionToFrame(int tIndex, int[] projectionList) throws HyracksDataException {
+ appendProjectionToFrame(tIndex, projectionList, false);
+ }
+
+ protected void appendProjectionToFrame(int tIndex, int[] projectionList, boolean flushFrame)
+ throws HyracksDataException {
if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
@@ -61,6 +66,11 @@
throw new IllegalStateException(
"Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
}
+ return;
+ }
+ if (flushFrame) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
}
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
deleted file mode 100644
index 508ce5d..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
-import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
-
-public class StreamDieRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- private IScalarEvaluatorFactory aftterObjectsEvalFactory;
- private IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
-
- public StreamDieRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory, int[] projectionList,
- IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory) {
- super(projectionList);
- this.aftterObjectsEvalFactory = maxObjectsEvalFactory;
- this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
- }
-
- @Override
- public String toString() {
- String s = "stream-die " + aftterObjectsEvalFactory.toString();
- return s;
- }
-
- @Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
- final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private IPointable p = VoidPointable.FACTORY.createPointable();
- private IScalarEvaluator evalAfterObjects;
- private int toWrite = -1;
-
- @Override
- public void open() throws HyracksDataException {
- if (evalAfterObjects == null) {
- initAccessAppendRef(ctx);
- try {
- evalAfterObjects = aftterObjectsEvalFactory.createScalarEvaluator(ctx);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- writer.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- if (toWrite < 0) {
- toWrite = evaluateInteger(evalAfterObjects, 0);
- }
- for (int t = 0; t < nTuple; t++) {
- if (toWrite > 0) {
- toWrite--;
- if (projectionList != null) {
- appendProjectionToFrame(t, projectionList);
- } else {
- appendTupleToFrame(t);
- }
- } else {
- throw new HyracksDataException("injected failure");
-// System.out.println("Injected Kill-JVM");
-// System.exit(-1);
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- super.close();
- }
-
- private int evaluateInteger(IScalarEvaluator eval, int tIdx) throws HyracksDataException {
- tRef.reset(tAccess, tIdx);
- try {
- eval.evaluate(tRef, p);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- int lim = bii.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
- return lim;
- }
-
- };
- }
-
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 455ad8e..0e6fc15 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -26,9 +26,16 @@
public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
private static final long serialVersionUID = 1L;
+ private final boolean flushFramesRapidly;
+
+ public StreamProjectRuntimeFactory(int[] projectionList, boolean flushFramesRapidly) {
+ super(projectionList);
+ this.flushFramesRapidly = flushFramesRapidly;
+ }
public StreamProjectRuntimeFactory(int[] projectionList) {
super(projectionList);
+ this.flushFramesRapidly = false;
}
@Override
@@ -57,7 +64,16 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
+
+ int t = 0;
+ for (; t < nTuple - 1; t++) {
+ appendProjectionToFrame(t, projectionList);
+ }
+ if (flushFramesRapidly) {
+ // Whenever all the tuples in the incoming frame have been consumed, the project operator
+ // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
+ appendProjectionToFrame(t, projectionList, true);
+ } else {
appendProjectionToFrame(t, projectionList);
}
diff --git a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
index 1cc34e1..f42e321 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
+++ b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
@@ -60,6 +60,5 @@
# For example, set the com.xyz.foo logger to only log SEVERE
# messages:
-edu.uci.ics.asterix.level = WARNING
-edu.uci.ics.algebricks.level = WARNING
+#edu.uci.ics.hyracks.algebricks.level = FINEST
edu.uci.ics.hyracks.level = WARNING
diff --git a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
index 7e4e271..a049f15 100644
--- a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
+++ b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
@@ -40,9 +40,7 @@
public final static LinkedList<IAlgebraicRewriteRule> NORMALIZATION = new LinkedList<IAlgebraicRewriteRule>();
static {
NORMALIZATION.add(new EliminateSubplanRule());
- NORMALIZATION.add(new IntroduceAggregateCombinerRule());
NORMALIZATION.add(new BreakSelectIntoConjunctsRule());
- NORMALIZATION.add(new IntroduceAggregateCombinerRule());
NORMALIZATION.add(new PushSelectIntoJoinRule());
NORMALIZATION.add(new ExtractGbyExpressionsRule());
NORMALIZATION.add(new RemoveRedundantSelectRule());
@@ -84,6 +82,7 @@
CONSOLIDATION.add(new IntroduceEarlyProjectRule());
CONSOLIDATION.add(new ConsolidateAssignsRule());
CONSOLIDATION.add(new IntroduceGroupByCombinerRule());
+ CONSOLIDATION.add(new IntroduceAggregateCombinerRule());
CONSOLIDATION.add(new RemoveUnusedAssignAndAggregateRule());
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index ae38c7f..dea077d 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -29,7 +29,10 @@
public void reportPartitionFailure(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
- public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
+ public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
+ throws HyracksException;
+
+ public void abortReader(JobId jobId);
public IWorkspaceFileFactory getFileFactory();
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
deleted file mode 100644
index 42dc157..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IDatasetPartitionWriter extends IFrameWriter {
- public Page returnPage() throws HyracksDataException;
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
index d49d5cd..f29356b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
@@ -14,7 +14,7 @@
*/
package edu.uci.ics.hyracks.api.dataset;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.job.JobId;
public interface IHyracksDatasetDirectoryServiceConnection {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
index ba21a84..3fe4ada 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
@@ -14,7 +14,7 @@
*/
package edu.uci.ics.hyracks.api.dataset;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.job.JobId;
public interface IHyracksDatasetDirectoryServiceInterface {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
index b928a49..c397a94 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
@@ -16,7 +16,7 @@
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IHyracksDatasetReader {
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 095fd7d..c882448 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -17,7 +17,7 @@
import java.net.InetSocketAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index 47cdf97..9c3b918 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -16,7 +16,7 @@
import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 6419983..33e1b01 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -26,7 +26,7 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
@@ -101,7 +101,7 @@
knownRecords);
lastReadPartition = 0;
resultChannel = new DatasetNetworkInputChannel(netManager,
- getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
+ getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId, lastReadPartition,
NUM_READ_BUFFERS);
lastMonitor = getMonitor(lastReadPartition);
resultChannel.open(datasetClientCtx);
@@ -118,7 +118,7 @@
while (readSize <= 0 && !(isLastPartitionReadComplete())) {
synchronized (lastMonitor) {
- while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached()) {
+ while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached() && !lastMonitor.failed()) {
try {
lastMonitor.wait();
} catch (InterruptedException e) {
@@ -127,6 +127,9 @@
}
}
+ if (lastMonitor.failed()) {
+ throw new HyracksDataException("Job Failed.");
+ }
if (isPartitionReadComplete(lastMonitor)) {
knownRecords[lastReadPartition].readEOS();
if ((lastReadPartition == knownRecords.length - 1)) {
@@ -135,23 +138,17 @@
try {
lastReadPartition++;
while (knownRecords[lastReadPartition] == null) {
- try {
- knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
- resultSetId, knownRecords);
- } catch (Exception e) {
- // Do nothing here.
- }
+ knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
+ resultSetId, knownRecords);
}
resultChannel = new DatasetNetworkInputChannel(netManager,
- getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
- NUM_READ_BUFFERS);
+ getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId,
+ lastReadPartition, NUM_READ_BUFFERS);
lastMonitor = getMonitor(lastReadPartition);
resultChannel.open(datasetClientCtx);
resultChannel.registerMonitor(lastMonitor);
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- } catch (UnknownHostException e) {
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
}
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
index fac2949..1ab315b 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
@@ -41,6 +42,8 @@
private final JobId jobId;
+ private final ResultSetId resultSetId;
+
private final int partition;
private final Queue<ByteBuffer> fullQueue;
@@ -54,10 +57,11 @@
private Object attachment;
public DatasetNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId,
- int partition, int nBuffers) {
+ ResultSetId resultSetId, int partition, int nBuffers) {
this.netManager = netManager;
this.remoteAddress = remoteAddress;
this.jobId = jobId;
+ this.resultSetId = resultSetId;
this.partition = partition;
fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
this.nBuffers = nBuffers;
@@ -103,6 +107,7 @@
}
ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
writeBuffer.putLong(jobId.getId());
+ writeBuffer.putLong(resultSetId.getId());
writeBuffer.putInt(partition);
writeBuffer.flip();
if (LOGGER.isLoggable(Level.FINE)) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index e648733..21b05d4 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -99,7 +99,7 @@
records[partition].writeEOS();
for (DatasetDirectoryRecord record : records) {
- if (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS) {
+ if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) {
successCount++;
}
}
@@ -112,14 +112,18 @@
@Override
public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
DatasetJobRecord djr = jobResultLocations.get(jobId);
- djr.fail();
+ if (djr != null) {
+ djr.fail();
+ }
notifyAll();
}
@Override
public synchronized void reportJobFailure(JobId jobId) {
DatasetJobRecord djr = jobResultLocations.get(jobId);
- djr.fail();
+ if (djr != null) {
+ djr.fail();
+ }
notifyAll();
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
index cecd677..4e27f12 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -21,12 +21,13 @@
import java.util.Map;
import java.util.Set;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
public class DatasetMemoryManager {
+ private int availableMemory;
+
private final Set<Page> availPages;
private final LeastRecentlyUsedList leastRecentlyUsedList;
@@ -36,29 +37,32 @@
private final static int FRAME_SIZE = 32768;
public DatasetMemoryManager(int availableMemory) {
+ this.availableMemory = availableMemory;
+
availPages = new HashSet<Page>();
// Atleast have one page for temporarily storing the results.
- if (availableMemory <= 0)
- availableMemory = FRAME_SIZE;
-
- while (availableMemory >= FRAME_SIZE) {
- /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame()
- * instead of direct ByteBuffer.allocate()?
- */
- availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
- availableMemory -= FRAME_SIZE;
- }
+ if (this.availableMemory <= FRAME_SIZE)
+ this.availableMemory = FRAME_SIZE;
leastRecentlyUsedList = new LeastRecentlyUsedList();
resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
}
- public Page requestPage(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter dpw)
- throws OutOfMemoryError, HyracksDataException {
+ public synchronized Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState)
+ throws HyracksDataException {
Page page;
if (availPages.isEmpty()) {
- page = evictPage();
+ if (availableMemory >= FRAME_SIZE) {
+ /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame()
+ * instead of direct ByteBuffer.allocate()?
+ */
+ availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
+ availableMemory -= FRAME_SIZE;
+ page = getAvailablePage();
+ } else {
+ page = evictPage();
+ }
} else {
page = getAvailablePage();
}
@@ -71,7 +75,7 @@
* update reference call before a page is pushed on to the element of the LRU list. So we first obtain the page,
* then make a updateReference call which in turn creates a new node in the LRU list and then add the page to it.
*/
- PartitionNode pn = updateReference(resultSetPartitionId, dpw);
+ PartitionNode pn = updateReference(resultSetPartitionId, resultState);
pn.add(page);
return page;
}
@@ -81,7 +85,7 @@
updateReference(resultSetPartitionId, null);
}
- public int getPageSize() {
+ public static int getPageSize() {
return FRAME_SIZE;
}
@@ -90,28 +94,29 @@
resultPartitionNodesMap.put(resultSetPartitionId, pn);
}
- protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
- IDatasetPartitionWriter dpw) {
+ protected PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
PartitionNode pn = null;
if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
- if (dpw != null) {
- pn = new PartitionNode(resultSetPartitionId, dpw);
+ if (resultState != null) {
+ pn = new PartitionNode(resultSetPartitionId, resultState);
insertPartitionNode(resultSetPartitionId, pn);
}
return pn;
}
- pn = resultPartitionNodesMap.get(resultSetPartitionId);
- leastRecentlyUsedList.remove(pn);
- insertPartitionNode(resultSetPartitionId, pn);
+ synchronized (this) {
+ pn = resultPartitionNodesMap.get(resultSetPartitionId);
+ leastRecentlyUsedList.remove(pn);
+ insertPartitionNode(resultSetPartitionId, pn);
+ }
return pn;
}
- protected synchronized Page evictPage() throws HyracksDataException {
+ protected Page evictPage() throws HyracksDataException {
PartitionNode pn = leastRecentlyUsedList.getFirst();
- IDatasetPartitionWriter dpw = pn.getDatasetPartitionWriter();
- Page page = dpw.returnPage();
+ ResultState resultState = pn.getResultState();
+ Page page = resultState.returnPage();
/* If the partition holding the pages breaks the contract by not returning the page or it has no page, just take
* away all the pages allocated to it and add to the available pages set.
@@ -140,7 +145,7 @@
return page;
}
- protected synchronized Page getAvailablePage() {
+ protected Page getAvailablePage() {
Iterator<Page> iter = availPages.iterator();
Page page = iter.next();
iter.remove();
@@ -197,15 +202,15 @@
private final ResultSetPartitionId resultSetPartitionId;
- private final IDatasetPartitionWriter datasetPartitionWriter;
+ private final ResultState resultState;
private PartitionNode prev;
private PartitionNode next;
- public PartitionNode(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter datasetPartitionWriter) {
+ public PartitionNode(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
this.resultSetPartitionId = resultSetPartitionId;
- this.datasetPartitionWriter = datasetPartitionWriter;
+ this.resultState = resultState;
prev = null;
next = null;
}
@@ -214,8 +219,8 @@
return resultSetPartitionId;
}
- public IDatasetPartitionWriter getDatasetPartitionWriter() {
- return datasetPartitionWriter;
+ public ResultState getResultState() {
+ return resultState;
}
public void setPrev(PartitionNode node) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index af9a607..2ec7acc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -14,14 +14,15 @@
*/
package edu.uci.ics.hyracks.control.nc.dataset;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
@@ -32,11 +33,13 @@
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
public class DatasetPartitionManager implements IDatasetPartitionManager {
+ private static final Logger LOGGER = Logger.getLogger(DatasetPartitionManager.class.getName());
+
private final NodeControllerService ncs;
private final Executor executor;
- private final Map<JobId, ResultState[]> partitionResultStateMap;
+ private final Map<JobId, Map<ResultSetId, ResultState[]>> partitionResultStateMap;
private final DefaultDeallocatableRegistry deallocatableRegistry;
@@ -50,18 +53,34 @@
this.executor = executor;
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
- datasetMemoryManager = new DatasetMemoryManager(availableMemory);
- partitionResultStateMap = new LinkedHashMap<JobId, ResultState[]>() {
+ if (availableMemory >= DatasetMemoryManager.getPageSize()) {
+ datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+ } else {
+ datasetMemoryManager = null;
+ }
+ partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
private static final long serialVersionUID = 1L;
- protected boolean removeEldestEntry(Map.Entry<JobId, ResultState[]> eldest) {
- if (size() > resultHistorySize) {
- for (ResultState state : eldest.getValue()) {
- state.deinit();
+ protected boolean removeEldestEntry(Map.Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
+ synchronized (DatasetPartitionManager.this) {
+ if (size() > resultHistorySize) {
+ Map<ResultSetId, ResultState[]> rsIdMap = eldest.getValue(); // entry value is the per-job map; the outer map is keyed by JobId, so get(value) returned null
+ for (ResultSetId rsId : rsIdMap.keySet()) {
+ ResultState[] resultStates = rsIdMap.get(rsId);
+ if (resultStates != null) {
+ for (int i = 0; i < resultStates.length; i++) {
+ ResultState state = resultStates[i];
+ if (state != null) {
+ state.closeAndDelete();
+ LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
+ }
+ }
+ }
+ }
+ return true;
}
- return true;
+ return false;
}
- return false;
}
};
}
@@ -72,15 +91,21 @@
DatasetPartitionWriter dpw = null;
JobId jobId = ctx.getJobletContext().getJobId();
try {
- synchronized (partitionResultStateMap) {
+ synchronized (this) {
ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
- dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
+ dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager, fileFactory);
- ResultState[] resultStates = partitionResultStateMap.get(jobId);
+ Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+ if (rsIdMap == null) {
+ rsIdMap = new HashMap<ResultSetId, ResultState[]>();
+ partitionResultStateMap.put(jobId, rsIdMap);
+ }
+
+ ResultState[] resultStates = rsIdMap.get(rsId);
if (resultStates == null) {
resultStates = new ResultState[nPartitions];
- partitionResultStateMap.put(jobId, resultStates);
+ rsIdMap.put(rsId, resultStates);
}
resultStates[partition] = dpw.getResultState();
}
@@ -88,12 +113,15 @@
throw new HyracksException(e);
}
+ LOGGER.fine("Initialized partition writer: JobId: " + jobId + ":partition: " + partition);
return dpw;
}
@Override
public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
try {
+ LOGGER.fine("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
+ + ":partition: " + partition);
ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
} catch (Exception e) {
throw new HyracksException(e);
@@ -103,6 +131,8 @@
@Override
public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
try {
+ LOGGER.info("Reporting partition failure: JobId: " + jobId + ": ResultSetId: " + rsId + ":partition: "
+ + partition);
ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
} catch (Exception e) {
throw new HyracksException(e);
@@ -110,24 +140,51 @@
}
@Override
- public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
- throws HyracksException {
+ public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition,
+ IFrameWriter writer) throws HyracksException {
ResultState resultState;
- synchronized (partitionResultStateMap) {
- ResultState[] resultStates = partitionResultStateMap.get(jobId);
+ synchronized (this) {
+ Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
- if (resultStates == null) {
+ if (rsIdMap == null) {
throw new HyracksException("Unknown JobId " + jobId);
}
+ ResultState[] resultStates = rsIdMap.get(resultSetId);
+ if (resultStates == null) {
+ throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId);
+ }
+
resultState = resultStates[partition];
if (resultState == null) {
throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
}
}
- IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+ DatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
dpr.writeTo(writer);
+ LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":ResultSetId: " + resultSetId + ":partition: "
+ + partition);
+ }
+
+ @Override
+ public synchronized void abortReader(JobId jobId) {
+ Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+
+ if (rsIdMap == null) {
+ return;
+ }
+
+ for (ResultSetId rsId : rsIdMap.keySet()) {
+ ResultState[] resultStates = rsIdMap.get(rsId);
+ if (resultStates != null) {
+ for (ResultState state : resultStates) {
+ if (state != null) {
+ state.abort();
+ }
+ }
+ }
+ }
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index a584b4b..07624de 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -20,14 +20,10 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
-import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
-public class DatasetPartitionReader implements IDatasetPartitionReader {
+public class DatasetPartitionReader {
private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
private final DatasetMemoryManager datasetMemoryManager;
@@ -36,51 +32,12 @@
private final ResultState resultState;
- private IFileHandle fileHandle;
-
public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
this.datasetMemoryManager = datasetMemoryManager;
this.executor = executor;
this.resultState = resultState;
}
- private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
- long readSize = 0;
- synchronized (resultState) {
- while (offset >= resultState.getSize() && !resultState.getEOS()) {
- try {
- resultState.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- if (offset >= resultState.getSize() && resultState.getEOS()) {
- return readSize;
- }
-
- if (offset < resultState.getPersistentSize()) {
- readSize = resultState.getIOManager().syncRead(fileHandle, offset, buffer);
- }
-
- if (readSize < buffer.capacity()) {
- long localPageOffset = offset - resultState.getPersistentSize();
- int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
- int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
- Page page = resultState.getPage(localPageIndex);
- if (page == null) {
- return readSize;
- }
- readSize += buffer.remaining();
- buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
- }
-
- datasetMemoryManager.pageReferenced(resultState.getResultSetPartitionId());
- return readSize;
- }
-
- @Override
public void writeTo(final IFrameWriter writer) {
executor.execute(new Runnable() {
@Override
@@ -88,8 +45,7 @@
NetworkOutputChannel channel = (NetworkOutputChannel) writer;
channel.setFrameSize(resultState.getFrameSize());
try {
- fileHandle = resultState.getIOManager().open(resultState.getValidFileReference(),
- IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ resultState.readOpen();
channel.open();
try {
long offset = 0;
@@ -109,10 +65,8 @@
}
} finally {
channel.close();
- resultState.getIOManager().close(fileHandle);
+ resultState.readClose();
}
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
} catch (HyracksDataException e) {
throw new RuntimeException(e);
}
@@ -120,6 +74,14 @@
LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
}
}
+
+ private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ if (datasetMemoryManager == null) {
+ return resultState.read(offset, buffer);
+ } else {
+ return resultState.read(datasetMemoryManager, offset, buffer);
+ }
+ }
});
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 317f553..8f4b639 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -18,24 +18,19 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
-import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
-public class DatasetPartitionWriter implements IDatasetPartitionWriter {
+public class DatasetPartitionWriter implements IFrameWriter {
private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
- private static final String FILE_PREFIX = "result_";
-
private final IDatasetPartitionManager manager;
private final JobId jobId;
@@ -50,10 +45,9 @@
private final ResultState resultState;
- private IFileHandle fileHandle;
-
public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
- ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager) {
+ ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager,
+ IWorkspaceFileFactory fileFactory) {
this.manager = manager;
this.jobId = jobId;
this.resultSetId = rsId;
@@ -61,7 +55,7 @@
this.datasetMemoryManager = datasetMemoryManager;
resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
- resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), ctx.getFrameSize());
+ resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), fileFactory, ctx.getFrameSize());
}
public ResultState getResultState() {
@@ -69,41 +63,27 @@
}
@Override
- public void open() throws HyracksDataException {
+ public void open() {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("open(" + partition + ")");
}
- String fName = FILE_PREFIX + String.valueOf(partition);
- FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
- fileHandle = resultState.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- resultState.init(fRef, fileHandle);
+ resultState.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- int srcOffset = 0;
- Page destPage = resultState.getLastPage();
-
- while (srcOffset < buffer.limit()) {
- if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
- destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
- resultState.addPage(destPage);
- }
- int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
- destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
- srcOffset += srcLength;
- resultState.incrementSize(srcLength);
- }
-
- synchronized (resultState) {
- resultState.notifyAll();
+ if (datasetMemoryManager == null) {
+ resultState.write(buffer);
+ } else {
+ resultState.write(datasetMemoryManager, buffer);
}
}
@Override
public void fail() throws HyracksDataException {
try {
+ resultState.closeAndDelete();
+ resultState.abort();
manager.reportPartitionFailure(jobId, resultSetId, partition);
} catch (HyracksException e) {
throw new HyracksDataException(e);
@@ -117,32 +97,10 @@
}
try {
- synchronized (resultState) {
- resultState.setEOS(true);
- resultState.notifyAll();
- }
+ resultState.close();
manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
} catch (HyracksException e) {
throw new HyracksDataException(e);
}
}
-
- @Override
- public Page returnPage() throws HyracksDataException {
- Page page = resultState.removePage(0);
-
- IIOManager ioManager = resultState.getIOManager();
-
- // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
- if (page == null) {
- ioManager.close(fileHandle);
- return null;
- }
-
- page.getBuffer().flip();
-
- long delta = ioManager.syncWrite(fileHandle, resultState.getPersistentSize(), page.getBuffer());
- resultState.incrementPersistentSize(delta);
- return page;
- }
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 661df93..911f372 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -17,28 +17,35 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IFileHandle;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
public class ResultState implements IStateObject {
+ private static final String FILE_PREFIX = "result_";
+
private final ResultSetPartitionId resultSetPartitionId;
private final int frameSize;
private final IIOManager ioManager;
+ private final IWorkspaceFileFactory fileFactory;
+
private final AtomicBoolean eos;
- private final AtomicBoolean readEOS;
+ private final AtomicBoolean failed;
private final List<Page> localPageList;
@@ -46,29 +53,40 @@
private IFileHandle writeFileHandle;
+ private IFileHandle readFileHandle;
+
private long size;
private long persistentSize;
- ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, int frameSize) {
+ ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, IWorkspaceFileFactory fileFactory,
+ int frameSize) {
this.resultSetPartitionId = resultSetPartitionId;
this.ioManager = ioManager;
+ this.fileFactory = fileFactory;
this.frameSize = frameSize;
eos = new AtomicBoolean(false);
- readEOS = new AtomicBoolean(false);
+ failed = new AtomicBoolean(false);
localPageList = new ArrayList<Page>();
+
+ fileRef = null;
+ writeFileHandle = null;
}
- public synchronized void init(FileReference fileRef, IFileHandle writeFileHandle) {
- this.fileRef = fileRef;
- this.writeFileHandle = writeFileHandle;
-
+ public synchronized void open() {
size = 0;
persistentSize = 0;
+ }
+
+ public synchronized void close() {
+ eos.set(true);
notifyAll();
}
- public synchronized void deinit() {
+ public synchronized void closeAndDelete() {
+ // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action needs
+ // to be taken when there are more requests to these result states.
+ failed.set(true);
if (writeFileHandle != null) {
try {
ioManager.close(writeFileHandle);
@@ -76,7 +94,149 @@
// Since file handle could not be closed, just ignore.
}
}
- fileRef.delete();
+ if (fileRef != null) {
+ fileRef.delete();
+ }
+ }
+
+ public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
+ if (fileRef == null) {
+ String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+ fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+ writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ }
+
+ size += ioManager.syncWrite(writeFileHandle, size, buffer);
+
+ notifyAll();
+ }
+
+ public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
+ throws HyracksDataException {
+ int srcOffset = 0;
+ Page destPage = null;
+
+ if (!localPageList.isEmpty()) {
+ destPage = localPageList.get(localPageList.size() - 1);
+ }
+
+ while (srcOffset < buffer.limit()) {
+ if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+ destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+ localPageList.add(destPage);
+ }
+ int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+ destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+ srcOffset += srcLength;
+ size += srcLength;
+ }
+
+ notifyAll();
+ }
+
+ public synchronized void readOpen() {
+ // This is a no-op for now; it is kept so that the API stays stable for future use.
+ }
+
+ public synchronized void readClose() throws HyracksDataException {
+ if (readFileHandle != null) {
+ ioManager.close(readFileHandle);
+ }
+ }
+
+ public synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ long readSize = 0;
+
+ while (offset >= size && !eos.get() && !failed.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ if ((offset >= size && eos.get()) || failed.get()) {
+ return readSize;
+ }
+
+ if (readFileHandle == null) {
+ initReadFileHandle();
+ }
+ readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+
+ return readSize;
+ }
+
+ public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
+ throws HyracksDataException {
+ long readSize = 0;
+ synchronized (this) {
+ while (offset >= size && !eos.get() && !failed.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ if ((offset >= size && eos.get()) || failed.get()) {
+ return readSize;
+ }
+
+ if (offset < persistentSize) {
+ if (readFileHandle == null) {
+ initReadFileHandle();
+ }
+ readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+ }
+
+ if (readSize < buffer.capacity()) {
+ long localPageOffset = offset - persistentSize;
+ int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize());
+ int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize());
+ Page page = getPage(localPageIndex);
+ if (page == null) {
+ return readSize;
+ }
+ readSize += buffer.remaining();
+ buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+ }
+ }
+ datasetMemoryManager.pageReferenced(resultSetPartitionId);
+ return readSize;
+ }
+
+ public synchronized void abort() {
+ failed.set(true);
+ notifyAll();
+ }
+
+ public synchronized Page returnPage() throws HyracksDataException {
+ Page page = removePage();
+
+ // If there are no pages left to give back, close the write channel (nothing more will be written) and return null.
+ if (page == null) {
+ ioManager.close(writeFileHandle);
+ return null;
+ }
+
+ page.getBuffer().flip();
+
+ if (fileRef == null) {
+ String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+ fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+ writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ notifyAll();
+ }
+
+ long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer());
+ persistentSize += delta;
+ return page;
+ }
+
+ public synchronized void setEOS(boolean eos) {
+ this.eos.set(eos);
}
public ResultSetPartitionId getResultSetPartitionId() {
@@ -91,76 +251,6 @@
return ioManager;
}
- public synchronized void incrementSize(long delta) {
- size += delta;
- }
-
- public synchronized long getSize() {
- return size;
- }
-
- public synchronized void incrementPersistentSize(long delta) {
- persistentSize += delta;
- }
-
- public synchronized long getPersistentSize() {
- return persistentSize;
- }
-
- public void setEOS(boolean eos) {
- this.eos.set(eos);
- }
-
- public boolean getEOS() {
- return eos.get();
- }
-
- public boolean getReadEOS() {
- return readEOS.get();
- }
-
- public synchronized void addPage(Page page) {
- localPageList.add(page);
- }
-
- public synchronized Page removePage(int index) {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.remove(index);
- }
- return page;
- }
-
- public synchronized Page getPage(int index) {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.get(index);
- }
- return page;
- }
-
- public synchronized Page getLastPage() {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.get(localPageList.size() - 1);
- }
- return page;
- }
-
- public synchronized Page getFirstPage() {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.get(0);
- }
- return page;
- }
-
- public synchronized FileReference getValidFileReference() throws InterruptedException {
- while (fileRef == null)
- wait();
- return fileRef;
- }
-
@Override
public JobId getJobId() {
return resultSetPartitionId.getJobId();
@@ -185,4 +275,36 @@
public void fromBytes(DataInput in) throws IOException {
throw new UnsupportedOperationException();
}
+
+ private Page getPage(int index) {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.get(index);
+ }
+ return page;
+ }
+
+ private Page removePage() {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.remove(localPageList.size() - 1);
+ }
+ return page;
+ }
+
+ private void initReadFileHandle() throws HyracksDataException {
+ while (fileRef == null && !failed.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ if (failed.get()) {
+ return;
+ }
+
+ readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
index 5b8b333..84baf49 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
@@ -95,6 +96,7 @@
@Override
public void accept(ByteBuffer buffer) {
JobId jobId = new JobId(buffer.getLong());
+ ResultSetId rsId = new ResultSetId(buffer.getLong());
int partition = buffer.getInt();
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
@@ -102,7 +104,7 @@
}
noc = new NetworkOutputChannel(ccb, 1);
try {
- partitionManager.initializeDatasetPartitionReader(jobId, partition, noc);
+ partitionManager.initializeDatasetPartitionReader(jobId, rsId, partition, noc);
} catch (HyracksException e) {
noc.abort();
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
index 8f8c032..54ac99a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -20,6 +20,7 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
@@ -46,6 +47,11 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
}
+ IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
+ if (dpm != null) {
+ dpm.abortReader(jobId); // use the reference that was just null-checked instead of re-fetching it
+ }
+
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji != null) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 3957934..013544d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.control.nc.work;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.Task;
@@ -32,8 +34,12 @@
@Override
public void run() {
try {
- ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
- ncs.getId(), details);
+ JobId jobId = task.getJobletContext().getJobId();
+ IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
+ if (dpm != null) {
+ dpm.abortReader(jobId);
+ }
+ ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), ncs.getId(), details);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
index 07f6ba2..c403501 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
@@ -32,6 +32,9 @@
}
public void reset(ByteBuffer buffer, boolean clear) {
+ if (clear) {
+ buffer.clear();
+ }
frameTupleAppender.reset(buffer, clear);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index e74e54c..715f822 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -60,7 +60,7 @@
private static IHyracksClientConnection hcc;
private final List<File> outputFiles;
-
+
protected static int DEFAULT_MEM_PAGE_SIZE = 32768;
protected static int DEFAULT_MEM_NUM_PAGES = 1000;
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 2847004..81c2367 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -647,4 +647,9 @@
btree.validate();
}
}
+
+ @Override
+ public String toString() {
+ return "LSMBTree [" + fileManager.getBaseDir() + "]";
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index e4dd369..12c7ea7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -190,4 +190,9 @@
public IBufferCache getBufferCache() {
return diskBufferCache;
}
+
+ @Override
+ public String toString() {
+ return "LSMIndex [" + fileManager.getBaseDir() + "]";
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 65e6a3d..7372b86 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -16,8 +16,9 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -35,6 +36,8 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
public class LSMHarness implements ILSMHarness {
+ private static final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
+
private final ILSMIndexInternal lsmIndex;
private final ILSMMergePolicy mergePolicy;
private final ILSMOperationTracker opTracker;
@@ -171,7 +174,7 @@
}
lsmIndex.setFlushStatus(false);
-
+
if (!lsmIndex.scheduleFlush(ctx, callback)) {
callback.beforeOperation();
callback.afterOperation(null, null);
@@ -184,7 +187,11 @@
public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException {
operation.getCallback().beforeOperation();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(lsmIndex + ": flushing");
+ }
ILSMComponent newComponent = lsmIndex.flush(operation);
+
operation.getCallback().afterOperation(null, newComponent);
lsmIndex.markAsValid(newComponent);
operation.getCallback().afterFinalize(newComponent);
@@ -215,6 +222,9 @@
IndexException {
List<ILSMComponent> mergedComponents = new ArrayList<ILSMComponent>();
operation.getCallback().beforeOperation();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(lsmIndex + ": merging");
+ }
ILSMComponent newComponent = lsmIndex.merge(mergedComponents, operation);
ctx.getComponentHolder().addAll(mergedComponents);
operation.getCallback().afterOperation(mergedComponents, newComponent);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 33052f3..30fdd27 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -566,6 +566,7 @@
public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final IIndexBulkLoader invIndexBulkLoader;
+ private boolean exceptionCaught = false;
public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
throws IndexException {
@@ -599,6 +600,7 @@
}
protected void handleException() throws HyracksDataException {
+ exceptionCaught = true;
((LSMInvertedIndexImmutableComponent) component).getInvIndex().deactivate();
((LSMInvertedIndexImmutableComponent) component).getInvIndex().destroy();
((LSMInvertedIndexImmutableComponent) component).getDeletedKeysBTree().deactivate();
@@ -609,7 +611,9 @@
@Override
public void end() throws IndexException, HyracksDataException {
- invIndexBulkLoader.end();
+ if (!exceptionCaught) {
+ invIndexBulkLoader.end();
+ }
lsmHarness.addBulkLoadedComponent(component);
}
}
@@ -732,4 +736,9 @@
component.getDeletedKeysBTree().validate();
}
}
+
+ @Override
+ public String toString() {
+ return "LSMInvertedIndex [" + fileManager.getBaseDir() + "]";
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 9b377a9..4b2e075 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -26,6 +26,7 @@
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
@@ -55,6 +56,7 @@
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree.RTreeAccessor;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
public abstract class AbstractLSMRTree extends AbstractLSMIndex implements ITreeIndex {
@@ -279,6 +281,8 @@
throw new UnsupportedOperationException("Physical delete not supported in the LSM-RTree");
}
+ ctx.modificationCallback.before(tuple);
+ ctx.modificationCallback.found(null, tuple);
if (ctx.getOperation() == IndexOperation.INSERT) {
// Before each insert, we must check whether there exist a killer
// tuple in the memBTree. If we find a killer tuple, we must truly
@@ -323,13 +327,16 @@
}
protected LSMRTreeOpContext createOpContext(IModificationOperationCallback modCallback) {
- return new LSMRTreeOpContext((RTree.RTreeAccessor) mutableComponent.getRTree().createAccessor(modCallback,
- NoOpOperationCallback.INSTANCE), (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame(),
+ RTreeAccessor rtreeAccessor = (RTree.RTreeAccessor) mutableComponent.getRTree().createAccessor(
+ NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ BTreeAccessor btreeAccessor = (BTree.BTreeAccessor) mutableComponent.getBTree().createAccessor(
+ NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+
+ return new LSMRTreeOpContext(rtreeAccessor, (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame(),
(IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(), memFreePageManager
- .getMetaDataFrameFactory().createFrame(), 4, (BTree.BTreeAccessor) mutableComponent.getBTree()
- .createAccessor(modCallback, NoOpOperationCallback.INSTANCE), btreeLeafFrameFactory,
+ .getMetaDataFrameFactory().createFrame(), 4, btreeAccessor, btreeLeafFrameFactory,
btreeInteriorFrameFactory, memFreePageManager.getMetaDataFrameFactory().createFrame(),
- rtreeCmpFactories, btreeCmpFactories, null, null);
+ rtreeCmpFactories, btreeCmpFactories, modCallback, NoOpOperationCallback.INSTANCE);
}
@Override
@@ -355,4 +362,9 @@
InMemoryBufferCache memBufferCache = (InMemoryBufferCache) mutableComponent.getRTree().getBufferCache();
return memBufferCache.getNumPages() * memBufferCache.getPageSize();
}
+
+ @Override
+ public String toString() {
+ return "LSMRTree [" + fileManager.getBaseDir() + "]";
+ }
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index b761f62..6c7d5fd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -52,7 +52,7 @@
ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
ccConfig.defaultMaxJobAttempts = 0;
- ccConfig.jobHistorySize = 0;
+ ccConfig.jobHistorySize = 1;
ccConfig.profileDumpPeriod = -1;
// cluster controller