Merge branch 'master' into jarodwen/features/positionvar_local
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index 7efee75..b9ac62f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -23,12 +23,22 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
public class UnnestOperator extends AbstractUnnestOperator {
private LogicalVariable positionalVariable;
+
+ /**
+ * Used to set the position offset for positional variable
+ */
+ private ILogicalExpression positionOffsetExpr;
+
+ /**
+ * Specify the type of the positional variable
+ */
private Object positionalVariableType;
public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression) {
@@ -67,6 +77,14 @@
return positionalVariableType;
}
+ public void setPositionOffsetExpr(ILogicalExpression posOffsetExpr) {
+ this.positionOffsetExpr = posOffsetExpr;
+ }
+
+ public ILogicalExpression getPositionOffsetExpr() {
+ return this.positionOffsetExpr;
+ }
+
@Override
public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
return visitor.visitUnnestOperator(this, arg);
@@ -88,4 +106,24 @@
}
return env;
}
+
+ @Override
+ public VariablePropagationPolicy getVariablePropagationPolicy() {
+ return new VariablePropagationPolicy() {
+
+ @Override
+ public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+ throws AlgebricksException {
+ if (sources.length > 0) {
+ target.addAllVariables(sources[0]);
+ }
+ for (LogicalVariable v : variables) {
+ target.addVariable(v);
+ }
+ if (positionalVariable != null) {
+ target.addVariable(positionalVariable);
+ }
+ }
+ };
+ }
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
index bf45e18..bf2f68a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.UnnestRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -57,10 +58,7 @@
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
UnnestOperator unnest = (UnnestOperator) op;
- if (unnest.getPositionalVariable() != null) {
- throw new AlgebricksException("Cannot generate runtime for unnest with positional variable "
- + unnest.getPositionalVariable());
- }
+
int outCol = opSchema.findVariable(unnest.getVariable());
ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
@@ -79,8 +77,18 @@
UnnestingFunctionCallExpression agg = (UnnestingFunctionCallExpression) unnestExpr;
IUnnestingEvaluatorFactory unnestingFactory = expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+
+ // for position offset
+ ILogicalExpression posOffsetExpr = unnest.getPositionOffsetExpr();
+ IScalarEvaluatorFactory posOffsetExprEvalFactory = null;
+ if (posOffsetExpr != null) {
+ posOffsetExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(posOffsetExpr,
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+ }
+
int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
- UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList);
+ UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
+ unnest.getPositionalVariable() != null, posOffsetExprEvalFactory);
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
ILogicalOperator src = unnest.getInputs().get(0).getValue();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 6c1dd5e..b079c3e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -51,8 +51,8 @@
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException(
- "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
+ throw new HyracksDataException(
+ "Could not write frame: the size of the tuple is too long to be fit into a single frame. (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder)");
}
}
if (flushFrame) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 0499684..e5bede2 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -107,7 +107,8 @@
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
- throw new IllegalStateException("Could not write frame.");
+ throw new HyracksDataException(
+ "Could not write frame: subplan result is larger than the single-frame limit.");
}
}
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index df70437..88dc19f 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -14,11 +14,15 @@
*/
package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -26,6 +30,7 @@
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -36,12 +41,20 @@
private int outColPos;
private final boolean outColIsProjected;
+ private final boolean hasPositionalVariable;
+ private IScalarEvaluatorFactory posOffsetEvalFactory;
+
// Each time step() is called on the aggregate, a new value is written in
// its output. One byte is written before that value and is neglected.
// By convention, if the aggregate function writes nothing, it means it
// produced the last value.
public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
+ this(outCol, unnestingFactory, projectionList, false, null);
+ }
+
+ public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
+ boolean hashPositionalVariable, IScalarEvaluatorFactory posOffsetEvalFactory) {
super(projectionList);
this.outCol = outCol;
this.unnestingFactory = unnestingFactory;
@@ -52,6 +65,11 @@
}
}
outColIsProjected = outColPos >= 0;
+ this.hasPositionalVariable = hashPositionalVariable;
+ this.posOffsetEvalFactory = posOffsetEvalFactory;
+ if (this.posOffsetEvalFactory == null) {
+ this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]);
+ }
}
@Override
@@ -68,6 +86,9 @@
private IUnnestingEvaluator agg;
private ArrayTupleBuilder tupleBuilder;
+ private int tupleCount;
+ private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
+
@Override
public void open() throws HyracksDataException {
initAccessAppendRef(ctx);
@@ -77,6 +98,7 @@
throw new HyracksDataException(ae);
}
tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ tupleCount = 1;
writer.open();
}
@@ -86,6 +108,16 @@
int nTuple = tAccess.getTupleCount();
for (int t = 0; t < nTuple; t++) {
tRef.reset(tAccess, t);
+
+ try {
+ offsetEval.evaluate(tRef, p);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+
+ @SuppressWarnings("static-access")
+ int offset = IntegerSerializerDeserializer.INSTANCE.getInt(p.getByteArray(), p.getStartOffset());
+
try {
agg.init(tRef);
boolean goon = true;
@@ -94,21 +126,33 @@
if (!agg.step(p)) {
goon = false;
} else {
- if (!outColIsProjected) {
+
+ if (!outColIsProjected && !hasPositionalVariable) {
appendProjectionToFrame(t, projectionList);
} else {
for (int f = 0; f < outColPos; f++) {
tupleBuilder.addField(tAccess, t, f);
}
- tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
- for (int f = outColPos + 1; f < projectionList.length; f++) {
+ if (outColIsProjected) {
+ tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+ } else {
+ tupleBuilder.addField(tAccess, t, outColPos);
+ }
+ for (int f = outColPos + 1; f < (hasPositionalVariable ? projectionList.length - 1
+ : projectionList.length); f++) {
tupleBuilder.addField(tAccess, t, f);
}
}
+ if (hasPositionalVariable) {
+ // Write the positional variable as an INT32
+ tupleBuilder.getDataOutput().writeByte(3);
+ tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+ tupleBuilder.addFieldEndOffset();
+ }
appendToFrameFromTupleBuilder(tupleBuilder);
}
} while (goon);
- } catch (AlgebricksException ae) {
+ } catch (AlgebricksException | IOException ae) {
throw new HyracksDataException(ae);
}
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index c75f9f9..1af32de 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -25,4 +25,6 @@
public IIOManager getIOManager();
public ByteBuffer allocateFrame() throws HyracksDataException;
+
+ public void deallocateFrames(int frameCount);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
index aa7008f..3ea81ef 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
@@ -41,4 +41,10 @@
return ByteBuffer.allocate(frameSize);
}
+ @Override
+ public void deallocateFrames(int frameCount) {
+ // TODO Auto-generated method stub
+
+ }
+
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 53e5a01..bbbe0f5 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -114,6 +114,11 @@
}
@Override
+ public void deallocateFrames(int frameCount) {
+ joblet.deallocateFrames(frameCount);
+ }
+
+ @Override
public int getFrameSize() {
return joblet.getFrameSize();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7773765..66efc03 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -44,6 +44,8 @@
private boolean first;
+ private boolean isFailed = false;
+
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
IFrameWriter writer) throws HyracksDataException {
@@ -149,12 +151,13 @@
@Override
public void fail() throws HyracksDataException {
+ isFailed = true;
writer.fail();
}
@Override
public void close() throws HyracksDataException {
- if (!first) {
+ if (!isFailed && !first) {
writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(outFrame, writer);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 0ab2f4d..b03489c 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -561,6 +561,12 @@
public ByteBuffer allocateFrame() {
return ByteBuffer.allocate(FRAME_SIZE);
}
+
+ @Override
+ public void deallocateFrames(int frameCount) {
+ // TODO Auto-generated method stub
+
+ }
}
@Override
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index 10b5582..f18099b 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -46,6 +46,10 @@
public ByteBuffer allocateFrame() {
return jobletContext.allocateFrame();
}
+
+ @Override
+ public void deallocateFrames(int frameCount) {
+ }
@Override
public int getFrameSize() {