merged master back
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index 7efee75..b9ac62f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -23,12 +23,22 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
public class UnnestOperator extends AbstractUnnestOperator {
private LogicalVariable positionalVariable;
+
+ /**
+ * Used to set the position offset for positional variable
+ */
+ private ILogicalExpression positionOffsetExpr;
+
+ /**
+ * Specify the type of the positional variable
+ */
private Object positionalVariableType;
public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression) {
@@ -67,6 +77,14 @@
return positionalVariableType;
}
+ public void setPositionOffsetExpr(ILogicalExpression posOffsetExpr) {
+ this.positionOffsetExpr = posOffsetExpr;
+ }
+
+ public ILogicalExpression getPositionOffsetExpr() {
+ return this.positionOffsetExpr;
+ }
+
@Override
public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
return visitor.visitUnnestOperator(this, arg);
@@ -88,4 +106,24 @@
}
return env;
}
+ /** Output schema: first source's variables, then the {@code variables} field (declared outside this hunk), then the positional variable if set. */
+ @Override
+ public VariablePropagationPolicy getVariablePropagationPolicy() {
+ return new VariablePropagationPolicy() {
+ // Order matters: variables are added to the target in the sequence below.
+ @Override
+ public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+ throws AlgebricksException {
+ if (sources.length > 0) {
+ target.addAllVariables(sources[0]);
+ }
+ for (LogicalVariable v : variables) {
+ target.addVariable(v);
+ }
+ if (positionalVariable != null) {
+ target.addVariable(positionalVariable);
+ }
+ }
+ };
+ }
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index cebddee..3a2617f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -404,7 +404,8 @@
@Override
public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext ctx)
throws AlgebricksException {
- propagateFDsAndEquivClasses(op, ctx);
+ ctx.putEquivalenceClassMap(op, new HashMap<LogicalVariable, EquivalenceClass>());
+ ctx.putFDList(op, new ArrayList<FunctionalDependency>());
return null;
}
@@ -646,4 +647,4 @@
return null;
}
-}
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index 51b009f..bcd998d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
@@ -34,6 +35,7 @@
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansRunningAggregatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -76,13 +78,19 @@
}
// compile subplans and set the gby op. schema accordingly
AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
- IAggregatorDescriptorFactory aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys,
- fdColumns);
+ IAggregatorDescriptorFactory aggregatorFactory;
+
+ if (((AbstractLogicalOperator) (gby.getNestedPlans().get(0).getRoots().get(0).getValue())).getOperatorTag() == LogicalOperatorTag.RUNNINGAGGREGATE) {
+ aggregatorFactory = new NestedPlansRunningAggregatorFactory(subplans, keys, fdColumns);
+ } else {
+ aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys, fdColumns);
+ }
IOperatorDescriptorRegistry spec = builder.getJobSpec();
IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
columnList, context.getTypeEnvironment(op), context);
- RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+ RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+ context);
PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
comparatorFactories, aggregatorFactory, recordDescriptor);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 21def02..5cbcdeb 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -62,22 +63,11 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
- IPartitioningProperty pp = null;
- RunningAggregateOperator ragg = (RunningAggregateOperator) op;
- for (Mutable<ILogicalExpression> exprRef : ragg.getExpressions()) {
- StatefulFunctionCallExpression f = (StatefulFunctionCallExpression) exprRef.getValue();
- IPartitioningProperty p = f.getRequiredPartitioningProperty();
- if (p != null) {
- if (pp == null) {
- pp = p;
- } else {
- throw new IllegalStateException("Two stateful functions want to set partitioning requirements: "
- + pp + " and " + p);
- }
- }
- }
- StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
- return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+        // The partitioning requirement formerly derived from the stateful functions (see the
+        // removed lines above) is no longer computed; whatever emptyUnaryRequirements()
+        // yields is returned instead. The old logic stays available in version control.
+        // NOTE(review): confirm that dropping the per-function requirement is intended.
+        return emptyUnaryRequirements();
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
index bf45e18..bf2f68a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.UnnestRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -57,10 +58,7 @@
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
UnnestOperator unnest = (UnnestOperator) op;
- if (unnest.getPositionalVariable() != null) {
- throw new AlgebricksException("Cannot generate runtime for unnest with positional variable "
- + unnest.getPositionalVariable());
- }
+
int outCol = opSchema.findVariable(unnest.getVariable());
ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
@@ -79,8 +77,18 @@
UnnestingFunctionCallExpression agg = (UnnestingFunctionCallExpression) unnestExpr;
IUnnestingEvaluatorFactory unnestingFactory = expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+
+ // for position offset
+ ILogicalExpression posOffsetExpr = unnest.getPositionOffsetExpr();
+ IScalarEvaluatorFactory posOffsetExprEvalFactory = null;
+ if (posOffsetExpr != null) {
+ posOffsetExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(posOffsetExpr,
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+ }
+
int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
- UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList);
+ UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
+ unnest.getPositionalVariable() != null, posOffsetExprEvalFactory);
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
ILogicalOperator src = unnest.getInputs().get(0).getValue();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
index af11b70..82e6970 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public class IntroHashPartitionMergeExchange implements IAlgebraicRewriteRule {
@@ -60,6 +61,7 @@
hpe.getDomain());
op1.setPhysicalOperator(hpme);
op1.getInputs().get(0).setValue(op2.getInputs().get(0).getValue());
+ op1.computeDeliveredPhysicalProperties(context);
return true;
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 8f2eaac..09354a5 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -28,11 +28,11 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private AlgebricksPipeline[] subplans;
@@ -95,7 +95,7 @@
}
@Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
for (int i = 0; i < pipelines.length; i++) {
outputWriter.setInputIdx(i);
@@ -114,6 +114,7 @@
offset = fieldEnds[i] - start;
tupleBuilder.addField(data, start, offset);
}
+ return true;
}
@Override
@@ -127,7 +128,7 @@
}
@Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
throw new IllegalStateException("this method should not be called");
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
new file mode 100644
index 0000000..dc9c805
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+/**
+ * Aggregator factory for group-by operators whose nested plans produce one result per input tuple
+ * (running aggregation) rather than one result per group.
+ * <p>
+ * Every input tuple is pushed through each nested pipeline right away. Each tuple a pipeline emits is
+ * prefixed with the key and decor fields of the current group, buffered in an output frame and written
+ * to the downstream writer passed to {@code createAggregator}; {@code outputFinalResult} adds nothing
+ * and returns {@code false}.
+ */
+public class NestedPlansRunningAggregatorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final AlgebricksPipeline[] subplans;
+    private final int[] keyFieldIdx;
+    private final int[] decorFieldIdx;
+
+    /**
+     * @param subplans      compiled nested pipelines
+     * @param keyFieldIdx   input fields copied in front of every result as group keys
+     * @param decorFieldIdx input fields copied after the keys
+     */
+    public NestedPlansRunningAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx, int[] decorFieldIdx) {
+        this.subplans = subplans;
+        this.keyFieldIdx = keyFieldIdx;
+        this.decorFieldIdx = decorFieldIdx;
+    }
+
+    /**
+     * Assembles one runtime pipeline per nested plan, all feeding a shared output stage, and returns an
+     * aggregator that drives them.
+     *
+     * @param writer downstream writer that receives the running-aggregate results directly
+     * @throws HyracksDataException if a pipeline cannot be assembled
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
+            final IFrameWriter writer) throws HyracksDataException {
+        final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
+                decorFieldIdx.length, writer);
+        final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
+        for (int i = 0; i < subplans.length; i++) {
+            try {
+                pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
+
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                for (int i = 0; i < pipelines.length; ++i) {
+                    pipelines[i].open();
+                }
+
+                // remember the key and decor fields of the new group; they prefix every result
+                gbyTb.reset();
+                for (int i = 0; i < keyFieldIdx.length; ++i) {
+                    gbyTb.addField(accessor, tIndex, keyFieldIdx[i]);
+                }
+                for (int i = 0; i < decorFieldIdx.length; ++i) {
+                    gbyTb.addField(accessor, tIndex, decorFieldIdx[i]);
+                }
+
+                // aggregate the first tuple
+                pushThroughPipelines(accessor, tIndex);
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                pushThroughPipelines(accessor, tIndex);
+            }
+
+            /** Feeds one input tuple to every nested pipeline and flushes it through immediately. */
+            private void pushThroughPipelines(IFrameTupleAccessor accessor, int tIndex)
+                    throws HyracksDataException {
+                for (int i = 0; i < pipelines.length; i++) {
+                    outputWriter.setInputIdx(i);
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                    pipelines[i].forceFlush();
+                }
+            }
+
+            @Override
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                // results were already emitted tuple by tuple
+                return false;
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState();
+            }
+
+            @Override
+            public void reset() {
+
+            }
+
+            @Override
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("this method should not be called");
+            }
+
+            @Override
+            public void close() {
+                // Flush the results still buffered in the output stage; nothing else writes them out.
+                // NOTE(review): assumes the caller closes the aggregator before closing the
+                // downstream writer - confirm.
+                try {
+                    outputWriter.close();
+                } catch (HyracksDataException e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Chains the runtimes of a nested plan back to front so that the last one writes to {@code writer}.
+     *
+     * @return the first runtime of the chain (the entry point for input tuples)
+     */
+    private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
+            throws AlgebricksException, HyracksDataException {
+        // plug the operators
+        IFrameWriter start = writer;
+        IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
+        RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
+            IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
+            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            if (i > 0) {
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
+            } else {
+                // the nts has the same input and output rec. desc.
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
+            }
+            start = newRuntime;
+        }
+        return start;
+    }
+
+    /**
+     * Terminal stage shared by all nested pipelines: prefixes each tuple it receives with the current
+     * group's key/decor fields and buffers the result in a frame for the downstream writer.
+     */
+    private static class RunningAggregatorOutput implements IFrameWriter {
+
+        private final FrameTupleAccessor[] tAccess;
+        private final RecordDescriptor[] inputRecDesc;
+        private int inputIdx;
+        private final ArrayTupleBuilder tb;
+        private final ArrayTupleBuilder gbyTb;
+        private final AlgebricksPipeline[] subplans;
+        private final IFrameWriter outputWriter;
+        private final ByteBuffer outputFrame;
+        private final FrameTupleAppender outputAppender;
+
+        public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numKeys,
+                int numDecors, IFrameWriter outputWriter) throws HyracksDataException {
+            this.subplans = subplans;
+            this.outputWriter = outputWriter;
+
+            int totalAggFields = 0;
+            this.inputRecDesc = new RecordDescriptor[subplans.length];
+            for (int i = 0; i < subplans.length; i++) {
+                RecordDescriptor[] rd = subplans[i].getRecordDescriptors();
+                this.inputRecDesc[i] = rd[rd.length - 1];
+                totalAggFields += subplans[i].getOutputWidth();
+            }
+            tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
+            gbyTb = new ArrayTupleBuilder(numKeys + numDecors);
+
+            this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
+            for (int i = 0; i < inputRecDesc.length; i++) {
+                tAccess[i] = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc[i]);
+            }
+
+            this.outputFrame = ctx.allocateFrame();
+            this.outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+            this.outputAppender.reset(outputFrame, true);
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            int w = subplans[inputIdx].getOutputWidth();
+            IFrameTupleAccessor accessor = tAccess[inputIdx];
+            accessor.reset(buffer);
+            for (int tIndex = 0; tIndex < accessor.getTupleCount(); tIndex++) {
+                tb.reset();
+                // copy the group's key/decor fields in front of the pipeline's output fields
+                byte[] data = gbyTb.getByteArray();
+                int[] fieldEnds = gbyTb.getFieldEndOffsets();
+                int start = 0;
+                int offset = 0;
+                for (int i = 0; i < fieldEnds.length; i++) {
+                    if (i > 0)
+                        start = fieldEnds[i - 1];
+                    offset = fieldEnds[i] - start;
+                    tb.addField(data, start, offset);
+                }
+                for (int f = 0; f < w; f++) {
+                    tb.addField(accessor, tIndex, f);
+                }
+                if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    FrameUtils.flushFrame(outputFrame, outputWriter);
+                    outputAppender.reset(outputFrame, true);
+                    if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new HyracksDataException(
+                                "Failed to write a running aggregation result into an empty frame: possibly the size of the result is too large.");
+                    }
+                }
+            }
+        }
+
+        /**
+         * Writes any buffered results to the downstream writer. The downstream writer itself is not
+         * closed here. Safe to call more than once.
+         */
+        @Override
+        public void close() throws HyracksDataException {
+            if (outputAppender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(outputFrame, outputWriter);
+                outputAppender.reset(outputFrame, true);
+            }
+        }
+
+        public void setInputIdx(int inputIdx) {
+            this.inputIdx = inputIdx;
+        }
+
+        public ArrayTupleBuilder getGroupByTupleBuilder() {
+            return gbyTb;
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+        }
+
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index d4161fc..39db3e1 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -25,11 +25,11 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+public class SerializableAggregatorDescriptorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private ICopySerializableAggregateFunctionFactory[] aggFactories;
@@ -109,7 +109,7 @@
}
@Override
- public void outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
byte[] data = accessor.getBuffer().array();
int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -125,10 +125,11 @@
throw new HyracksDataException(e);
}
}
+ return true;
}
@Override
- public void outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
byte[] data = accessor.getBuffer().array();
int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -144,6 +145,7 @@
throw new HyracksDataException(e);
}
}
+ return true;
}
@Override
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 8e472ec..9b638b4 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -25,11 +25,11 @@
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class SimpleAlgebricksAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class SimpleAlgebricksAccumulatingAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private IAggregateEvaluatorFactory[] aggFactories;
@@ -86,7 +86,7 @@
}
@Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
IAggregateEvaluator[] agg = (IAggregateEvaluator[]) state.state;
for (int i = 0; i < agg.length; i++) {
@@ -97,6 +97,7 @@
throw new HyracksDataException(e);
}
}
+ return true;
}
@Override
@@ -118,7 +119,7 @@
}
@Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
throw new IllegalStateException("this method should not be called");
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 6c1dd5e..b079c3e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -51,8 +51,8 @@
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException(
- "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
+ throw new HyracksDataException(
+ "Could not write frame: the size of the tuple is too long to be fit into a single frame. (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder)");
}
}
if (flushFrame) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 200e5c5..7ecb288 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -63,8 +63,6 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
- outRecordDesc, groupFields, groupFields);
final ByteBuffer copyFrame = ctx.allocateFrame();
final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
copyFrameAccessor.reset(copyFrame);
@@ -78,6 +76,8 @@
@Override
public void open() throws HyracksDataException {
+ IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
+ outRecordDesc, groupFields, groupFields, writer);
pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
outRecordDesc, writer);
pgw.open();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 0499684..e5bede2 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -107,7 +107,8 @@
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
- throw new IllegalStateException("Could not write frame.");
+ throw new HyracksDataException(
+ "Could not write frame: subplan result is larger than the single-frame limit.");
}
}
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 5486fae..55dbcbb 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
@@ -64,5 +65,10 @@
public void fail() throws HyracksDataException {
writer.fail();
}
+
+ public void forceFlush() throws HyracksDataException { // lets a caller push out the current frame on demand
+ FrameUtils.flushFrame(frame, writer); // hand the frame, as filled so far, to the downstream writer
+ appender.reset(frame, true); // re-attach the appender to the same frame (the 'true' flag presumably clears it; confirm)
+ }
}
}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index e92724a..cfc2bb6 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -89,6 +90,10 @@
@Override
public void open() throws HyracksDataException {
+ if(!first){
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ }
initAccessAppendRef(ctx);
if (first) {
first = false;
@@ -96,12 +101,18 @@
for (int i = 0; i < n; i++) {
try {
raggs[i] = runningAggregates[i].createRunningAggregateEvaluator();
- raggs[i].init();
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
}
}
+ for (int i = 0; i < runningAggregates.length; i++) {
+ try {
+ raggs[i].init();
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
writer.open();
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index df70437..88dc19f 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -14,11 +14,15 @@
*/
package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -26,6 +30,7 @@
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -36,12 +41,20 @@
private int outColPos;
private final boolean outColIsProjected;
+ private final boolean hasPositionalVariable;
+ private IScalarEvaluatorFactory posOffsetEvalFactory;
+
// Each time step() is called on the aggregate, a new value is written in
// its output. One byte is written before that value and is neglected.
// By convention, if the aggregate function writes nothing, it means it
// produced the last value.
public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
+ this(outCol, unnestingFactory, projectionList, false, null);
+ }
+
+ public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
+ boolean hashPositionalVariable, IScalarEvaluatorFactory posOffsetEvalFactory) {
super(projectionList);
this.outCol = outCol;
this.unnestingFactory = unnestingFactory;
@@ -52,6 +65,11 @@
}
}
outColIsProjected = outColPos >= 0;
+ this.hasPositionalVariable = hashPositionalVariable;
+ this.posOffsetEvalFactory = posOffsetEvalFactory;
+ if (this.posOffsetEvalFactory == null) {
+ this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]);
+ }
}
@Override
@@ -68,6 +86,9 @@
private IUnnestingEvaluator agg;
private ArrayTupleBuilder tupleBuilder;
+ private int tupleCount;
+ private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
+
@Override
public void open() throws HyracksDataException {
initAccessAppendRef(ctx);
@@ -77,6 +98,7 @@
throw new HyracksDataException(ae);
}
tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ tupleCount = 1;
writer.open();
}
@@ -86,6 +108,16 @@
int nTuple = tAccess.getTupleCount();
for (int t = 0; t < nTuple; t++) {
tRef.reset(tAccess, t);
+
+ try {
+ offsetEval.evaluate(tRef, p);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+
+ @SuppressWarnings("static-access")
+ int offset = IntegerSerializerDeserializer.INSTANCE.getInt(p.getByteArray(), p.getStartOffset());
+
try {
agg.init(tRef);
boolean goon = true;
@@ -94,21 +126,33 @@
if (!agg.step(p)) {
goon = false;
} else {
- if (!outColIsProjected) {
+
+ if (!outColIsProjected && !hasPositionalVariable) {
appendProjectionToFrame(t, projectionList);
} else {
for (int f = 0; f < outColPos; f++) {
tupleBuilder.addField(tAccess, t, f);
}
- tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
- for (int f = outColPos + 1; f < projectionList.length; f++) {
+ if (outColIsProjected) {
+ tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+ } else {
+ tupleBuilder.addField(tAccess, t, outColPos);
+ }
+ for (int f = outColPos + 1; f < (hasPositionalVariable ? projectionList.length - 1
+ : projectionList.length); f++) {
tupleBuilder.addField(tAccess, t, f);
}
}
+ if (hasPositionalVariable) {
+ // Write the positional variable as an INT32
+ tupleBuilder.getDataOutput().writeByte(3);
+ tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+ tupleBuilder.addFieldEndOffset();
+ }
appendToFrameFromTupleBuilder(tupleBuilder);
}
} while (goon);
- } catch (AlgebricksException ae) {
+ } catch (AlgebricksException | IOException ae) {
throw new HyracksDataException(ae);
}
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index c75f9f9..1af32de 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -25,4 +25,6 @@
public IIOManager getIOManager();
public ByteBuffer allocateFrame() throws HyracksDataException;
+ /** Releases {@code frameCount} frames; presumably the counterpart of allocateFrame(). TODO(review): confirm the exact contract, implementers in this change are no-ops or forward to the joblet. */
+ public void deallocateFrames(int frameCount);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
index aa7008f..3ea81ef 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
@@ -41,4 +41,10 @@
return ByteBuffer.allocate(frameSize);
}
+ @Override
+ public void deallocateFrames(int frameCount) {
+ // TODO(review): still a no-op stub. The method just above returns ByteBuffer.allocate(frameSize),
+ // so nothing visible in this context needs releasing; confirm before relying on it.
+ }
+
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index c72ced1..20075ff 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -115,6 +115,11 @@
}
@Override
+ public void deallocateFrames(int frameCount) { // forwarded to the owning joblet, like getFrameSize() below
+ joblet.deallocateFrames(frameCount);
+ }
+
+ @Override
public int getFrameSize() {
return joblet.getFrameSize();
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
index 8add260..4829173 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
@@ -26,10 +26,11 @@
@Override
public boolean allocate(long memory) {
- if (this.memory.addAndGet(-memory) < 0) {
- this.memory.addAndGet(memory);
- return false;
- }
+ // Budget check disabled for now because deallocation is not implemented yet; every request is granted.
+ // if (this.memory.addAndGet(-memory) < 0) {
+ // this.memory.addAndGet(memory);
+ // return false;
+ // }
return true;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..e2fe892
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+/** Base class for aggregator-descriptor factories that have no use for the output writer; subclasses implement only the writer-less overload. */
+public abstract class AbstractAccumulatingAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Satisfies the writer-taking factory method by discarding {@code writer} and forwarding every other argument unchanged to the five-argument overload.
+ */
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer)
+ throws HyracksDataException {
+ return this
+ .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+ }
+ /** Builds the aggregator from the same arguments as the interface method minus the frame writer; concrete factories provide this. */
+ abstract public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields,
+ final int[] keyFieldsInPartialResults) throws HyracksDataException;
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
new file mode 100644
index 0000000..3dcd4c3
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+/** Base class for aggregator descriptors that never hand a result back through the tuple builder: both output methods below write nothing and return false. */
+public abstract class AbstractRunningAggregatorDescriptor implements IAggregatorDescriptor {
+
+ /**
+ * Leaves {@code tupleBuilder} untouched and always returns {@code false}; presumably this signals that no partial result exists (the interface does not document the return value, confirm).
+ */
+ @Override
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ return false;
+ }
+
+ /**
+ * Leaves {@code tupleBuilder} untouched and always returns {@code false}; PreclusteredGroupWriter only appends the built tuple when this returns true, so nothing is emitted on that path.
+ */
+ @Override
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ return false;
+ }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index ecd5284..df74a93 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -38,9 +38,6 @@
import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
-/**
- *
- */
public class HashSpillableTableFactory implements ISpillableTableFactory {
private static final long serialVersionUID = 1L;
@@ -104,7 +101,7 @@
}
final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
- outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+ outRecordDescriptor, keyFields, keyFieldsInPartialResults, null);
final AggregateState aggregateState = aggregator.createAggregateStates();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index afe460c..f9d4cd3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -18,9 +18,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-/**
- *
- */
public interface IAggregatorDescriptor {
/**
@@ -82,9 +79,10 @@
* @param offset
* @param state
* The aggregation state.
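+ * @return {@code true} if a partial result was written into {@code tupleBuilder}; implementations that produce none return {@code false}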
* @throws HyracksDataException
*/
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException;
/**
@@ -97,9 +95,10 @@
* @param offset
* @param state
* The aggregation state.
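+ * @return {@code true} if a final result was written into {@code tupleBuilder} and should be appended to the output; {@code false} if the caller should emit nothing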
* @throws HyracksDataException
*/
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException;
public void close();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index a4c0564..241956b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -16,17 +16,15 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-/**
- *
- */
public interface IAggregatorDescriptorFactory extends Serializable {
IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
- throws HyracksDataException;
+ RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
+ IFrameWriter writer) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 239f24f..08b2218 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -21,16 +21,14 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-/**
- *
- */
-public class MultiFieldsAggregatorFactory implements IAggregatorDescriptorFactory {
+public class MultiFieldsAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
@@ -78,8 +76,8 @@
}
@Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) throws HyracksDataException {
DataOutput dos = tupleBuilder.getDataOutput();
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -90,11 +88,11 @@
((AggregateState[]) state.state)[i]);
tupleBuilder.addFieldEndOffset();
}
-
+ return true;
}
@Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
DataOutput dos = tupleBuilder.getDataOutput();
@@ -110,6 +108,7 @@
}
tupleBuilder.addFieldEndOffset();
}
+ return true;
}
@Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index 21b3b60..c41e9ca 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -91,7 +91,7 @@
}
aggregator = mergerFactory.createAggregator(ctx, outRecordDescriptor, outRecordDescriptor, keyFields,
- keyFieldsInPartialResults);
+ keyFieldsInPartialResults, writer);
aggregateState = aggregator.createAggregateStates();
storedKeys = new int[keyFields.length];
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
index f8ba003..3c0eb2b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
@@ -121,7 +121,7 @@
}
this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, fields,
- keyFieldsInPartialResults);
+ keyFieldsInPartialResults, null);
this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
accumulatorSize = 0;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 35cb9e1..71af928 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -55,7 +55,7 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor,
- outRecordDescriptor, groupFields, groupFields);
+ outRecordDescriptor, groupFields, groupFields, writer);
final ByteBuffer copyFrame = ctx.allocateFrame();
final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
copyFrameAccessor.reset(copyFrame);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7773765..c7a9f6d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -44,6 +44,8 @@
private boolean first;
+ private boolean isFailed = false;
+
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
IFrameWriter writer) throws HyracksDataException {
@@ -119,9 +121,9 @@
for (int j = 0; j < groupFields.length; j++) {
tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
}
- aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
+ boolean hasOutput = aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
- if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ if (hasOutput && !appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
FrameUtils.flushFrame(outFrame, writer);
appender.reset(outFrame, true);
@@ -149,12 +151,13 @@
@Override
public void fail() throws HyracksDataException {
+ isFailed = true; // remembered so that close() does not write out the last group after a failure
writer.fail();
}
@Override
public void close() throws HyracksDataException {
- if (!first) {
+ if (!isFailed && !first) {
writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(outFrame, writer);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 67d1796..4f586f9 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -565,6 +565,12 @@
public ByteBuffer allocateFrame() {
return ByteBuffer.allocate(FRAME_SIZE);
}
+
+ @Override
+ public void deallocateFrames(int frameCount) {
+ // TODO(review): still a no-op stub. allocateFrame() above returns ByteBuffer.allocate(FRAME_SIZE),
+ // so nothing visible in this context needs releasing; confirm before relying on it.
+ }
}
@Override
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index 10b5582..f18099b 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -46,6 +46,10 @@
public ByteBuffer allocateFrame() {
return jobletContext.allocateFrame();
}
+
+ @Override
+ public void deallocateFrames(int frameCount) { // no-op: this test context does no frame-release bookkeeping
+ }
@Override
public int getFrameSize() {
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index acd766e..d243c8a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -99,7 +99,7 @@
}
@Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
@@ -113,6 +113,7 @@
throw new HyracksDataException(e);
}
}
+ return true;
}
@Override
@@ -121,7 +122,7 @@
}
@Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
throw new IllegalStateException("this method should not be called");
}