merge fullstack_lsm_staging
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index b8bdf3e..ba51eff 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -27,7 +27,6 @@
INNERJOIN,
LEFTOUTERJOIN,
LIMIT,
- DIE,
NESTEDTUPLESOURCE,
ORDER,
PROJECT,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 0efb5ff..c3a59ea 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -33,7 +33,6 @@
SPLIT,
STABLE_SORT,
STREAM_LIMIT,
- STREAM_DIE,
STREAM_SELECT,
STREAM_PROJECT,
STRING_STREAM_SCRIPT,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
deleted file mode 100644
index 03bfcba..0000000
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.ArrayList;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-
-public class DieOperator extends AbstractLogicalOperator {
-
- private final Mutable<ILogicalExpression> afterObjects; // mandatory
-
- public DieOperator(ILogicalExpression maxObjectsExpr) {
- this.afterObjects = new MutableObject<ILogicalExpression>(maxObjectsExpr);
- }
-
- public Mutable<ILogicalExpression> getAfterObjects() {
- return afterObjects;
- }
-
- @Override
- public void recomputeSchema() {
- schema = new ArrayList<LogicalVariable>();
- schema.addAll(inputs.get(0).getValue().getSchema());
- }
-
- @Override
- public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
- return visitor.visitDieOperator(this, arg);
- }
-
- @Override
- public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
- boolean b = false;
- if (visitor.transform(afterObjects)) {
- b = true;
- }
- return b;
- }
-
- @Override
- public LogicalOperatorTag getOperatorTag() {
- return LogicalOperatorTag.DIE;
- }
-
- @Override
- public VariablePropagationPolicy getVariablePropagationPolicy() {
- return VariablePropagationPolicy.ALL;
- }
-
- @Override
- public boolean isMap() {
- return true;
- }
-
- @Override
- public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
- return createPropagatingAllInputsTypeEnvironment(ctx);
- }
-
- @Override
- public boolean requiresVariableReferenceExpressions() {
- return false;
- }
-}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 1b4be1e..527a11c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -45,7 +45,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -357,12 +356,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, IOptimizationContext ctx) throws AlgebricksException {
- propagateFDsAndEquivClasses(op, ctx);
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, IOptimizationContext ctx)
throws AlgebricksException {
AbstractLogicalOperator op1 = (AbstractLogicalOperator) op.getDataSourceReference().getValue();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index ac6d887..745b8c6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -36,7 +36,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -180,16 +179,6 @@
}
@Override
- public Boolean visitDieOperator(DieOperator op, ILogicalOperator arg) throws AlgebricksException {
- AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
- if (aop.getOperatorTag() != LogicalOperatorTag.DIE)
- return Boolean.FALSE;
- DieOperator dieOpArg = (DieOperator) copyAndSubstituteVar(op, arg);
- boolean isomorphic = op.getAfterObjects().getValue().equals(dieOpArg.getAfterObjects().getValue());
- return isomorphic;
- }
-
- @Override
public Boolean visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
if (aop.getOperatorTag() != LogicalOperatorTag.INNERJOIN)
@@ -643,11 +632,6 @@
}
@Override
- public ILogicalOperator visitDieOperator(DieOperator op, Void arg) throws AlgebricksException {
- return new DieOperator(deepCopyExpressionRef(op.getAfterObjects()).getValue());
- }
-
- @Override
public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
return new InnerJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op
.getInputs().get(1));
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index b9544c7..ac8cabe 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -35,7 +35,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -109,12 +108,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, ILogicalOperator arg) throws AlgebricksException {
- mapVariablesStandard(op, arg);
- return null;
- }
-
- @Override
public Void visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 8f1d686..e4fd78e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -142,12 +141,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, IOptimizationContext arg) throws AlgebricksException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, IOptimizationContext arg)
throws AlgebricksException {
// TODO Auto-generated method stub
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 31adcba..91ffe4b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -30,7 +30,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -136,11 +135,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, Void arg) throws AlgebricksException {
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index cd0cee3..0de9652 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -29,7 +29,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -152,12 +151,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, Void arg) throws AlgebricksException {
- standardLayout(op);
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
VariableUtilities.getLiveVariables(op.getSourceOperator(), schemaVariables);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 69fb3f8..bb9b600 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -31,7 +31,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -185,14 +184,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, Pair<LogicalVariable, LogicalVariable> pair)
- throws AlgebricksException {
- op.getAfterObjects().getValue().substituteVar(pair.first, pair.second);
- substVarTypes(op, pair);
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 5361a19..a620e54 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -30,7 +30,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -198,12 +197,6 @@
}
@Override
- public Void visitDieOperator(DieOperator op, Void arg) {
- op.getAfterObjects().getValue().getUsedVariables(usedVariables);
- return null;
- }
-
- @Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) {
// does not use any variable
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java
deleted file mode 100644
index 73b9b79..0000000
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamDieRuntimeFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class StreamDiePOperator extends AbstractPhysicalOperator {
-
- public StreamDiePOperator() {
- }
-
- @Override
- public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.STREAM_DIE;
- }
-
- @Override
- public boolean isMicroOperator() {
- return true;
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- ILogicalOperator op2 = op.getInputs().get(0).getValue();
- deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
- StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
- pv[0] = (StructuralPropertiesVector) reqdByParent;
- return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
- }
-
- @Override
- public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- DieOperator die = (DieOperator) op;
- IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
- IVariableTypeEnvironment env = context.getTypeEnvironment(op);
- IScalarEvaluatorFactory afterObjectsFact = expressionRuntimeProvider.createEvaluatorFactory(die
- .getAfterObjects().getValue(), env, inputSchemas, context);
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
- StreamDieRuntimeFactory runtime = new StreamDieRuntimeFactory(afterObjectsFact, null,
- context.getBinaryIntegerInspectorFactory());
- builder.contributeMicroOperator(die, runtime, recDesc);
- ILogicalOperator src = die.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, die, 0);
- }
-
-}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index 54c0505..7861dc0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -31,6 +31,8 @@
public class StreamProjectPOperator extends AbstractPropagatePropertiesForUsedVariablesPOperator {
+ private boolean flushFramesRapidly;
+
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.STREAM_PROJECT;
@@ -61,8 +63,9 @@
}
projectionList[i++] = pos;
}
- StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList);
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList, flushFramesRapidly);
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+ context);
builder.contributeMicroOperator(project, runtime, recDesc);
ILogicalOperator src = project.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, project, 0);
@@ -74,4 +77,8 @@
computeDeliveredPropertiesForUsedVariables(p, p.getVariables());
}
+ public void setRapidFrameFlush(boolean flushFramesRapidly) {
+ this.flushFramesRapidly = flushFramesRapidly;
+ }
+
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index fc0c433..4b7597c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -28,7 +28,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -256,13 +255,6 @@
}
@Override
- public String visitDieOperator(DieOperator op, Integer indent) {
- StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("die after " + op.getAfterObjects().getValue());
- return buffer.toString();
- }
-
- @Override
public String visitExchangeOperator(ExchangeOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("exchange ");
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 23dac2a..32b049e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -18,7 +18,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -106,5 +105,4 @@
public R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T tag) throws AlgebricksException;
- public R visitDieOperator(DieOperator op, T arg) throws AlgebricksException;
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushDieUpRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushDieUpRule.java
deleted file mode 100644
index c4dd78d..0000000
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushDieUpRule.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package edu.uci.ics.hyracks.algebricks.rewriter.rules;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class PushDieUpRule implements IAlgebraicRewriteRule {
-
- @Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
- return false;
- }
-
- @Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
- AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue();
- if (op0.getInputs().size() == 0)
- return false;
- AbstractLogicalOperator op1 = (AbstractLogicalOperator) op0.getInputs().get(0).getValue();
-
- if (op1.getInputs().size() == 0)
- return false;
- LogicalOperatorTag tag = op1.getOperatorTag();
- if (tag == LogicalOperatorTag.SINK || tag == LogicalOperatorTag.WRITE
- || tag == LogicalOperatorTag.INSERT_DELETE || tag == LogicalOperatorTag.WRITE_RESULT)
- return false;
-
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
- if (op2.getOperatorTag() == LogicalOperatorTag.DIE) {
- op0.getInputs().get(0).setValue(op2);
- op1.getInputs().clear();
- for (Mutable<ILogicalOperator> ref : op2.getInputs())
- op1.getInputs().add(ref);
- op2.getInputs().clear();
- op2.getInputs().add(new MutableObject<ILogicalOperator>(op1));
-
- context.computeAndSetTypeEnvironmentForOperator(op0);
- context.computeAndSetTypeEnvironmentForOperator(op1);
- context.computeAndSetTypeEnvironmentForOperator(op2);
- return true;
- } else {
- return false;
- }
- }
-}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 82d678b..c19ef65 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -53,7 +53,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamDiePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamSelectPOperator;
@@ -280,10 +279,6 @@
op.setPhysicalOperator(new SinkPOperator());
break;
}
- case DIE: {
- op.setPhysicalOperator(new StreamDiePOperator());
- break;
- }
}
}
if (op.hasNestedPlans()) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index cd9931b..79e964b 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -54,6 +54,11 @@
}
protected void appendProjectionToFrame(int tIndex, int[] projectionList) throws HyracksDataException {
+ appendProjectionToFrame(tIndex, projectionList, false);
+ }
+
+ protected void appendProjectionToFrame(int tIndex, int[] projectionList, boolean flushFrame)
+ throws HyracksDataException {
if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
@@ -61,6 +66,11 @@
throw new IllegalStateException(
"Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
}
+ return;
+ }
+ if (flushFrame) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
}
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
deleted file mode 100644
index 508ce5d..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
-import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
-
-public class StreamDieRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- private IScalarEvaluatorFactory aftterObjectsEvalFactory;
- private IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
-
- public StreamDieRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory, int[] projectionList,
- IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory) {
- super(projectionList);
- this.aftterObjectsEvalFactory = maxObjectsEvalFactory;
- this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
- }
-
- @Override
- public String toString() {
- String s = "stream-die " + aftterObjectsEvalFactory.toString();
- return s;
- }
-
- @Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
- final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private IPointable p = VoidPointable.FACTORY.createPointable();
- private IScalarEvaluator evalAfterObjects;
- private int toWrite = -1;
-
- @Override
- public void open() throws HyracksDataException {
- if (evalAfterObjects == null) {
- initAccessAppendRef(ctx);
- try {
- evalAfterObjects = aftterObjectsEvalFactory.createScalarEvaluator(ctx);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- writer.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- if (toWrite < 0) {
- toWrite = evaluateInteger(evalAfterObjects, 0);
- }
- for (int t = 0; t < nTuple; t++) {
- if (toWrite > 0) {
- toWrite--;
- if (projectionList != null) {
- appendProjectionToFrame(t, projectionList);
- } else {
- appendTupleToFrame(t);
- }
- } else {
- throw new HyracksDataException("injected failure");
-// System.out.println("Injected Kill-JVM");
-// System.exit(-1);
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- super.close();
- }
-
- private int evaluateInteger(IScalarEvaluator eval, int tIdx) throws HyracksDataException {
- tRef.reset(tAccess, tIdx);
- try {
- eval.evaluate(tRef, p);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- int lim = bii.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
- return lim;
- }
-
- };
- }
-
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 455ad8e..0e6fc15 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -26,9 +26,16 @@
public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
private static final long serialVersionUID = 1L;
+ private final boolean flushFramesRapidly;
+
+ public StreamProjectRuntimeFactory(int[] projectionList, boolean flushFramesRapidly) {
+ super(projectionList);
+ this.flushFramesRapidly = flushFramesRapidly;
+ }
public StreamProjectRuntimeFactory(int[] projectionList) {
super(projectionList);
+ this.flushFramesRapidly = false;
}
@Override
@@ -57,7 +64,16 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
+
+ int t = 0;
+ for (; t < nTuple - 1; t++) {
+ appendProjectionToFrame(t, projectionList);
+ }
+ if (flushFramesRapidly) {
+ // Whenever all the tuples in the incoming frame have been consumed, the project operator
+ // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
+ appendProjectionToFrame(t, projectionList, true);
+ } else {
appendProjectionToFrame(t, projectionList);
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 2847004..81c2367 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -647,4 +647,9 @@
btree.validate();
}
}
+
+ @Override
+ public String toString() {
+ return "LSMBTree [" + fileManager.getBaseDir() + "]";
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index e4dd369..12c7ea7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -190,4 +190,9 @@
public IBufferCache getBufferCache() {
return diskBufferCache;
}
+
+ @Override
+ public String toString() {
+ return "LSMIndex [" + fileManager.getBaseDir() + "]";
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 65e6a3d..7372b86 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -16,8 +16,9 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -35,6 +36,8 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
public class LSMHarness implements ILSMHarness {
+ private static final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
+
private final ILSMIndexInternal lsmIndex;
private final ILSMMergePolicy mergePolicy;
private final ILSMOperationTracker opTracker;
@@ -171,7 +174,7 @@
}
lsmIndex.setFlushStatus(false);
-
+
if (!lsmIndex.scheduleFlush(ctx, callback)) {
callback.beforeOperation();
callback.afterOperation(null, null);
@@ -184,7 +187,11 @@
public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException {
operation.getCallback().beforeOperation();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(lsmIndex + ": flushing");
+ }
ILSMComponent newComponent = lsmIndex.flush(operation);
+
operation.getCallback().afterOperation(null, newComponent);
lsmIndex.markAsValid(newComponent);
operation.getCallback().afterFinalize(newComponent);
@@ -215,6 +222,9 @@
IndexException {
List<ILSMComponent> mergedComponents = new ArrayList<ILSMComponent>();
operation.getCallback().beforeOperation();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(lsmIndex + ": merging");
+ }
ILSMComponent newComponent = lsmIndex.merge(mergedComponents, operation);
ctx.getComponentHolder().addAll(mergedComponents);
operation.getCallback().afterOperation(mergedComponents, newComponent);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 33052f3..30fdd27 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -566,6 +566,7 @@
public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final IIndexBulkLoader invIndexBulkLoader;
+ private boolean exceptionCaught = false;
public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
throws IndexException {
@@ -599,6 +600,7 @@
}
protected void handleException() throws HyracksDataException {
+ exceptionCaught = true;
((LSMInvertedIndexImmutableComponent) component).getInvIndex().deactivate();
((LSMInvertedIndexImmutableComponent) component).getInvIndex().destroy();
((LSMInvertedIndexImmutableComponent) component).getDeletedKeysBTree().deactivate();
@@ -609,7 +611,9 @@
@Override
public void end() throws IndexException, HyracksDataException {
- invIndexBulkLoader.end();
+ if (!exceptionCaught) {
+ invIndexBulkLoader.end();
+ }
lsmHarness.addBulkLoadedComponent(component);
}
}
@@ -732,4 +736,9 @@
component.getDeletedKeysBTree().validate();
}
}
+
+ @Override
+ public String toString() {
+ return "LSMInvertedIndex [" + fileManager.getBaseDir() + "]";
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 2668113..4b2e075 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -362,4 +362,9 @@
InMemoryBufferCache memBufferCache = (InMemoryBufferCache) mutableComponent.getRTree().getBufferCache();
return memBufferCache.getNumPages() * memBufferCache.getPageSize();
}
+
+ @Override
+ public String toString() {
+ return "LSMRTree [" + fileManager.getBaseDir() + "]";
+ }
}