Merge branch 'master' into yingyi/fullstack_fix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index 7efee75..b9ac62f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -23,12 +23,22 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
public class UnnestOperator extends AbstractUnnestOperator {
private LogicalVariable positionalVariable;
+
+ /**
+ * Used to set the position offset for positional variable
+ */
+ private ILogicalExpression positionOffsetExpr;
+
+ /**
+ * Specify the type of the positional variable
+ */
private Object positionalVariableType;
public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression) {
@@ -67,6 +77,14 @@
return positionalVariableType;
}
+ public void setPositionOffsetExpr(ILogicalExpression posOffsetExpr) {
+ this.positionOffsetExpr = posOffsetExpr;
+ }
+
+ public ILogicalExpression getPositionOffsetExpr() {
+ return this.positionOffsetExpr;
+ }
+
@Override
public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
return visitor.visitUnnestOperator(this, arg);
@@ -88,4 +106,24 @@
}
return env;
}
+
+ @Override
+ public VariablePropagationPolicy getVariablePropagationPolicy() {
+ return new VariablePropagationPolicy() {
+
+ @Override
+ public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+ throws AlgebricksException {
+ if (sources.length > 0) {
+ target.addAllVariables(sources[0]);
+ }
+ for (LogicalVariable v : variables) {
+ target.addVariable(v);
+ }
+ if (positionalVariable != null) {
+ target.addVariable(positionalVariable);
+ }
+ }
+ };
+ }
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index cebddee..3a2617f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -404,7 +404,8 @@
@Override
public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext ctx)
throws AlgebricksException {
- propagateFDsAndEquivClasses(op, ctx);
+ ctx.putEquivalenceClassMap(op, new HashMap<LogicalVariable, EquivalenceClass>());
+ ctx.putFDList(op, new ArrayList<FunctionalDependency>());
return null;
}
@@ -646,4 +647,4 @@
return null;
}
-}
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index 51b009f..bcd998d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
@@ -34,6 +35,7 @@
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansRunningAggregatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -76,13 +78,19 @@
}
// compile subplans and set the gby op. schema accordingly
AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
- IAggregatorDescriptorFactory aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys,
- fdColumns);
+ IAggregatorDescriptorFactory aggregatorFactory;
+
+ if (((AbstractLogicalOperator) (gby.getNestedPlans().get(0).getRoots().get(0).getValue())).getOperatorTag() == LogicalOperatorTag.RUNNINGAGGREGATE) {
+ aggregatorFactory = new NestedPlansRunningAggregatorFactory(subplans, keys, fdColumns);
+ } else {
+ aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys, fdColumns);
+ }
IOperatorDescriptorRegistry spec = builder.getJobSpec();
IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
columnList, context.getTypeEnvironment(op), context);
- RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+ RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+ context);
PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
comparatorFactories, aggregatorFactory, recordDescriptor);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 21def02..c5326ce 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -30,8 +30,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
@@ -62,22 +60,8 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
- IPartitioningProperty pp = null;
- RunningAggregateOperator ragg = (RunningAggregateOperator) op;
- for (Mutable<ILogicalExpression> exprRef : ragg.getExpressions()) {
- StatefulFunctionCallExpression f = (StatefulFunctionCallExpression) exprRef.getValue();
- IPartitioningProperty p = f.getRequiredPartitioningProperty();
- if (p != null) {
- if (pp == null) {
- pp = p;
- } else {
- throw new IllegalStateException("Two stateful functions want to set partitioning requirements: "
- + pp + " and " + p);
- }
- }
- }
- StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
- return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+
+ return emptyUnaryRequirements();
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
index bf45e18..bf2f68a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.UnnestRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -57,10 +58,7 @@
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
UnnestOperator unnest = (UnnestOperator) op;
- if (unnest.getPositionalVariable() != null) {
- throw new AlgebricksException("Cannot generate runtime for unnest with positional variable "
- + unnest.getPositionalVariable());
- }
+
int outCol = opSchema.findVariable(unnest.getVariable());
ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
@@ -79,8 +77,18 @@
UnnestingFunctionCallExpression agg = (UnnestingFunctionCallExpression) unnestExpr;
IUnnestingEvaluatorFactory unnestingFactory = expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+
+ // for position offset
+ ILogicalExpression posOffsetExpr = unnest.getPositionOffsetExpr();
+ IScalarEvaluatorFactory posOffsetExprEvalFactory = null;
+ if (posOffsetExpr != null) {
+ posOffsetExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(posOffsetExpr,
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+ }
+
int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
- UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList);
+ UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
+ unnest.getPositionalVariable() != null, posOffsetExprEvalFactory);
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
ILogicalOperator src = unnest.getInputs().get(0).getValue();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
index af11b70..82e6970 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public class IntroHashPartitionMergeExchange implements IAlgebraicRewriteRule {
@@ -60,6 +61,7 @@
hpe.getDomain());
op1.setPhysicalOperator(hpme);
op1.getInputs().get(0).setValue(op2.getInputs().get(0).getValue());
+ op1.computeDeliveredPhysicalProperties(context);
return true;
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 8f2eaac..09354a5 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -28,11 +28,11 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private AlgebricksPipeline[] subplans;
@@ -95,7 +95,7 @@
}
@Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
for (int i = 0; i < pipelines.length; i++) {
outputWriter.setInputIdx(i);
@@ -114,6 +114,7 @@
offset = fieldEnds[i] - start;
tupleBuilder.addField(data, start, offset);
}
+ return true;
}
@Override
@@ -127,7 +128,7 @@
}
@Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
throw new IllegalStateException("this method should not be called");
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
new file mode 100644
index 0000000..dc9c805
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class NestedPlansRunningAggregatorFactory implements IAggregatorDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+ private AlgebricksPipeline[] subplans;
+ private int[] keyFieldIdx;
+ private int[] decorFieldIdx;
+
+ public NestedPlansRunningAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx, int[] decorFieldIdx) {
+ this.subplans = subplans;
+ this.keyFieldIdx = keyFieldIdx;
+ this.decorFieldIdx = decorFieldIdx;
+ }
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[], int[])
+ */
+ @Override
+ public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
+ final IFrameWriter writer) throws HyracksDataException {
+ final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
+ decorFieldIdx.length, writer);
+ final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
+ for (int i = 0; i < subplans.length; i++) {
+ try {
+ pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
+
+ final ByteBuffer outputFrame = ctx.allocateFrame();
+ final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputAppender.reset(outputFrame, true);
+
+ return new IAggregatorDescriptor() {
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+
+ for (int i = 0; i < pipelines.length; ++i) {
+ pipelines[i].open();
+ }
+
+ gbyTb.reset();
+ for (int i = 0; i < keyFieldIdx.length; ++i) {
+ gbyTb.addField(accessor, tIndex, keyFieldIdx[i]);
+ }
+ for (int i = 0; i < decorFieldIdx.length; ++i) {
+ gbyTb.addField(accessor, tIndex, decorFieldIdx[i]);
+ }
+
+ // aggregate the first tuple
+ for (int i = 0; i < pipelines.length; i++) {
+ outputWriter.setInputIdx(i);
+ pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+ pipelines[i].forceFlush();
+ }
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ for (int i = 0; i < pipelines.length; i++) {
+ outputWriter.setInputIdx(i);
+ pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+ pipelines[i].forceFlush();
+ }
+ }
+
+ @Override
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ return false;
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState();
+ }
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("this method should not be called");
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+ }
+
+ private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
+ throws AlgebricksException, HyracksDataException {
+ // plug the operators
+ IFrameWriter start = writer;
+ IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
+ RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+ for (int i = runtimeFactories.length - 1; i >= 0; i--) {
+ IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
+ newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+ if (i > 0) {
+ newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
+ } else {
+ // the nts has the same input and output rec. desc.
+ newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
+ }
+ start = newRuntime;
+ }
+ return start;
+ }
+
+ private static class RunningAggregatorOutput implements IFrameWriter {
+
+ private FrameTupleAccessor[] tAccess;
+ private RecordDescriptor[] inputRecDesc;
+ private int inputIdx;
+ private ArrayTupleBuilder tb;
+ private ArrayTupleBuilder gbyTb;
+ private AlgebricksPipeline[] subplans;
+ private IFrameWriter outputWriter;
+ private ByteBuffer outputFrame;
+ private FrameTupleAppender outputAppender;
+
+ public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numKeys,
+ int numDecors, IFrameWriter outputWriter) throws HyracksDataException {
+ this.subplans = subplans;
+ this.outputWriter = outputWriter;
+
+ // this.keyFieldIndexes = keyFieldIndexes;
+ int totalAggFields = 0;
+ this.inputRecDesc = new RecordDescriptor[subplans.length];
+ for (int i = 0; i < subplans.length; i++) {
+ RecordDescriptor[] rd = subplans[i].getRecordDescriptors();
+ this.inputRecDesc[i] = rd[rd.length - 1];
+ totalAggFields += subplans[i].getOutputWidth();
+ }
+ tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
+ gbyTb = new ArrayTupleBuilder(numKeys + numDecors);
+
+ this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
+ for (int i = 0; i < inputRecDesc.length; i++) {
+ tAccess[i] = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc[i]);
+ }
+
+ this.outputFrame = ctx.allocateFrame();
+ this.outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ this.outputAppender.reset(outputFrame, true);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ int w = subplans[inputIdx].getOutputWidth();
+ IFrameTupleAccessor accessor = tAccess[inputIdx];
+ accessor.reset(buffer);
+ for (int tIndex = 0; tIndex < accessor.getTupleCount(); tIndex++) {
+ tb.reset();
+ byte[] data = gbyTb.getByteArray();
+ int[] fieldEnds = gbyTb.getFieldEndOffsets();
+ int start = 0;
+ int offset = 0;
+ for (int i = 0; i < fieldEnds.length; i++) {
+ if (i > 0)
+ start = fieldEnds[i - 1];
+ offset = fieldEnds[i] - start;
+ tb.addField(data, start, offset);
+ }
+ for (int f = 0; f < w; f++) {
+ tb.addField(accessor, tIndex, f);
+ }
+ if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(outputFrame, outputWriter);
+ outputAppender.reset(outputFrame, true);
+ if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new HyracksDataException(
+ "Failed to write a running aggregation result into an empty frame: possibly the size of the result is too large.");
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // clearFrame();
+ }
+
+ public void setInputIdx(int inputIdx) {
+ this.inputIdx = inputIdx;
+ }
+
+ public ArrayTupleBuilder getGroupByTupleBuilder() {
+ return gbyTb;
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+
+ }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index d4161fc..39db3e1 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -25,11 +25,11 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+public class SerializableAggregatorDescriptorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private ICopySerializableAggregateFunctionFactory[] aggFactories;
@@ -109,7 +109,7 @@
}
@Override
- public void outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
byte[] data = accessor.getBuffer().array();
int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -125,10 +125,11 @@
throw new HyracksDataException(e);
}
}
+ return true;
}
@Override
- public void outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
byte[] data = accessor.getBuffer().array();
int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -144,6 +145,7 @@
throw new HyracksDataException(e);
}
}
+ return true;
}
@Override
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 8e472ec..9b638b4 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -25,11 +25,11 @@
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class SimpleAlgebricksAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class SimpleAlgebricksAccumulatingAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private IAggregateEvaluatorFactory[] aggFactories;
@@ -86,7 +86,7 @@
}
@Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
IAggregateEvaluator[] agg = (IAggregateEvaluator[]) state.state;
for (int i = 0; i < agg.length; i++) {
@@ -97,6 +97,7 @@
throw new HyracksDataException(e);
}
}
+ return true;
}
@Override
@@ -118,7 +119,7 @@
}
@Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
throw new IllegalStateException("this method should not be called");
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 6c1dd5e..b079c3e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -51,8 +51,8 @@
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException(
- "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
+ throw new HyracksDataException(
+ "Could not write frame: the size of the tuple is too long to be fit into a single frame. (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder)");
}
}
if (flushFrame) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 200e5c5..7ecb288 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -63,8 +63,6 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
- outRecordDesc, groupFields, groupFields);
final ByteBuffer copyFrame = ctx.allocateFrame();
final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
copyFrameAccessor.reset(copyFrame);
@@ -78,6 +76,8 @@
@Override
public void open() throws HyracksDataException {
+ IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
+ outRecordDesc, groupFields, groupFields, writer);
pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
outRecordDesc, writer);
pgw.open();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 0499684..e5bede2 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -107,7 +107,8 @@
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
- throw new IllegalStateException("Could not write frame.");
+ throw new HyracksDataException(
+ "Could not write frame: subplan result is larger than the single-frame limit.");
}
}
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 5486fae..55dbcbb 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
@@ -64,5 +65,10 @@
public void fail() throws HyracksDataException {
writer.fail();
}
+
+ public void forceFlush() throws HyracksDataException {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ }
}
}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index e92724a..cfc2bb6 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -89,6 +90,10 @@
@Override
public void open() throws HyracksDataException {
+ if(!first){
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ }
initAccessAppendRef(ctx);
if (first) {
first = false;
@@ -96,12 +101,18 @@
for (int i = 0; i < n; i++) {
try {
raggs[i] = runningAggregates[i].createRunningAggregateEvaluator();
- raggs[i].init();
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
}
}
+ for (int i = 0; i < runningAggregates.length; i++) {
+ try {
+ raggs[i].init();
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
writer.open();
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index df70437..88dc19f 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -14,11 +14,15 @@
*/
package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -26,6 +30,7 @@
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -36,12 +41,20 @@
private int outColPos;
private final boolean outColIsProjected;
+ private final boolean hasPositionalVariable;
+ private IScalarEvaluatorFactory posOffsetEvalFactory;
+
// Each time step() is called on the aggregate, a new value is written in
// its output. One byte is written before that value and is neglected.
// By convention, if the aggregate function writes nothing, it means it
// produced the last value.
public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
+ this(outCol, unnestingFactory, projectionList, false, null);
+ }
+
+ public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
+ boolean hashPositionalVariable, IScalarEvaluatorFactory posOffsetEvalFactory) {
super(projectionList);
this.outCol = outCol;
this.unnestingFactory = unnestingFactory;
@@ -52,6 +65,11 @@
}
}
outColIsProjected = outColPos >= 0;
+ this.hasPositionalVariable = hashPositionalVariable;
+ this.posOffsetEvalFactory = posOffsetEvalFactory;
+ if (this.posOffsetEvalFactory == null) {
+ this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]);
+ }
}
@Override
@@ -68,6 +86,9 @@
private IUnnestingEvaluator agg;
private ArrayTupleBuilder tupleBuilder;
+ private int tupleCount;
+ private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
+
@Override
public void open() throws HyracksDataException {
initAccessAppendRef(ctx);
@@ -77,6 +98,7 @@
throw new HyracksDataException(ae);
}
tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ tupleCount = 1;
writer.open();
}
@@ -86,6 +108,16 @@
int nTuple = tAccess.getTupleCount();
for (int t = 0; t < nTuple; t++) {
tRef.reset(tAccess, t);
+
+ try {
+ offsetEval.evaluate(tRef, p);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+
+ @SuppressWarnings("static-access")
+ int offset = IntegerSerializerDeserializer.INSTANCE.getInt(p.getByteArray(), p.getStartOffset());
+
try {
agg.init(tRef);
boolean goon = true;
@@ -94,21 +126,33 @@
if (!agg.step(p)) {
goon = false;
} else {
- if (!outColIsProjected) {
+
+ if (!outColIsProjected && !hasPositionalVariable) {
appendProjectionToFrame(t, projectionList);
} else {
for (int f = 0; f < outColPos; f++) {
tupleBuilder.addField(tAccess, t, f);
}
- tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
- for (int f = outColPos + 1; f < projectionList.length; f++) {
+ if (outColIsProjected) {
+ tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+ } else {
+ tupleBuilder.addField(tAccess, t, outColPos);
+ }
+ for (int f = outColPos + 1; f < (hasPositionalVariable ? projectionList.length - 1
+ : projectionList.length); f++) {
tupleBuilder.addField(tAccess, t, f);
}
}
+ if (hasPositionalVariable) {
+ // Write the positional variable as an INT32
+ tupleBuilder.getDataOutput().writeByte(3);
+ tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+ tupleBuilder.addFieldEndOffset();
+ }
appendToFrameFromTupleBuilder(tupleBuilder);
}
} while (goon);
- } catch (AlgebricksException ae) {
+ } catch (AlgebricksException | IOException ae) {
throw new HyracksDataException(ae);
}
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index c75f9f9..1af32de 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -25,4 +25,6 @@
public IIOManager getIOManager();
public ByteBuffer allocateFrame() throws HyracksDataException;
+
+ public void deallocateFrames(int frameCount);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
index aa7008f..3ea81ef 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
@@ -41,4 +41,10 @@
return ByteBuffer.allocate(frameSize);
}
+ @Override
+ public void deallocateFrames(int frameCount) {
+ // TODO Auto-generated method stub
+
+ }
+
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index c72ced1..20075ff 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -115,6 +115,11 @@
}
@Override
+ public void deallocateFrames(int frameCount) {
+ joblet.deallocateFrames(frameCount);
+ }
+
+ @Override
public int getFrameSize() {
return joblet.getFrameSize();
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
index 8add260..4829173 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
@@ -26,10 +26,11 @@
@Override
public boolean allocate(long memory) {
- if (this.memory.addAndGet(-memory) < 0) {
- this.memory.addAndGet(memory);
- return false;
- }
+ // commented as now the deallocation is not implemented yet.
+ // if (this.memory.addAndGet(-memory) < 0) {
+ // this.memory.addAndGet(memory);
+ // return false;
+ // }
return true;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..48ee410
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractAccumulatingAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[], int[], edu.uci.ics.hyracks.api.comm.IFrameWriter)
+ */
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer)
+ throws HyracksDataException {
+ return this
+ .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+ }
+
+ abstract protected IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields,
+ final int[] keyFieldsInPartialResults) throws HyracksDataException;
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
new file mode 100644
index 0000000..3dcd4c3
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public abstract class AbstractRunningAggregatorDescriptor implements IAggregatorDescriptor {
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor#outputPartialResult(edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder, edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor, int, edu.uci.ics.hyracks.dataflow.std.group.AggregateState)
+ */
+ @Override
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor#outputFinalResult(edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder, edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor, int, edu.uci.ics.hyracks.dataflow.std.group.AggregateState)
+ */
+ @Override
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ return false;
+ }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index ecd5284..df74a93 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -38,9 +38,6 @@
import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
-/**
- *
- */
public class HashSpillableTableFactory implements ISpillableTableFactory {
private static final long serialVersionUID = 1L;
@@ -104,7 +101,7 @@
}
final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
- outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+ outRecordDescriptor, keyFields, keyFieldsInPartialResults, null);
final AggregateState aggregateState = aggregator.createAggregateStates();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index afe460c..f9d4cd3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -18,9 +18,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-/**
- *
- */
public interface IAggregatorDescriptor {
/**
@@ -82,9 +79,10 @@
* @param offset
* @param state
* The aggregation state.
+ * @return TODO
* @throws HyracksDataException
*/
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException;
/**
@@ -97,9 +95,10 @@
* @param offset
* @param state
* The aggregation state.
+ * @return TODO
* @throws HyracksDataException
*/
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException;
public void close();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index a4c0564..241956b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -16,17 +16,15 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-/**
- *
- */
public interface IAggregatorDescriptorFactory extends Serializable {
IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
- throws HyracksDataException;
+ RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
+ IFrameWriter writer) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 239f24f..08b2218 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -21,16 +21,14 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-/**
- *
- */
-public class MultiFieldsAggregatorFactory implements IAggregatorDescriptorFactory {
+public class MultiFieldsAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
@@ -78,8 +76,8 @@
}
@Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) throws HyracksDataException {
DataOutput dos = tupleBuilder.getDataOutput();
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -90,11 +88,11 @@
((AggregateState[]) state.state)[i]);
tupleBuilder.addFieldEndOffset();
}
-
+ return true;
}
@Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
DataOutput dos = tupleBuilder.getDataOutput();
@@ -110,6 +108,7 @@
}
tupleBuilder.addFieldEndOffset();
}
+ return true;
}
@Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index bb3d0f3..fdb59a4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -95,7 +95,7 @@
}
aggregator = mergerFactory.createAggregator(ctx, outRecordDescriptor, outRecordDescriptor, keyFields,
- keyFieldsInPartialResults);
+ keyFieldsInPartialResults, writer);
aggregateState = aggregator.createAggregateStates();
storedKeys = new int[keyFields.length];
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
index f8ba003..3c0eb2b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
@@ -121,7 +121,7 @@
}
this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, fields,
- keyFieldsInPartialResults);
+ keyFieldsInPartialResults, null);
this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
accumulatorSize = 0;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 35cb9e1..71af928 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -55,7 +55,7 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor,
- outRecordDescriptor, groupFields, groupFields);
+ outRecordDescriptor, groupFields, groupFields, writer);
final ByteBuffer copyFrame = ctx.allocateFrame();
final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
copyFrameAccessor.reset(copyFrame);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7773765..c7a9f6d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -44,6 +44,8 @@
private boolean first;
+ private boolean isFailed = false;
+
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
IFrameWriter writer) throws HyracksDataException {
@@ -119,9 +121,9 @@
for (int j = 0; j < groupFields.length; j++) {
tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
}
- aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
+ boolean hasOutput = aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
- if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ if (hasOutput && !appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
FrameUtils.flushFrame(outFrame, writer);
appender.reset(outFrame, true);
@@ -149,12 +151,13 @@
@Override
public void fail() throws HyracksDataException {
+ isFailed = true;
writer.fail();
}
@Override
public void close() throws HyracksDataException {
- if (!first) {
+ if (!isFailed && !first) {
writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(outFrame, writer);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index 1bd22d7..5ac2961 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -19,6 +19,8 @@
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import org.junit.Before;
import org.junit.Test;
@@ -83,7 +85,11 @@
TestStorageManagerComponentHolder.init(8192, 20, 20);
}
- protected static final int MERGE_THRESHOLD = 3;
+ protected static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
protected IVirtualBufferCacheProvider virtualBufferCacheProvider = new TestVirtualBufferCacheProvider(
DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
index fc932a3..8addae7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
@@ -26,7 +26,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -52,7 +52,7 @@
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
invertedIndexDataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
- virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
+ virtualBufferCacheProvider, new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
NoOpIOOperationCallback.INSTANCE, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
index 75045b4..82de0c9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
@@ -24,7 +24,7 @@
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -47,9 +47,9 @@
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(virtualBufferCacheProvider,
- new ConstantMergePolicyProvider(MERGE_THRESHOLD), ThreadCountingOperationTrackerProvider.INSTANCE,
- SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE,
- DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
+ new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
+ ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
+ NoOpIOOperationCallback.INSTANCE, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
@Override
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index e98ecde..a4fd4c1 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -15,10 +15,13 @@
package edu.uci.ics.hyracks.tests.am.lsm.btree;
+import java.util.HashMap;
+import java.util.Map;
+
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -26,15 +29,19 @@
public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
- private static final int MERGE_THRESHOLD = 3;
-
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
+
public LSMBTreeOperatorTestHelper(IOManager ioManager) {
super(ioManager);
}
public IIndexDataflowHelperFactory createDataFlowHelperFactory() {
- return new LSMBTreeDataflowHelperFactory(virtualBufferCacheProvider, new ConstantMergePolicyProvider(
- MERGE_THRESHOLD), ThreadCountingOperationTrackerProvider.INSTANCE,
+ return new LSMBTreeDataflowHelperFactory(virtualBufferCacheProvider, new ConstantMergePolicyFactory(),
+ MERGE_POLICY_PROPERTIES, ThreadCountingOperationTrackerProvider.INSTANCE,
SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE,
DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
index e7478d6..79b8eb7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.tests.am.lsm.rtree;
+import java.util.HashMap;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -30,7 +33,11 @@
public class LSMRTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
- private static final int MERGE_THRESHOLD = 3;
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
public LSMRTreeOperatorTestHelper(IOManager ioManager) {
super(ioManager);
@@ -40,7 +47,7 @@
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
return new LSMRTreeDataflowHelperFactory(valueProviderFactories, rtreePolicyType, btreeComparatorFactories,
- virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
+ virtualBufferCacheProvider, new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
index f883d90..fd6171a 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.tests.am.lsm.rtree;
+import java.util.HashMap;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -30,7 +33,11 @@
public class LSMRTreeWithAntiMatterTuplesOperatorTestHelper extends LSMTreeOperatorTestHelper {
- private static final int MERGE_THRESHOLD = 3;
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
public LSMRTreeWithAntiMatterTuplesOperatorTestHelper(IOManager ioManager) {
super(ioManager);
@@ -40,9 +47,8 @@
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
return new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories, rtreePolicyType,
- btreeComparatorFactories, virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
- ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
- NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory);
+ btreeComparatorFactories, virtualBufferCacheProvider, new ConstantMergePolicyFactory(),
+ MERGE_POLICY_PROPERTIES, ThreadCountingOperationTrackerProvider.INSTANCE,
+ SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory);
}
-
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
index 6e17ff8..9c2268d 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
@@ -23,21 +23,20 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
/**
- * Quick & dirty data generator for multi-thread testing.
- *
+ * Quick & dirty data generator for multi-thread testing.
*/
@SuppressWarnings("rawtypes")
public class DataGenThread extends Thread {
public final BlockingQueue<TupleBatch> tupleBatchQueue;
private final int maxNumBatches;
- private final int maxOutstandingBatches;
+ private final int maxOutstandingBatches;
private int numBatches = 0;
private final Random rnd;
-
+
// maxOutstandingBatches pre-created tuple-batches for populating the queue.
private TupleBatch[] tupleBatches;
private int ringPos;
-
+
public DataGenThread(int numConsumers, int maxNumBatches, int batchSize, ISerializerDeserializer[] fieldSerdes,
int payloadSize, int rndSeed, int maxOutstandingBatches, boolean sorted) {
this.maxNumBatches = maxNumBatches;
@@ -51,7 +50,7 @@
tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(maxOutstandingBatches);
ringPos = 0;
}
-
+
public DataGenThread(int numConsumers, int maxNumBatches, int batchSize, ISerializerDeserializer[] fieldSerdes,
IFieldValueGenerator[] fieldGens, int rndSeed, int maxOutstandingBatches) {
this.maxNumBatches = maxNumBatches;
@@ -64,13 +63,13 @@
tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(maxOutstandingBatches);
ringPos = 0;
}
-
+
@Override
public void run() {
- while(numBatches < maxNumBatches) {
+ while (numBatches < maxNumBatches) {
boolean added = false;
try {
- if (tupleBatches[ringPos].inUse.compareAndSet(false, true)) {
+ if (tupleBatches[ringPos].inUse.compareAndSet(false, true)) {
tupleBatches[ringPos].generate();
tupleBatchQueue.put(tupleBatches[ringPos]);
added = true;
@@ -89,11 +88,11 @@
}
}
}
-
+
public TupleBatch getBatch() throws InterruptedException {
return tupleBatchQueue.take();
}
-
+
public void releaseBatch(TupleBatch batch) {
batch.inUse.set(false);
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
index d7234fa..cb9c545 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-@SuppressWarnings("rawtypes")
+@SuppressWarnings("rawtypes")
public class DataGenUtils {
public static IFieldValueGenerator getFieldGenFromSerde(ISerializerDeserializer serde, Random rnd, boolean sorted) {
if (serde instanceof IntegerSerializerDeserializer) {
@@ -49,8 +49,9 @@
}
return null;
}
-
- public static IFieldValueGenerator[] getFieldGensFromSerdes(ISerializerDeserializer[] serdes, Random rnd, boolean sorted) {
+
+ public static IFieldValueGenerator[] getFieldGensFromSerdes(ISerializerDeserializer[] serdes, Random rnd,
+ boolean sorted) {
IFieldValueGenerator[] fieldValueGens = new IFieldValueGenerator[serdes.length];
for (int i = 0; i < serdes.length; i++) {
fieldValueGens[i] = getFieldGenFromSerde(serdes[i], rnd, sorted);
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
index 91207e8..b5c0a1d 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -25,5 +25,6 @@
PHYSICALDELETE,
NOOP,
MERGE,
+ FULL_MERGE,
FLUSH
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
index 104a70d..0fdef13 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
@@ -15,12 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
private static final long serialVersionUID = 1L;
public LSMBTreeDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerFactory,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerFactory, ioSchedulerProvider,
- ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+ ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory,
+ ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
}
@Override
@@ -42,7 +44,7 @@
int partition) {
return new LSMBTreeDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackFactory);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 33ae7bb..28f44e6 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -54,7 +54,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
@@ -164,8 +163,7 @@
}
if (flushOnExit) {
- BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
- ioOpCallback);
+ BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
accessor.scheduleFlush(cb);
try {
@@ -242,10 +240,10 @@
public void getOperationalComponents(ILSMIndexOperationContext ctx) {
List<ILSMComponent> immutableComponents = diskComponents;
List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
- operationalComponents.clear();
int cmc = currentMutableComponentId.get();
ctx.setCurrentMutableComponentId(cmc);
int numMutableComponents = memoryComponents.size();
+ operationalComponents.clear();
switch (ctx.getOperation()) {
case UPDATE:
case UPSERT:
@@ -256,6 +254,7 @@
break;
case SEARCH:
case INSERT:
+
for (int i = 0; i < numMutableComponents - 1; i++) {
ILSMComponent c = memoryComponents.get((cmc + i + 1) % numMutableComponents);
LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
@@ -269,6 +268,9 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
+ operationalComponents.addAll(ctx.getComponentsToBeMerged());
+ break;
+ case FULL_MERGE:
operationalComponents.addAll(immutableComponents);
break;
default:
@@ -366,7 +368,8 @@
opCtx.getComponentHolder().add(flushingComponent);
ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent, componentFileRefs
- .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback));
+ .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback, fileManager
+ .getBaseDir()));
}
@Override
@@ -423,8 +426,12 @@
LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
opCtx.setOperation(IndexOperation.MERGE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
- ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx);
-
+ boolean returnDeletedTuples = false;
+ if (ctx.getComponentHolder().get(ctx.getComponentHolder().size() - 1) != diskComponents.get(diskComponents
+ .size() - 1)) {
+ returnDeletedTuples = true;
+ }
+ ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
BTree firstBTree = (BTree) ((LSMBTreeDiskComponent) mergingComponents.get(0)).getBTree();
BTree lastBTree = (BTree) ((LSMBTreeDiskComponent) mergingComponents.get(mergingComponents.size() - 1))
.getBTree();
@@ -434,7 +441,8 @@
.getName(), lastFile.getFile().getName());
ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
- .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
+ .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager
+ .getBaseDir()));
}
@Override
@@ -455,8 +463,8 @@
int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
- LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory,
- mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
+ LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
+ mergeOp.getBloomFilterMergeTarget(), true);
IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false);
IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
@@ -476,9 +484,8 @@
return mergedComponent;
}
- private LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory,
- FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean createComponent)
- throws HyracksDataException, IndexException {
+ private LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory, FileReference btreeFileRef,
+ FileReference bloomFilterFileRef, boolean createComponent) throws HyracksDataException, IndexException {
// Create new BTree instance.
LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) factory
.createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
@@ -541,8 +548,8 @@
} catch (HyracksDataException | IndexException e) {
throw new TreeIndexException(e);
}
- bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(
- fillFactor, verifyInput, numElementsHint, false);
+ bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
+ verifyInput, numElementsHint, false);
int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index 0b2d7cf..381b012 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -43,4 +43,9 @@
public BloomFilter getBloomFilter() {
return bloomFilter;
}
+
+ @Override
+ public long getComponentSize() {
+ return btree.getFileReference().getFile().length() + bloomFilter.getFileReference().getFile().length();
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index 668e727..94b1569 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -27,21 +27,24 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
-public class LSMBTreeFlushOperation implements ILSMIOOperation {
+public class LSMBTreeFlushOperation implements ILSMIOOperation, Comparable<LSMBTreeFlushOperation> {
private final ILSMIndexAccessorInternal accessor;
private final ILSMComponent flushingComponent;
private final FileReference btreeFlushTarget;
private final FileReference bloomFilterFlushTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMBTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
- FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+ FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback,
+ String indexIdentifier) {
this.accessor = accessor;
this.flushingComponent = flushingComponent;
this.btreeFlushTarget = btreeFlushTarget;
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -83,4 +86,19 @@
public ILSMComponent getFlushingComponent() {
return flushingComponent;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.FLUSH;
+ }
+
+ @Override
+ public int compareTo(LSMBTreeFlushOperation o) {
+ return btreeFlushTarget.getFile().getName().compareTo(o.getBTreeFlushTarget().getFile().getName());
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index 3a608fe..a4f6875 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -37,16 +37,18 @@
private final FileReference btreeMergeTarget;
private final FileReference bloomFilterMergeTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMBTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
ITreeIndexCursor cursor, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget,
- ILSMIOOperationCallback callback) {
+ ILSMIOOperationCallback callback, String indexIdentifier) {
this.accessor = accessor;
this.mergingComponents = mergingComponents;
this.cursor = cursor;
this.btreeMergeTarget = btreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -94,4 +96,14 @@
public List<ILSMComponent> getMergingComponents() {
return mergingComponents;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.MERGE;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 6d2d7c0..cb7bae7 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -48,6 +48,7 @@
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
private final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
public LSMBTreeOpContext(List<ILSMComponent> mutableComponents, ITreeIndexFrameFactory insertLeafFrameFactory,
ITreeIndexFrameFactory deleteLeafFrameFactory, IModificationOperationCallback modificationCallback,
@@ -84,6 +85,7 @@
deleteLeafFrame.setMultiComparator(cmp);
}
this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
}
@@ -107,6 +109,7 @@
@Override
public void reset() {
componentHolder.clear();
+ componentsToBeMerged.clear();
}
public IndexOperation getOperation() {
@@ -153,4 +156,9 @@
break;
}
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 6eada4b..668a12c 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -49,7 +49,11 @@
private boolean proceed = true;
public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx);
+ this(opCtx, false);
+ }
+
+ public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+ super(opCtx, returnDeletedTuples);
this.copyTuple = new ArrayTupleReference();
this.reusablePred = new RangePredicate(null, null, true, true, null, null);
}
@@ -126,7 +130,7 @@
}
// If there is no previous tuple or the previous tuple can be ignored
if (outputElement == null) {
- if (isDeleted(checkElement)) {
+ if (isDeleted(checkElement) && !returnDeletedTuples) {
// If the key has been deleted then pop it and set needPush to true.
// We cannot push immediately because the tuple may be
// modified if hasNext() is called
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index bc7cbf7..1903998 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -37,6 +37,9 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException;
+ public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException;
+
public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 2c3940f..1638a5d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -22,6 +22,12 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public interface ILSMIOOperation extends Callable<Boolean> {
+
+ public enum LSMIOOpertionType {
+ FLUSH,
+ MERGE
+ }
+
public Set<IODeviceHandle> getReadDevices();
public Set<IODeviceHandle> getWriteDevices();
@@ -29,4 +35,8 @@
public Boolean call() throws HyracksDataException, IndexException;
public ILSMIOOperationCallback getCallback();
+
+ public String getIndexUniqueIdentifier();
+
+ public LSMIOOpertionType getIOOpertionType();
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 3405b60..36a2ca1 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
@@ -30,7 +32,10 @@
public interface ILSMIndexAccessor extends IIndexAccessor {
public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException;
- public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
+ public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+ throws HyracksDataException, IndexException;
+
+ public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
/**
* Deletes the tuple from the memory component only.
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index fcd4037..80264bc 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -22,6 +22,8 @@
public interface ILSMIndexOperationContext extends IIndexOperationContext {
public List<ILSMComponent> getComponentHolder();
+
+ public List<ILSMComponent> getComponentsToBeMerged();
public ISearchOperationCallback getSearchOperationCallback();
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index 1473071..279605f 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -15,9 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public interface ILSMMergePolicy {
- public void diskComponentAdded(ILSMIndex index) throws HyracksDataException, IndexException;
+ public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+ IndexException;
+
+ public void configure(Map<String, String> properties);
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
similarity index 73%
rename from hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
rename to hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
index cf56750..c90b5f7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
@@ -15,9 +15,13 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+public interface ILSMMergePolicyFactory extends Serializable {
+ public ILSMMergePolicy createMergePolicy(Map<String, String> configuration);
-public interface ILSMMergePolicyProvider extends Serializable {
- public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx);
-}
+ public String getName();
+
+ public Set<String> getPropertiesNames();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
index 2c082bb..4276ba7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
@@ -15,10 +15,12 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
@@ -26,21 +28,23 @@
protected static final long serialVersionUID = 1L;
protected final IVirtualBufferCacheProvider virtualBufferCacheProvider;
- protected final ILSMMergePolicyProvider mergePolicyProvider;
+ protected final ILSMMergePolicyFactory mergePolicyFactory;
+ protected final Map<String, String> mergePolicyProperties;
protected final ILSMOperationTrackerProvider opTrackerFactory;
protected final ILSMIOOperationSchedulerProvider ioSchedulerProvider;
protected final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
protected final double bloomFilterFalsePositiveRate;
public AbstractLSMIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerFactory,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
- double bloomFilterFalsePositiveRate) {
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+ ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
this.virtualBufferCacheProvider = virtualBufferCacheProvider;
- this.mergePolicyProvider = mergePolicyProvider;
+ this.mergePolicyFactory = mergePolicyFactory;
this.opTrackerFactory = opTrackerFactory;
this.ioSchedulerProvider = ioSchedulerProvider;
this.ioOpCallbackFactory = ioOpCallbackFactory;
this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
+ this.mergePolicyProperties = mergePolicyProperties;
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
new file mode 100644
index 0000000..a61dec4
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+
+public class LSMIndexCompactOperatorNodePushable extends AbstractOperatorNodePushable {
+ private final IIndexDataflowHelper indexHelper;
+
+ public LSMIndexCompactOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition) {
+ this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ indexHelper.close();
+ }
+
+ @Override
+ public int getInputArity() {
+ return 0;
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return null;
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ indexHelper.open();
+ ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ try {
+ accessor.scheduleFullMerge(NoOpIOOperationCallback.INSTANCE);
+ } catch (Exception e) {
+ indexHelper.close();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
new file mode 100644
index 0000000..3c40f94
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+public class LSMTreeIndexCompactOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public LSMTreeIndexCompactOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields,
+ IIndexDataflowHelperFactory dataflowHelperFactory,
+ IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+ super(spec, 0, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false,
+ NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+ modificationOpCallbackProvider);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new LSMIndexCompactOperatorNodePushable(this, ctx, partition);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
index bc6baeb..ec12c23 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
@@ -85,4 +85,6 @@
protected abstract void destroy() throws HyracksDataException;
+ public abstract long getComponentSize();
+
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index 25894f1..b785764 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -14,24 +14,87 @@
*/
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOpertionType;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
public class AsynchronousScheduler implements ILSMIOOperationScheduler {
+ // Since this is a asynchronous scheduler, we make sure that flush operations coming from the same lsm index
+ // will be executed serially in same order of scheduling the operations. Look at asterix issue 630.
+
public final static AsynchronousScheduler INSTANCE = new AsynchronousScheduler();
private ExecutorService executor;
+ private final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<String, ILSMIOOperation>();
+ private final Map<String, PriorityQueue<ILSMIOOperation>> waitingFlushOperations = new HashMap<String, PriorityQueue<ILSMIOOperation>>();
public void init(ThreadFactory threadFactory) {
- executor = Executors.newCachedThreadPool(threadFactory);
+ // Creating an executor with the same configuration of Executors.newCachedThreadPool.
+ executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(), threadFactory) {
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ return new LSMIOOperationTask<T>(callable);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ LSMIOOperationTask<Boolean> task = (LSMIOOperationTask<Boolean>) r;
+ ILSMIOOperation executedOp = task.getOperation();
+ String id = executedOp.getIndexUniqueIdentifier();
+ synchronized (this) {
+ runningFlushOperations.remove(id);
+ if (waitingFlushOperations.containsKey(id)) {
+ try {
+ ILSMIOOperation op = waitingFlushOperations.get(id).poll();
+ if (op != null) {
+ scheduleOperation(op);
+ } else {
+ waitingFlushOperations.remove(id);
+ }
+ } catch (HyracksDataException e) {
+ t = e.getCause();
+ }
+ }
+ }
+ }
+ };
}
@Override
public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
- executor.submit(operation);
+ if (operation.getIOOpertionType() == LSMIOOpertionType.MERGE) {
+ executor.submit(operation);
+ } else {
+ String id = operation.getIndexUniqueIdentifier();
+ synchronized (executor) {
+ if (runningFlushOperations.containsKey(id)) {
+ if (waitingFlushOperations.containsKey(id)) {
+ waitingFlushOperations.get(id).offer(operation);
+ } else {
+ PriorityQueue<ILSMIOOperation> q = new PriorityQueue<ILSMIOOperation>();
+ q.offer(operation);
+ waitingFlushOperations.put(id, q);
+ }
+ } else {
+ runningFlushOperations.put(id, operation);
+ executor.submit(operation);
+ }
+ }
+ }
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index b6f5657..8a6b8bd 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -15,27 +15,43 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
public class ConstantMergePolicy implements ILSMMergePolicy {
+ private int numComponents;
- private final int threshold;
-
- public ConstantMergePolicy(int threshold) {
- this.threshold = threshold;
+ @Override
+ public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+ IndexException {
+ List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+ for (ILSMComponent c : immutableComponents) {
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ return;
+ }
+ }
+ if (fullMergeIsRequested) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFullMerge(index.getIOOperationCallback());
+ } else if (immutableComponents.size() >= numComponents) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
+ }
}
@Override
- public void diskComponentAdded(final ILSMIndex index) throws HyracksDataException, IndexException {
- if (index.getImmutableComponents().size() >= threshold) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
- }
+ public void configure(Map<String, String> properties) {
+ numComponents = Integer.parseInt(properties.get("num-components"));
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
new file mode 100644
index 0000000..13f5ad9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+public class ConstantMergePolicyFactory implements ILSMMergePolicyFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String[] SET_VALUES = new String[] { "num-components" };
+ private static final Set<String> PROPERTIES_NAMES = new HashSet<String>(Arrays.asList(SET_VALUES));
+
+ @Override
+ public ILSMMergePolicy createMergePolicy(Map<String, String> properties) {
+ ILSMMergePolicy policy = new ConstantMergePolicy();
+ policy.configure(properties);
+ return policy;
+ }
+
+ @Override
+ public String getName() {
+ return "constant";
+ }
+
+ @Override
+ public Set<String> getPropertiesNames() {
+ return PROPERTIES_NAMES;
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
deleted file mode 100644
index a7383c1..0000000
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
-
-public class ConstantMergePolicyProvider implements ILSMMergePolicyProvider {
-
- private static final long serialVersionUID = 1L;
-
- private final int threshold;
-
- public ConstantMergePolicyProvider(int threshold) {
- this.threshold = threshold;
- }
-
- @Override
- public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx) {
- return new ConstantMergePolicy(threshold);
- }
-
-}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index ca775b7..443ad2b 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -16,6 +16,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -41,11 +42,13 @@
private final ILSMIndexInternal lsmIndex;
private final ILSMMergePolicy mergePolicy;
private final ILSMOperationTracker opTracker;
+ private final AtomicBoolean fullMergeIsRequested;
public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker) {
this.lsmIndex = lsmIndex;
this.opTracker = opTracker;
this.mergePolicy = mergePolicy;
+ fullMergeIsRequested = new AtomicBoolean();
}
private boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean isTryOperation)
@@ -180,13 +183,14 @@
// newComponent is null if the flush op. was not performed.
if (newComponent != null) {
lsmIndex.addComponent(newComponent);
- mergePolicy.diskComponentAdded(lsmIndex);
+ mergePolicy.diskComponentAdded(lsmIndex, false);
}
break;
case MERGE:
// newComponent is null if the merge op. was not performed.
if (newComponent != null) {
lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
+ mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
}
break;
default:
@@ -287,7 +291,6 @@
@Override
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
- // Merge should always be a try operation, because it should never fail to enter the components unless the merge policy is erroneous.
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
callback.afterFinalize(LSMOperationType.MERGE, null);
return;
@@ -296,6 +299,20 @@
}
@Override
+ public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ fullMergeIsRequested.set(true);
+ if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
+ // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then
+ // whenever the current merge has finished, it will schedule the full merge again.
+ callback.afterFinalize(LSMOperationType.MERGE, null);
+ return;
+ }
+ fullMergeIsRequested.set(false);
+ lsmIndex.scheduleMerge(ctx, callback);
+ }
+
+ @Override
public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -320,7 +337,7 @@
public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
lsmIndex.markAsValid(c);
lsmIndex.addComponent(c);
- mergePolicy.diskComponentAdded(lsmIndex);
+ mergePolicy.diskComponentAdded(lsmIndex, false);
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java
new file mode 100644
index 0000000..9743afe
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+
+public class LSMIOOperationTask<T> extends FutureTask<T> {
+ private final ILSMIOOperation operation;
+
+ public LSMIOOperationTask(Callable<T> callable) {
+ super(callable);
+ this.operation = (ILSMIOOperation) callable;
+ }
+
+ public ILSMIOOperation getOperation() {
+ return operation;
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 45cc69b..2bc45a9 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -42,11 +42,13 @@
protected boolean includeMutableComponent;
protected ILSMHarness lsmHarness;
protected final ILSMIndexOperationContext opCtx;
+ protected final boolean returnDeletedTuples;
protected List<ILSMComponent> operationalComponents;
- public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx) {
+ public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
this.opCtx = opCtx;
+ this.returnDeletedTuples = returnDeletedTuples;
outputElement = null;
needPush = false;
}
@@ -110,14 +112,14 @@
@Override
public void close() throws HyracksDataException {
- if (lsmHarness != null) {
- try {
- outputPriorityQueue.clear();
- for (int i = 0; i < rangeCursors.length; i++) {
- rangeCursors[i].close();
- }
- rangeCursors = null;
- } finally {
+ try {
+ outputPriorityQueue.clear();
+ for (int i = 0; i < rangeCursors.length; i++) {
+ rangeCursors[i].close();
+ }
+ rangeCursors = null;
+ } finally {
+ if (lsmHarness != null) {
lsmHarness.endSearch(opCtx);
}
}
@@ -154,7 +156,50 @@
return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter();
}
- abstract protected void checkPriorityQueue() throws HyracksDataException, IndexException;
+ protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+ while (!outputPriorityQueue.isEmpty() || needPush == true) {
+ if (!outputPriorityQueue.isEmpty()) {
+ PriorityQueueElement checkElement = outputPriorityQueue.peek();
+ // If there is no previous tuple or the previous tuple can be ignored
+ if (outputElement == null) {
+ if (isDeleted(checkElement) && !returnDeletedTuples) {
+ // If the key has been deleted then pop it and set needPush to true.
+ // We cannot push immediately because the tuple may be
+ // modified if hasNext() is called
+ outputElement = outputPriorityQueue.poll();
+ needPush = true;
+ } else {
+ break;
+ }
+ } else {
+ // Compare the previous tuple and the head tuple in the PQ
+ if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+ // If the previous tuple and the head tuple are
+ // identical
+ // then pop the head tuple and push the next tuple from
+ // the tree of head tuple
+
+ // the head element of PQ is useless now
+ PriorityQueueElement e = outputPriorityQueue.poll();
+ pushIntoPriorityQueue(e);
+ } else {
+ // If the previous tuple and the head tuple are different
+ // the info of previous tuple is useless
+ if (needPush == true) {
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ }
+ outputElement = null;
+ }
+ }
+ } else {
+ // the priority queue is empty and needPush
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ outputElement = null;
+ }
+ }
+ }
@Override
public boolean exclusiveLatchNodes() {
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index f11a061..c828bd2 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -116,12 +119,21 @@
}
@Override
- public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+ throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.MERGE);
+ ctx.getComponentsToBeMerged().clear();
+ ctx.getComponentsToBeMerged().addAll(components);
lsmHarness.scheduleMerge(ctx, callback);
}
@Override
+ public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.FULL_MERGE);
+ lsmHarness.scheduleFullMerge(ctx, callback);
+ }
+
+ @Override
public void forcePhysicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.PHYSICALDELETE);
lsmHarness.forceModify(ctx, tuple);
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 17d1b17..ca22268 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.Map;
+
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -21,8 +23,12 @@
INSTANCE;
@Override
- public void diskComponentAdded(ILSMIndex index) {
+ public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) {
// Do nothing
}
+ @Override
+ public void configure(Map<String, String> properties) {
+ // Do nothing
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
new file mode 100644
index 0000000..fe04db6
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+
+public class PrefixMergePolicy implements ILSMMergePolicy {
+
+ private long maxMergableComponentSize;
+ private int maxTolernaceComponentCount;
+
+ @Override
+ public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+ IndexException {
+ // 1. Look at the candidate components for merging in oldest-first order. If one exists, identify the prefix of the sequence of
+ // all such components for which the sum of their sizes exceeds MaxMrgCompSz. Schedule a merge of those components into a new component.
+ // 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule
+ // a merge all of the current candidates into a new single component.
+ List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+ for (ILSMComponent c : immutableComponents) {
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ return;
+ }
+ }
+ if (fullMergeIsRequested) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFullMerge(index.getIOOperationCallback());
+ return;
+ }
+ long totalSize = 0;
+ int startIndex = -1;
+ for (int i = 0; i < immutableComponents.size(); i++) {
+ ILSMComponent c = immutableComponents.get(i);
+ long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize();
+ if (componentSize > maxMergableComponentSize) {
+ startIndex = i;
+ totalSize = 0;
+ continue;
+ }
+ totalSize += componentSize;
+ boolean isLastComponent = i + 1 == immutableComponents.size() ? true : false;
+ if (totalSize > maxMergableComponentSize
+ || (isLastComponent && i - startIndex >= maxTolernaceComponentCount)) {
+ List<ILSMComponent> mergableCopments = new ArrayList<ILSMComponent>();
+ for (int j = startIndex + 1; j <= i; j++) {
+ mergableCopments.add(immutableComponents.get(j));
+ }
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleMerge(index.getIOOperationCallback(), mergableCopments);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void configure(Map<String, String> properties) {
+ maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
+ maxTolernaceComponentCount = Integer.parseInt(properties.get("max-tolernace-component-count"));
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
new file mode 100644
index 0000000..981ec6c
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+public class PrefixMergePolicyFactory implements ILSMMergePolicyFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String[] SET_VALUES = new String[] { "max-mergable-component-size",
+ "max-tolernace-component-count" };
+ private static final Set<String> PROPERTIES_NAMES = new HashSet<String>(Arrays.asList(SET_VALUES));
+
+ @Override
+ public ILSMMergePolicy createMergePolicy(Map<String, String> properties) {
+ ILSMMergePolicy policy = new PrefixMergePolicy();
+ policy.configure(properties);
+ return policy;
+ }
+
+ @Override
+ public String getName() {
+ return "prefix";
+ }
+
+ @Override
+ public Set<String> getPropertiesNames() {
+ return PROPERTIES_NAMES;
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java
new file mode 100644
index 0000000..22e7505
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMIndexCompactOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+public class LSMInvertedIndexCompactOperator extends AbstractLSMInvertedIndexOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public LSMInvertedIndexCompactOperator(IOperatorDescriptorRegistry spec, IStorageManagerInterface storageManager,
+ IFileSplitProvider fileSplitProvider, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+ ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenComparatorFactories,
+ ITypeTraits[] invListsTypeTraits, IBinaryComparatorFactory[] invListComparatorFactories,
+ IBinaryTokenizerFactory tokenizerFactory, IIndexDataflowHelperFactory dataflowHelperFactory,
+ IModificationOperationCallbackFactory modificationOpCallbackFactory) {
+ super(spec, 1, 1, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+ tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
+ dataflowHelperFactory, null, false, NoOpLocalResourceFactoryProvider.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new LSMIndexCompactOperatorNodePushable(this, ctx, partition);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
index 84c7150..c018f16 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
@@ -15,12 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
private static final long serialVersionUID = 1L;
public LSMInvertedIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerProvider,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider,
- ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+ ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerProvider,
+ ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
}
@Override
@@ -42,8 +44,8 @@
int partition) {
return new LSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackFactory);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
index d78ae7e..aef0863 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
@@ -15,12 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
private static final long serialVersionUID = 1L;
public PartitionedLSMInvertedIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerProvider,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider,
- ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+ ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerProvider,
+ ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
}
@Override
@@ -42,7 +44,7 @@
int partition) {
return new PartitionedLSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackFactory);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index cef6aee..93c5a58 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -267,10 +267,10 @@
public void getOperationalComponents(ILSMIndexOperationContext ctx) {
List<ILSMComponent> immutableComponents = diskComponents;
List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
- operationalComponents.clear();
int cmc = currentMutableComponentId.get();
ctx.setCurrentMutableComponentId(cmc);
int numMutableComponents = memoryComponents.size();
+ operationalComponents.clear();
switch (ctx.getOperation()) {
case FLUSH:
case DELETE:
@@ -291,8 +291,10 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- operationalComponents.addAll(immutableComponents);
+ operationalComponents.addAll(ctx.getComponentsToBeMerged());
break;
+ case FULL_MERGE:
+ operationalComponents.addAll(immutableComponents);
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
@@ -436,7 +438,7 @@
ioScheduler.scheduleOperation(new LSMInvertedIndexFlushOperation(
new LSMInvertedIndexAccessor(lsmHarness, opCtx), flushingComponent, componentFileRefs
.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
- componentFileRefs.getBloomFilterFileReference(), callback));
+ componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
}
@Override
@@ -541,7 +543,7 @@
ILSMIndexAccessorInternal accessor = new LSMInvertedIndexAccessor(lsmHarness, ctx);
ioScheduler.scheduleOperation(new LSMInvertedIndexMergeOperation(accessor, mergingComponents, cursor,
relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
- relMergeFileRefs.getBloomFilterFileReference(), callback));
+ relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
}
@Override
@@ -561,6 +563,31 @@
mergeOp.getBloomFilterMergeTarget(), true);
IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
+
+ // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the inverted indexes so that
+ // lsmHarness.endSearch() is called once when the inverted indexes have been merged.
+ if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
+ .get(diskComponents.size() - 1)) {
+ // Keep the deleted tuples since the oldest disk component is not included in the merge operation
+
+ LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor = new LSMInvertedIndexDeletedKeysBTreeMergeCursor(
+ opCtx);
+ search(opCtx, btreeCursor, mergePred);
+
+ BTree btree = component.getDeletedKeysBTree();
+ IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
+ try {
+ while (btreeCursor.hasNext()) {
+ btreeCursor.next();
+ ITupleReference tuple = btreeCursor.getTuple();
+ btreeBulkLoader.add(tuple);
+ }
+ } finally {
+ btreeCursor.close();
+ }
+ btreeBulkLoader.end();
+ }
+
IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L, false);
try {
while (cursor.hasNext()) {
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index e31af9a..7e34dfe 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -84,12 +87,21 @@
}
@Override
- public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+ throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.MERGE);
+ ctx.getComponentsToBeMerged().clear();
+ ctx.getComponentsToBeMerged().addAll(components);
lsmHarness.scheduleMerge(ctx, callback);
}
@Override
+ public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.FULL_MERGE);
+ lsmHarness.scheduleFullMerge(ctx, callback);
+ }
+
+ @Override
public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
lsmHarness.merge(ctx, operation);
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
new file mode 100644
index 0000000..3efaaba
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+
+public class LSMInvertedIndexDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
+
+ public LSMInvertedIndexDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
+ super(opCtx, true);
+ }
+
+ @Override
+ protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+ return false;
+ }
+
+ @Override
+ public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException,
+ IndexException {
+ LSMInvertedIndexRangeSearchCursorInitialState lsmInitialState = (LSMInvertedIndexRangeSearchCursorInitialState) initialState;
+ cmp = lsmInitialState.getOriginalKeyComparator();
+ operationalComponents = lsmInitialState.getOperationalComponents();
+ // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already do that when we merge the inverted indexes.
+ lsmHarness = null;
+ int numBTrees = operationalComponents.size();
+ rangeCursors = new IIndexCursor[numBTrees];
+
+ MultiComparator keyCmp = lsmInitialState.getKeyComparator();
+ RangePredicate btreePredicate = new RangePredicate(null, null, true, true, keyCmp, keyCmp);
+ ArrayList<IIndexAccessor> btreeAccessors = lsmInitialState.getDeletedKeysBTreeAccessors();
+ for (int i = 0; i < numBTrees; i++) {
+ rangeCursors[i] = btreeAccessors.get(i).createSearchCursor();
+ btreeAccessors.get(i).search(rangeCursors[i], btreePredicate);
+ }
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index 323edd1..446e807 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractDiskLSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
public class LSMInvertedIndexDiskComponent extends AbstractDiskLSMComponent {
@@ -53,4 +54,12 @@
public BloomFilter getBloomFilter() {
return bloomFilter;
}
+
+ @Override
+ public long getComponentSize() {
+ return ((OnDiskInvertedIndex) invIndex).getInvListsFile().getFile().length()
+ + ((OnDiskInvertedIndex) invIndex).getBTree().getFileReference().getFile().length()
+ + deletedKeysBTree.getFileReference().getFile().length()
+ + bloomFilter.getFileReference().getFile().length();
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index 45433e7..beb7643 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -28,23 +28,25 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
-public class LSMInvertedIndexFlushOperation implements ILSMIOOperation {
+public class LSMInvertedIndexFlushOperation implements ILSMIOOperation, Comparable<LSMInvertedIndexFlushOperation> {
private final ILSMIndexAccessorInternal accessor;
private final ILSMComponent flushingComponent;
private final FileReference dictBTreeFlushTarget;
private final FileReference deletedKeysBTreeFlushTarget;
private final FileReference bloomFilterFlushTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMInvertedIndexFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
FileReference dictBTreeFlushTarget, FileReference deletedKeysBTreeFlushTarget,
- FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+ FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
this.accessor = accessor;
this.flushingComponent = flushingComponent;
this.dictBTreeFlushTarget = dictBTreeFlushTarget;
this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget;
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -88,4 +90,19 @@
public ILSMComponent getFlushingComponent() {
return flushingComponent;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.FLUSH;
+ }
+
+ @Override
+ public int compareTo(LSMInvertedIndexFlushOperation o) {
+ return dictBTreeFlushTarget.getFile().getName().compareTo(o.getDictBTreeFlushTarget().getFile().getName());
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index 7cd921a..5a1e413 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -38,10 +38,11 @@
private final FileReference deletedKeysBTreeMergeTarget;
private final FileReference bloomFilterMergeTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMInvertedIndexMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
IIndexCursor cursor, FileReference dictBTreeMergeTarget, FileReference deletedKeysBTreeMergeTarget,
- FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback) {
+ FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
this.accessor = accessor;
this.mergingComponents = mergingComponents;
this.cursor = cursor;
@@ -49,6 +50,7 @@
this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -103,4 +105,14 @@
public List<ILSMComponent> getMergingComponents() {
return mergingComponents;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.MERGE;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index 1a9303f..671e3f8 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -35,7 +35,8 @@
private IndexOperation op;
private final List<ILSMComponent> componentHolder;
-
+ private final List<ILSMComponent> componentsToBeMerged;
+
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
@@ -54,6 +55,7 @@
IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback)
throws HyracksDataException {
this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
@@ -84,6 +86,7 @@
@Override
public void reset() {
componentHolder.clear();
+ componentsToBeMerged.clear();
}
@Override
@@ -118,4 +121,9 @@
currentMutableInvIndexAccessors = mutableInvIndexAccessors[currentMutableComponentId];
currentDeletedKeysBTreeAccessors = deletedKeysBTreeAccessors[currentMutableComponentId];
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index cd0dde3..a6ff07e 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -28,8 +28,8 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingTupleReference;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BloomFilterAwareBTreePointSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
@@ -43,7 +43,7 @@
protected RangePredicate keySearchPred;
public LSMInvertedIndexRangeSearchCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx);
+ super(opCtx, false);
}
@Override
@@ -76,12 +76,12 @@
for (int i = 0; i < operationalComponents.size(); i++) {
ILSMComponent component = operationalComponents.get(i);
if (component.getType() == LSMComponentType.MEMORY) {
- // No need for a bloom filter for the in-memory BTree.
+ // No need for a bloom filter for the in-memory BTree.
deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor();
} else {
- deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor((IBTreeLeafFrame) lsmInitState
- .getgetDeletedKeysBTreeLeafFrameFactory().createFrame(), false,
- ((LSMInvertedIndexDiskComponent) operationalComponents.get(i)).getBloomFilter());
+ deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor(
+ (IBTreeLeafFrame) lsmInitState.getgetDeletedKeysBTreeLeafFrameFactory().createFrame(),
+ false, ((LSMInvertedIndexDiskComponent) operationalComponents.get(i)).getBloomFilter());
}
}
}
@@ -114,50 +114,4 @@
}
return false;
}
-
- @Override
- protected void checkPriorityQueue() throws HyracksDataException, IndexException {
- while (!outputPriorityQueue.isEmpty() || needPush == true) {
- if (!outputPriorityQueue.isEmpty()) {
- PriorityQueueElement checkElement = outputPriorityQueue.peek();
- // If there is no previous tuple or the previous tuple can be ignored
- if (outputElement == null) {
- if (isDeleted(checkElement)) {
- // If the key has been deleted then pop it and set needPush to true.
- // We cannot push immediately because the tuple may be
- // modified if hasNext() is called
- outputElement = outputPriorityQueue.poll();
- needPush = true;
- } else {
- break;
- }
- } else {
- // Compare the previous tuple and the head tuple in the PQ
- if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
- // If the previous tuple and the head tuple are
- // identical
- // then pop the head tuple and push the next tuple from
- // the tree of head tuple
-
- // the head element of PQ is useless now
- PriorityQueueElement e = outputPriorityQueue.poll();
- pushIntoPriorityQueue(e);
- } else {
- // If the previous tuple and the head tuple are different
- // the info of previous tuple is useless
- if (needPush == true) {
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- }
- outputElement = null;
- }
- }
- } else {
- // the priority queue is empty and needPush
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- outputElement = null;
- }
- }
- }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 0ab2f4d..4f586f9 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -462,6 +462,10 @@
public BTree getBTree() {
return btree;
}
+
+ public FileReference getInvListsFile() {
+ return invListsFile;
+ }
public class OnDiskInvertedIndexAccessor implements IInvertedIndexAccessor {
private final OnDiskInvertedIndex index;
@@ -561,6 +565,12 @@
public ByteBuffer allocateFrame() {
return ByteBuffer.allocate(FRAME_SIZE);
}
+
+ @Override
+ public void deallocateFrames(int frameCount) {
+ // TODO Auto-generated method stub
+
+ }
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
index 8254689..7b54f19 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -23,7 +25,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -40,12 +42,12 @@
public LSMRTreeDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
- IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyProvider mergePolicyProvider,
- ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
- ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILinearizeComparatorFactory linearizeCmpFactory,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerFactory, ioSchedulerProvider,
- ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+ IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, ILSMOperationTrackerProvider opTrackerFactory,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+ ILinearizeComparatorFactory linearizeCmpFactory, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory,
+ ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
this.btreeComparatorFactories = btreeComparatorFactories;
this.valueProviderFactories = valueProviderFactories;
this.rtreePolicyType = rtreePolicyType;
@@ -58,7 +60,7 @@
return new LSMRTreeDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
btreeComparatorFactories, valueProviderFactories, rtreePolicyType,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackFactory, linearizeCmpFactory);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
index ef34876..2e1cfaa 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
@@ -15,47 +15,42 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
-public class LSMRTreeWithAntiMatterTuplesDataflowHelperFactory implements IIndexDataflowHelperFactory {
+public class LSMRTreeWithAntiMatterTuplesDataflowHelperFactory extends AbstractLSMIndexDataflowHelperFactory {
private static final long serialVersionUID = 1L;
- private final IVirtualBufferCacheProvider virtualBufferCacheProvider;
private final IBinaryComparatorFactory[] btreeComparatorFactories;
private final IPrimitiveValueProviderFactory[] valueProviderFactories;
private final RTreePolicyType rtreePolicyType;
- private final ILSMMergePolicyProvider mergePolicyProvider;
- private final ILSMOperationTrackerProvider opTrackerProvider;
- private final ILSMIOOperationSchedulerProvider ioSchedulerProvider;
- private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
private final ILinearizeComparatorFactory linearizeCmpFactory;
public LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
- IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyProvider mergePolicyProvider,
- ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
- ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILinearizeComparatorFactory linearizeCmpFactory) {
- this.virtualBufferCacheProvider = virtualBufferCacheProvider;
+ IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, ILSMOperationTrackerProvider opTrackerFactory,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+ ILinearizeComparatorFactory linearizeCmpFactory) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory, ioSchedulerProvider,
+ ioOpCallbackFactory, 1.0);
this.btreeComparatorFactories = btreeComparatorFactories;
this.valueProviderFactories = valueProviderFactories;
this.rtreePolicyType = rtreePolicyType;
- this.mergePolicyProvider = mergePolicyProvider;
- this.ioSchedulerProvider = ioSchedulerProvider;
- this.opTrackerProvider = opTrackerProvider;
- this.ioOpCallbackFactory = ioOpCallbackFactory;
this.linearizeCmpFactory = linearizeCmpFactory;
}
@@ -64,7 +59,7 @@
int partition) {
return new LSMRTreeWithAntiMatterTuplesDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), btreeComparatorFactories,
- valueProviderFactories, rtreePolicyType, mergePolicyProvider.getMergePolicy(ctx), opTrackerProvider,
- ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
+ valueProviderFactories, rtreePolicyType, mergePolicyFactory.createMergePolicy(mergePolicyProperties),
+ opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index b09e115..673c7ae 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -24,18 +24,15 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -200,11 +197,11 @@
@Override
public void getOperationalComponents(ILSMIndexOperationContext ctx) {
List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
- operationalComponents.clear();
List<ILSMComponent> immutableComponents = diskComponents;
int cmc = currentMutableComponentId.get();
ctx.setCurrentMutableComponentId(cmc);
int numMutableComponents = memoryComponents.size();
+ operationalComponents.clear();
switch (ctx.getOperation()) {
case INSERT:
case DELETE:
@@ -225,8 +222,10 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- operationalComponents.addAll(immutableComponents);
+ operationalComponents.addAll(ctx.getComponentsToBeMerged());
break;
+ case FULL_MERGE:
+ operationalComponents.addAll(immutableComponents);
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
@@ -332,38 +331,11 @@
ctx.modificationCallback.before(tuple);
ctx.modificationCallback.found(null, tuple);
if (ctx.getOperation() == IndexOperation.INSERT) {
- // Before each insert, we must check whether there exist a killer
- // tuple in the memBTree. If we find a killer tuple, we must truly
- // delete the existing tuple from the BTree, and then insert it to
- // memRTree. Otherwise, the old killer tuple will kill the newly
- // added RTree tuple.
- RangePredicate btreeRangePredicate = new RangePredicate(tuple, tuple, true, true,
- ctx.getBTreeMultiComparator(), ctx.getBTreeMultiComparator());
- ITreeIndexCursor cursor = ctx.currentMutableBTreeAccessor.createSearchCursor();
- ctx.currentMutableBTreeAccessor.search(cursor, btreeRangePredicate);
- boolean foundTupleInMemoryBTree = false;
- try {
- if (cursor.hasNext()) {
- foundTupleInMemoryBTree = true;
- }
- } finally {
- cursor.close();
- }
- if (foundTupleInMemoryBTree) {
- try {
- ctx.currentMutableBTreeAccessor.delete(tuple);
- } catch (TreeIndexNonExistentKeyException e) {
- // Tuple has been deleted in the meantime. Do nothing.
- // This normally shouldn't happen if we are dealing with
- // good citizens since LSMRTree is used as a secondary
- // index and a tuple shouldn't be deleted twice without
- // insert between them.
- }
- } else {
- ctx.currentMutableRTreeAccessor.insert(tuple);
- }
-
+ ctx.currentMutableRTreeAccessor.insert(tuple);
} else {
+ // First remove all entries in the in-memory rtree (if any).
+ ctx.currentMutableRTreeAccessor.delete(tuple);
+ // Insert key into the deleted-keys BTree.
try {
ctx.currentMutableBTreeAccessor.insert(tuple);
} catch (TreeIndexDuplicateKeyException e) {
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index a4b6139..3d2cc62 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -177,7 +177,7 @@
LSMRTreeAccessor accessor = new LSMRTreeAccessor(lsmHarness, rctx);
ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, componentFileRefs
.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs
- .getBloomFilterFileReference(), callback));
+ .getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
}
@Override
@@ -295,7 +295,8 @@
ILSMIndexAccessorInternal accessor = new LSMRTreeAccessor(lsmHarness, rctx);
ioScheduler.scheduleOperation(new LSMRTreeMergeOperation((ILSMIndexAccessorInternal) accessor,
mergingComponents, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs
- .getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
+ .getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback,
+ fileManager.getBaseDir()));
}
@Override
@@ -309,8 +310,31 @@
LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
- IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
+ // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that
+ // lsmHarness.endSearch() is called once when the r-trees have been merged.
+ if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
+ .get(diskComponents.size() - 1)) {
+ // Keep the deleted tuples since the oldest disk component is not included in the merge operation
+
+ LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
+ search(opCtx, btreeCursor, rtreeSearchPred);
+
+ BTree btree = mergedComponent.getBTree();
+ IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
+ try {
+ while (btreeCursor.hasNext()) {
+ btreeCursor.next();
+ ITupleReference tuple = btreeCursor.getTuple();
+ btreeBulkLoader.add(tuple);
+ }
+ } finally {
+ btreeCursor.close();
+ }
+ btreeBulkLoader.end();
+ }
+
+ IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
try {
while (cursor.hasNext()) {
cursor.next();
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
new file mode 100644
index 0000000..fe2ed96
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+
+public class LSMRTreeDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
+
+ public LSMRTreeDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
+ super(opCtx, true);
+ }
+
+ @Override
+ protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+ return false;
+ }
+
+ @Override
+ public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException,
+ IndexException {
+ LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
+ cmp = lsmInitialState.getBTreeCmp();
+ operationalComponents = lsmInitialState.getOperationalComponents();
+ // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already do that when we merge r-trees.
+ lsmHarness = null;
+ int numBTrees = operationalComponents.size();
+ rangeCursors = new IIndexCursor[numBTrees];
+
+ RangePredicate btreePredicate = new RangePredicate(null, null, true, true, cmp, cmp);
+ IIndexAccessor[] btreeAccessors = new ITreeIndexAccessor[numBTrees];
+ for (int i = 0; i < numBTrees; i++) {
+ ILSMComponent component = operationalComponents.get(i);
+ IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory().createFrame();
+ rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
+ BTree btree = (BTree) ((LSMRTreeDiskComponent) component).getBTree();
+ btreeAccessors[i] = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ btreeAccessors[i].search(rangeCursors[i], btreePredicate);
+ }
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
index 7bc3f79..c3c0a55 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
@@ -54,4 +54,14 @@
public BloomFilter getBloomFilter() {
return bloomFilter;
}
+
+ @Override
+ public long getComponentSize() {
+ long size = rtree.getFileReference().getFile().length();
+ if (btree != null) {
+ size += btree.getFileReference().getFile().length();
+ size += bloomFilter.getFileReference().getFile().length();
+ }
+ return size;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index 18d7a7e..0d40eb4 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -27,7 +27,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
-public class LSMRTreeFlushOperation implements ILSMIOOperation {
+public class LSMRTreeFlushOperation implements ILSMIOOperation, Comparable<LSMRTreeFlushOperation> {
private final ILSMIndexAccessorInternal accessor;
private final ILSMComponent flushingComponent;
@@ -35,16 +35,18 @@
private final FileReference btreeFlushTarget;
private final FileReference bloomFilterFlushTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
FileReference rtreeFlushTarget, FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget,
- ILSMIOOperationCallback callback) {
+ ILSMIOOperationCallback callback, String indexIdentifier) {
this.accessor = accessor;
this.flushingComponent = flushingComponent;
this.rtreeFlushTarget = rtreeFlushTarget;
this.btreeFlushTarget = btreeFlushTarget;
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -89,4 +91,19 @@
public ILSMComponent getFlushingComponent() {
return flushingComponent;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.FLUSH;
+ }
+
+ @Override
+ public int compareTo(LSMRTreeFlushOperation o) {
+ return rtreeFlushTarget.getFile().getName().compareTo(o.getRTreeFlushTarget().getFile().getName());
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index da7a2fb..ddf3ebf 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -36,10 +36,11 @@
private final FileReference btreeMergeTarget;
private final FileReference bloomFilterMergeTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMRTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
ITreeIndexCursor cursor, FileReference rtreeMergeTarget, FileReference btreeMergeTarget,
- FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback) {
+ FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
this.accessor = accessor;
this.mergingComponents = mergingComponents;
this.cursor = cursor;
@@ -47,6 +48,7 @@
this.btreeMergeTarget = btreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -104,4 +106,14 @@
public List<ILSMComponent> getMergingComponents() {
return mergingComponents;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.MERGE;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index b94feba..132e55b 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -48,6 +48,7 @@
private IndexOperation op;
public final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
@@ -76,6 +77,7 @@
currentRTreeOpContext = rtreeOpContexts[0];
currentBTreeOpContext = btreeOpContexts[0];
this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
}
@@ -101,6 +103,7 @@
@Override
public void reset() {
componentHolder.clear();
+ componentsToBeMerged.clear();
}
@Override
@@ -126,4 +129,9 @@
public IModificationOperationCallback getModificationCallback() {
return modificationCallback;
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index 30dd467..f669585 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -79,7 +79,7 @@
ITupleReference currentTuple = rtreeCursors[currentCursor].getTuple();
boolean killerTupleFound = false;
- for (int i = 0; i <= currentCursor; i++) {
+ for (int i = 0; i < currentCursor; i++) {
btreeCursors[i].reset();
btreeRangePredicate.setHighKey(currentTuple, true);
btreeRangePredicate.setLowKey(currentTuple, true);
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index dd31165..2e6fc78 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -25,6 +25,8 @@
public class LSMRTreeSortedCursor extends LSMRTreeAbstractCursor {
+ // TODO: This class can be removed and instead use a search cursor that uses a logic similar
+ // to the one in LSMRTreeWithAntiMatterTuplesSearchCursor
private ILinearizeComparator linearizeCmp;
private boolean[] depletedRtreeCursors;
private int foundIn = -1;
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 812e942..bfd0260 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -40,7 +40,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
@@ -76,9 +75,9 @@
ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallback ioOpCallback) {
super(virtualBufferCaches, rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory,
- btreeLeafFrameFactory, fileManager, new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(diskRTreeFactory),
- diskFileMapProvider, fieldCount, rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields,
- linearizerArray, 0, mergePolicy, opTracker, ioScheduler, ioOpCallback);
+ btreeLeafFrameFactory, fileManager, new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(
+ diskRTreeFactory), diskFileMapProvider, fieldCount, rtreeCmpFactories, btreeCmpFactories,
+ linearizer, comparatorFields, linearizerArray, 0, mergePolicy, opTracker, ioScheduler, ioOpCallback);
bulkLoaComponentFactory = new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(bulkLoadRTreeFactory);
this.bTreeTupleSorter = null;
}
@@ -156,7 +155,7 @@
opCtx.getComponentHolder().add(flushingComponent);
ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, relFlushFileRefs
- .getInsertIndexFileReference(), null, null, callback));
+ .getInsertIndexFileReference(), null, null, callback, fileManager.getBaseDir()));
}
@Override
@@ -248,11 +247,16 @@
LSMRTreeOpContext rctx = createOpContext(NoOpOperationCallback.INSTANCE);
rctx.setOperation(IndexOperation.MERGE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
- ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(rctx);
+ boolean returnDeletedTuples = false;
+ if (ctx.getComponentHolder().get(ctx.getComponentHolder().size() - 1) != diskComponents.get(diskComponents
+ .size() - 1)) {
+ returnDeletedTuples = true;
+ }
+ ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(rctx, returnDeletedTuples);
LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, rctx);
ioScheduler.scheduleOperation(new LSMRTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
- .getInsertIndexFileReference(), null, null, callback));
+ .getInsertIndexFileReference(), null, null, callback, fileManager.getBaseDir()));
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index cbaf3b3..7099d7d 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -54,7 +54,11 @@
private int numMutableComponents;
public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx);
+ this(opCtx, false);
+ }
+
+ public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+ super(opCtx, returnDeletedTuples);
currentCursor = 0;
}
@@ -152,7 +156,7 @@
while (super.hasNext()) {
super.next();
ITupleReference diskRTreeTuple = super.getTuple();
- if (searchMemBTrees(diskRTreeTuple, numMutableComponents - 1)) {
+ if (searchMemBTrees(diskRTreeTuple, numMutableComponents)) {
foundNext = true;
frameTuple = diskRTreeTuple;
return true;
@@ -216,7 +220,7 @@
private boolean searchMemBTrees(ITupleReference tuple, int lastBTreeToSearch) throws HyracksDataException,
IndexException {
- for (int i = 0; i <= lastBTreeToSearch; i++) {
+ for (int i = 0; i < lastBTreeToSearch; i++) {
btreeCursors[i].reset();
btreeRangePredicate.setHighKey(tuple, true);
btreeRangePredicate.setLowKey(tuple, true);
@@ -261,50 +265,4 @@
}
}
}
-
- @Override
- protected void checkPriorityQueue() throws HyracksDataException, IndexException {
- while (!outputPriorityQueue.isEmpty() || needPush == true) {
- if (!outputPriorityQueue.isEmpty()) {
- PriorityQueueElement checkElement = outputPriorityQueue.peek();
- // If there is no previous tuple or the previous tuple can be ignored
- if (outputElement == null) {
- if (isDeleted(checkElement)) {
- // If the key has been deleted then pop it and set needPush to true.
- // We cannot push immediately because the tuple may be
- // modified if hasNext() is called
- outputElement = outputPriorityQueue.poll();
- needPush = true;
- } else {
- break;
- }
- } else {
- // Compare the previous tuple and the head tuple in the PQ
- if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
- // If the previous tuple and the head tuple are
- // identical
- // then pop the head tuple and push the next tuple from
- // the tree of head tuple
-
- // the head element of PQ is useless now
- PriorityQueueElement e = outputPriorityQueue.poll();
- pushIntoPriorityQueue(e);
- } else {
- // If the previous tuple and the head tuple are different
- // the info of previous tuple is useless
- if (needPush == true) {
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- }
- outputElement = null;
- }
- }
- } else {
- // the priority queue is empty and needPush
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- outputElement = null;
- }
- }
- }
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 8090564..2992dfe 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -119,7 +119,8 @@
@Override
public ICachedPage tryPin(long dpid) throws HyracksDataException {
- pinSanityCheck(dpid);
+ // Calling the pinSanityCheck should be used only for debugging, since the synchronized block over the fileInfoMap is a hot spot.
+ //pinSanityCheck(dpid);
CachedPage cPage = null;
int hash = hash(dpid);
CacheBucket bucket = pageMap[hash];
@@ -142,7 +143,8 @@
@Override
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
- pinSanityCheck(dpid);
+ // Calling the pinSanityCheck should be used only for debugging, since the synchronized block over the fileInfoMap is a hot spot.
+ //pinSanityCheck(dpid);
CachedPage cPage = findPage(dpid, newPage);
if (!newPage) {
// Resolve race of multiple threads trying to read the page from
@@ -760,8 +762,8 @@
BufferedFileHandle fInfo = null;
synchronized (fileInfoMap) {
fInfo = fileInfoMap.get(fileId);
- ioManager.sync(fInfo.getFileHandle(), metadata);
}
+ ioManager.sync(fInfo.getFileHandle(), metadata);
}
@Override
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index 10b5582..f18099b 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -46,6 +46,10 @@
public ByteBuffer allocateFrame() {
return jobletContext.allocateFrame();
}
+
+ @Override
+ public void deallocateFrames(int frameCount) {
+ }
@Override
public int getFrameSize() {
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index f7aa7f4..9729d51 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.storage.am.btree.OrderedIndexTestUtils;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
@@ -70,7 +71,8 @@
}
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((LSMBTree) ctx.getIndex()).getImmutableComponents());
orderedIndexTestUtils.checkPointSearches(ctx);
orderedIndexTestUtils.checkScan(ctx);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 4a96131..35ecc20 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -106,7 +106,7 @@
break;
case MERGE:
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, lsmBTree.getImmutableComponents());
break;
default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 3591f78..fb06a7e 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -20,10 +20,12 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
@@ -37,9 +39,9 @@
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -50,8 +52,8 @@
public class LSMTreeRunner implements IExperimentRunner {
- private static final int MAX_OPEN_FILES = 10000;
- private static final int HYRACKS_FRAME_SIZE = 128;
+ private static final int MAX_OPEN_FILES = Integer.MAX_VALUE;
+ private static final int HYRACKS_FRAME_SIZE = 131072;
protected IHyracksTaskContext ctx;
protected IOManager ioManager;
@@ -61,7 +63,7 @@
protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
protected final static String sep = System.getProperty("file.separator");
- protected final static String classDir = "/lsmtree/";
+ protected final static String classDir = "/tmp/lsmtree/";
protected String onDiskDir;
protected FileReference file;
@@ -71,6 +73,11 @@
protected IBufferCache memBufferCache;
private final int onDiskPageSize;
private final int onDiskNumPages;
+ private final static ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+ };
public LSMTreeRunner(int numBatches, int inMemPageSize, int inMemNumPages, int onDiskPageSize, int onDiskNumPages,
ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories, int[] bloomFilterKeyFields,
@@ -93,11 +100,13 @@
List<IVirtualBufferCache> virtualBufferCaches = new ArrayList<IVirtualBufferCache>();
for (int i = 0; i < 2; i++) {
IVirtualBufferCache virtualBufferCache = new VirtualBufferCache(new HeapBufferAllocator(), inMemPageSize,
- inMemNumPages);
+ inMemNumPages / 2);
virtualBufferCaches.add(virtualBufferCache);
}
- this.ioScheduler = SynchronousScheduler.INSTANCE;
+ this.ioScheduler = AsynchronousScheduler.INSTANCE;
+ AsynchronousScheduler.INSTANCE.init(threadFactory);
+
lsmtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fmp, typeTraits, cmpFactories,
bloomFilterKeyFields, bloomFilterFalsePositiveRate, NoMergePolicy.INSTANCE,
new ThreadCountingTracker(), ioScheduler, NoOpIOOperationCallback.INSTANCE);
@@ -133,15 +142,23 @@
@Override
public void reset() throws Exception {
+ try {
+ lsmtree.deactivate();
+ } catch (HyracksDataException e) {
+ // ignore
+ }
+ try {
+ lsmtree.destroy();
+ } catch (HyracksDataException e) {
+ // ignore
+ }
+
lsmtree.create();
+ lsmtree.activate();
}
@Override
public void deinit() throws Exception {
- bufferCache.closeFile(lsmtreeFileId);
- bufferCache.close();
- memBufferCache.closeFile(lsmtreeFileId);
- memBufferCache.close();
}
public class LSMTreeThread extends Thread {
@@ -166,6 +183,7 @@
} catch (TreeIndexException e) {
}
}
+ dataGen.releaseBatch(batch);
}
} catch (Exception e) {
e.printStackTrace();
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
index 641362e..2354550 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
@@ -31,18 +31,20 @@
public static void main(String[] args) throws Exception {
// Disable logging so we can better see the output times.
Enumeration<String> loggers = LogManager.getLogManager().getLoggerNames();
- while(loggers.hasMoreElements()) {
+ while (loggers.hasMoreElements()) {
String loggerName = loggers.nextElement();
Logger logger = LogManager.getLogManager().getLogger(loggerName);
logger.setLevel(Level.OFF);
}
-
- int numTuples = 100000; // 100K
+ boolean sorted = Boolean.parseBoolean(args[0]);
+ int numThreads = Integer.parseInt(args[1]);
+
+ //int numTuples = 100000; // 100K
//int numTuples = 1000000; // 1M
//int numTuples = 2000000; // 2M
//int numTuples = 3000000; // 3M
//int numTuples = 10000000; // 10M
- //int numTuples = 20000000; // 20M
+ int numTuples = 20000000; // 20M
//int numTuples = 30000000; // 30M
//int numTuples = 40000000; // 40M
//int numTuples = 60000000; // 60M
@@ -50,27 +52,41 @@
//int numTuples = 200000000; // 200M
int batchSize = 10000;
int numBatches = numTuples / batchSize;
-
+
+ int payLoadSize = 240;
ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE };
- ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes, 30);
-
- IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes, fieldSerdes.length);
-
+ ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes, payLoadSize);
+
+ IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes,
+ fieldSerdes.length);
+ int[] bloomFilterKeyFields = new int[cmpFactories.length];
+ for (int i = 0; i < bloomFilterKeyFields.length; i++) {
+ bloomFilterKeyFields[i] = i;
+ }
+ double bloomFilterFalsePositiveRate = 0.01;
+
//int repeats = 1000;
int repeats = 1;
long[] times = new long[repeats];
- int numThreads = 2;
+// int numThreads = 4;
+// boolean sorted = true;
for (int i = 0; i < repeats; i++) {
//ConcurrentSkipListRunner runner = new ConcurrentSkipListRunner(numBatches, batchSize, tupleSize, typeTraits, cmp);
- InMemoryBTreeRunner runner = new InMemoryBTreeRunner(numBatches, 8192, 100000, typeTraits, cmpFactories);
+ //InMemoryBTreeRunner runner = new InMemoryBTreeRunner(numBatches, 8192, 100000, typeTraits, cmpFactories);
//BTreeBulkLoadRunner runner = new BTreeBulkLoadRunner(numBatches, 8192, 100000, typeTraits, cmp, 1.0f);
- //BTreeRunner runner = new BTreeRunner(numBatches, 8192, 100000, typeTraits, cmp);
- //String btreeName = "071211";
- //BTreeSearchRunner runner = new BTreeSearchRunner(btreeName, 10, numBatches, 8192, 25000, typeTraits, cmp);
- //LSMTreeRunner runner = new LSMTreeRunner(numBatches, 8192, 100, 8192, 250, typeTraits, cmp);
- //LSMTreeSearchRunner runner = new LSMTreeSearchRunner(100000, numBatches, 8192, 24750, 8192, 250, typeTraits, cmp);
- DataGenThread dataGen = new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, 30, 50, 10, false);
+ //BTreeRunner runner = new BTreeRunner(numBatches, 8192, 100000, typeTraits, cmp);
+ //String btreeName = "071211";
+ //BTreeSearchRunner runner = new BTreeSearchRunner(btreeName, 10, numBatches, 8192, 25000, typeTraits, cmp);
+ //LSMTreeRunner runner = new LSMTreeRunner(numBatches, 8192, 100, 8192, 250, typeTraits, cmp);
+ //LSMTreeSearchRunner runner = new LSMTreeSearchRunner(100000, numBatches, 8192, 24750, 8192, 250, typeTraits, cmp);
+ int inMemPageSize = 131072; // 128kb
+ int onDiskPageSize = inMemPageSize;
+ int inMemNumPages = 8192; // 1GB
+ int onDiskNumPages = 16384; // 2GB
+ LSMTreeRunner runner = new LSMTreeRunner(numBatches, inMemPageSize, inMemNumPages, onDiskPageSize,
+ onDiskNumPages, typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate);
+ DataGenThread dataGen = new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, payLoadSize, 50, 10, sorted);
dataGen.start();
runner.reset();
times[i] = runner.runExperiment(dataGen, numThreads);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
index 5115b7e..3713498 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
@@ -54,7 +55,8 @@
invIndex.activate();
}
// Perform merge.
- invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((LSMInvertedIndex) invIndex).getImmutableComponents());
validateAndCheckIndex(testCtx);
runTinySearchWorkload(testCtx, tupleGen);
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
index 523557d..53076fa 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
@@ -54,7 +55,8 @@
invIndex.activate();
}
// Perform merge.
- invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((LSMInvertedIndex) invIndex).getImmutableComponents());
validateAndCheckIndex(testCtx);
runTinySearchWorkload(testCtx, tupleGen);
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
index e1570af..35afd09 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
@@ -109,7 +109,7 @@
}
case MERGE: {
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, invIndex.getImmutableComponents());
break;
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index 18528c4..a410b2a 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestDriver;
import edu.uci.ics.hyracks.storage.am.rtree.RTreeTestUtils;
@@ -72,7 +73,8 @@
}
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((AbstractLSMRTree) ctx.getIndex()).getImmutableComponents());
rTreeTestUtils.checkScan(ctx);
rTreeTestUtils.checkDiskOrderScan(ctx);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
index b3fddc8..c7a86d9 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
@@ -73,7 +73,7 @@
break;
case MERGE:
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, lsmRTree.getImmutableComponents());
break;
default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
index 12d7742..383cbc4 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples.LSMRTreeWithAntiMatterTuplesAccessor;
import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
@@ -61,7 +62,8 @@
break;
case MERGE:
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((AbstractLSMRTree) lsmRTree).getImmutableComponents());
break;
default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
index 7e71aea..c67386e 100644
--- a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
@@ -96,8 +96,9 @@
}
bufferCache.closeFile(fileId);
-
- boolean exceptionThrown = false;
+
+ // This code is commented because the method pinSanityCheck in the BufferCache is commented.
+ /*boolean exceptionThrown = false;
// tryPin should fail since file is not open
try {
@@ -114,7 +115,7 @@
} catch (HyracksDataException e) {
exceptionThrown = true;
}
- Assert.assertTrue(exceptionThrown);
+ Assert.assertTrue(exceptionThrown);*/
// open file again
bufferCache.openFile(fileId);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 88edc91..b345e01 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -25,7 +25,9 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.logging.Logger;
@@ -75,7 +77,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
@@ -136,6 +138,12 @@
protected int frameSize = ClusterConfig.getFrameSize();
protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
+
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
@@ -478,9 +486,9 @@
protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
if (BspUtils.useLSM(conf)) {
- return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
- 3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
- NoOpIOOperationCallback.INSTANCE, 0.01);
+ return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(),
+ new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES, NoOpOperationTrackerProvider.INSTANCE,
+ SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, 0.01);
} else {
return new BTreeDataflowHelperFactory();
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index acd766e..d243c8a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -99,7 +99,7 @@
}
@Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
@@ -113,6 +113,7 @@
throw new HyracksDataException(e);
}
}
+ return true;
}
@Override
@@ -121,7 +122,7 @@
}
@Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
throw new IllegalStateException("this method should not be called");
}