Move to non-copy-based evaluator interfaces for scalar functions, aggregate functions, running aggregate functions and unnest functions.
Change-Id: I92a630550f3d45a7a5f00cfbc93e7b049b06330d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/614
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IExpressionRuntimeProvider.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IExpressionRuntimeProvider.java
index 3c0e1e9..c584b47 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IExpressionRuntimeProvider.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IExpressionRuntimeProvider.java
@@ -23,7 +23,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
@@ -36,7 +36,7 @@
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
throws AlgebricksException;
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateFunctionFactory(
AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
JobGenContext context) throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java
index 0996b9c..7463896 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java
@@ -22,30 +22,30 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
public interface ILogicalExpressionJobGen {
- public ICopyEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
+ public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException;
- public ICopyAggregateFunctionFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
+ public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
throws AlgebricksException;
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateFunctionFactory(
AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
JobGenContext context) throws AlgebricksException;
- public ICopyRunningAggregateFunctionFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
+ public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
throws AlgebricksException;
- public ICopyUnnestingFunctionFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
+ public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
index 50e4f3d..02993f7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
@@ -22,27 +22,11 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class LogicalExpressionJobGenToExpressionRuntimeProviderAdapter implements IExpressionRuntimeProvider {
private final ILogicalExpressionJobGen lejg;
@@ -54,20 +38,18 @@
@Override
public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
- ICopyEvaluatorFactory cef = lejg.createEvaluatorFactory(expr, env, inputSchemas, context);
- return new ScalarEvaluatorFactoryAdapter(cef);
+ return lejg.createEvaluatorFactory(expr, env, inputSchemas, context);
}
@Override
public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
- throws AlgebricksException {
- ICopyAggregateFunctionFactory caff = lejg.createAggregateFunctionFactory(expr, env, inputSchemas, context);
- return new AggregateFunctionFactoryAdapter(caff);
+ throws AlgebricksException {
+ return lejg.createAggregateFunctionFactory(expr, env, inputSchemas, context);
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateFunctionFactory(
AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
JobGenContext context) throws AlgebricksException {
return lejg.createSerializableAggregateFunctionFactory(expr, env, inputSchemas, context);
@@ -76,143 +58,14 @@
@Override
public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
- throws AlgebricksException {
- ICopyRunningAggregateFunctionFactory craff = lejg.createRunningAggregateFunctionFactory(expr, env,
- inputSchemas, context);
- return new RunningAggregateFunctionFactoryAdapter(craff);
+ throws AlgebricksException {
+ return lejg.createRunningAggregateFunctionFactory(expr, env, inputSchemas, context);
}
@Override
public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
- throws AlgebricksException {
- ICopyUnnestingFunctionFactory cuff = lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, context);
- return new UnnestingFunctionFactoryAdapter(cuff);
- }
-
- public static final class ScalarEvaluatorFactoryAdapter implements IScalarEvaluatorFactory {
- private static final long serialVersionUID = 1L;
-
- private final ICopyEvaluatorFactory cef;
-
- public ScalarEvaluatorFactoryAdapter(ICopyEvaluatorFactory cef) {
- this.cef = cef;
- }
-
- @Override
- public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
- final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
- final ICopyEvaluator ce = cef.createEvaluator(abvs);
- return new IScalarEvaluator() {
- @Override
- public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
- abvs.reset();
- ce.evaluate(tuple);
- result.set(abvs);
- }
- };
- }
- }
-
- public static final class AggregateFunctionFactoryAdapter implements IAggregateEvaluatorFactory {
- private static final long serialVersionUID = 1L;
-
- private final ICopyAggregateFunctionFactory caff;
-
- public AggregateFunctionFactoryAdapter(ICopyAggregateFunctionFactory caff) {
- this.caff = caff;
- }
-
- @Override
- public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
- final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
- final ICopyAggregateFunction caf = caff.createAggregateFunction(abvs);
- return new IAggregateEvaluator() {
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- caf.step(tuple);
- }
-
- @Override
- public void init() throws AlgebricksException {
- abvs.reset();
- caf.init();
- }
-
- @Override
- public void finishPartial(IPointable result) throws AlgebricksException {
- caf.finishPartial();
- result.set(abvs);
- }
-
- @Override
- public void finish(IPointable result) throws AlgebricksException {
- caf.finish();
- result.set(abvs);
- }
-
- };
- }
- }
-
- public static final class RunningAggregateFunctionFactoryAdapter implements IRunningAggregateEvaluatorFactory {
- private static final long serialVersionUID = 1L;
-
- private final ICopyRunningAggregateFunctionFactory craff;
-
- public RunningAggregateFunctionFactoryAdapter(ICopyRunningAggregateFunctionFactory craff) {
- this.craff = craff;
- }
-
- @Override
- public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException {
- final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
- final ICopyRunningAggregateFunction craf = craff.createRunningAggregateFunction(abvs);
- return new IRunningAggregateEvaluator() {
- @Override
- public void step(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
- abvs.reset();
- craf.step(tuple);
- result.set(abvs);
- }
-
- @Override
- public void init() throws AlgebricksException {
- craf.init();
- }
- };
- }
- }
-
- public static final class UnnestingFunctionFactoryAdapter implements IUnnestingEvaluatorFactory {
- private static final long serialVersionUID = 1L;
-
- private final ICopyUnnestingFunctionFactory cuff;
-
- public UnnestingFunctionFactoryAdapter(ICopyUnnestingFunctionFactory cuff) {
- this.cuff = cuff;
- }
-
- @Override
- public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
- final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
- final ICopyUnnestingFunction cuf = cuff.createUnnestingFunction(abvs);
- return new IUnnestingEvaluator() {
- @Override
- public boolean step(IPointable result) throws AlgebricksException {
- abvs.reset();
- if (cuf.step()) {
- result.set(abvs);
- return true;
- }
- return false;
- }
-
- @Override
- public void init(IFrameTupleReference tuple) throws AlgebricksException {
- cuf.init(tuple);
- }
- };
- }
+ throws AlgebricksException {
+ return lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, context);
}
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 672e6d0..efb9681 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -56,7 +56,7 @@
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.aggreg.SerializableAggregatorDescriptorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -185,7 +185,7 @@
IPartialAggregationTypeComputer partialAggregationTypeComputer = context.getPartialAggregationTypeComputer();
List<Object> intermediateTypes = new ArrayList<Object>();
int n = aggOp.getExpressions().size();
- ICopySerializableAggregateFunctionFactory[] aff = new ICopySerializableAggregateFunctionFactory[n];
+ ISerializedAggregateEvaluatorFactory[] aff = new ISerializedAggregateEvaluatorFactory[n];
int i = 0;
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
IVariableTypeEnvironment aggOpInputEnv = context.getTypeEnvironment(aggOp.getInputs().get(0).getValue());
@@ -223,7 +223,7 @@
IBinaryHashFunctionFactory[] hashFunctionFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
gbyCols, aggOpInputEnv, context);
- ICopySerializableAggregateFunctionFactory[] merges = new ICopySerializableAggregateFunctionFactory[n];
+ ISerializedAggregateEvaluatorFactory[] merges = new ISerializedAggregateEvaluatorFactory[n];
List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
IOperatorSchema[] localInputSchemas = new IOperatorSchema[1];
localInputSchemas[0] = new OperatorSchemaImpl();
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java
index 1c3f9b8..e7d96ac 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java
@@ -41,11 +41,11 @@
import org.apache.hyracks.algebricks.examples.piglet.exceptions.PigletException;
import org.apache.hyracks.algebricks.examples.piglet.runtime.functions.PigletFunctionRegistry;
import org.apache.hyracks.algebricks.examples.piglet.types.Type;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvalFactory;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -56,7 +56,7 @@
private final UTF8StringSerializerDeserializer utf8SerDer = new UTF8StringSerializerDeserializer();
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
+ public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
switch (expr.getExpressionTag()) {
case CONSTANT: {
@@ -92,12 +92,12 @@
ScalarFunctionCallExpression sfce = (ScalarFunctionCallExpression) expr;
List<Mutable<ILogicalExpression>> argExprs = sfce.getArguments();
- ICopyEvaluatorFactory argEvalFactories[] = new ICopyEvaluatorFactory[argExprs.size()];
+ IScalarEvaluatorFactory argEvalFactories[] = new IScalarEvaluatorFactory[argExprs.size()];
for (int i = 0; i < argEvalFactories.length; ++i) {
Mutable<ILogicalExpression> er = argExprs.get(i);
argEvalFactories[i] = createEvaluatorFactory(er.getValue(), env, inputSchemas, context);
}
- ICopyEvaluatorFactory funcEvalFactory;
+ IScalarEvaluatorFactory funcEvalFactory;
try {
funcEvalFactory = PigletFunctionRegistry.createFunctionEvaluatorFactory(
sfce.getFunctionIdentifier(), argEvalFactories);
@@ -117,28 +117,28 @@
}
@Override
- public ICopyAggregateFunctionFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
+ public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
throws AlgebricksException {
throw new UnsupportedOperationException();
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateFunctionFactory(
AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
JobGenContext context) throws AlgebricksException {
throw new UnsupportedOperationException();
}
@Override
- public ICopyRunningAggregateFunctionFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
+ public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
throws AlgebricksException {
throw new UnsupportedOperationException();
}
@Override
- public ICopyUnnestingFunctionFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
+ public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
throws AlgebricksException {
throw new UnsupportedOperationException();
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java
index c6624eb..6af6177 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java
@@ -19,8 +19,8 @@
package org.apache.hyracks.algebricks.examples.piglet.runtime.functions;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
public interface IPigletFunctionEvaluatorFactoryBuilder {
- public ICopyEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, ICopyEvaluatorFactory[] arguments);
+ public IScalarEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, IScalarEvaluatorFactory[] arguments);
}
\ No newline at end of file
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java
index 534fc33..2e10438 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java
@@ -18,53 +18,45 @@
*/
package org.apache.hyracks.algebricks.examples.piglet.runtime.functions;
-import java.io.DataOutput;
-import java.io.IOException;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-public class IntegerEqFunctionEvaluatorFactory implements ICopyEvaluatorFactory {
+public class IntegerEqFunctionEvaluatorFactory implements IScalarEvaluatorFactory {
private static final long serialVersionUID = 1L;
- private final ICopyEvaluatorFactory arg1Factory;
+ private final IScalarEvaluatorFactory arg1Factory;
- private final ICopyEvaluatorFactory arg2Factory;
+ private final IScalarEvaluatorFactory arg2Factory;
- public IntegerEqFunctionEvaluatorFactory(ICopyEvaluatorFactory arg1Factory, ICopyEvaluatorFactory arg2Factory) {
+ public IntegerEqFunctionEvaluatorFactory(IScalarEvaluatorFactory arg1Factory, IScalarEvaluatorFactory arg2Factory) {
this.arg1Factory = arg1Factory;
this.arg2Factory = arg2Factory;
}
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
- return new ICopyEvaluator() {
- private DataOutput dataout = output.getDataOutput();
- private ArrayBackedValueStorage out1 = new ArrayBackedValueStorage();
- private ArrayBackedValueStorage out2 = new ArrayBackedValueStorage();
- private ICopyEvaluator eval1 = arg1Factory.createEvaluator(out1);
- private ICopyEvaluator eval2 = arg2Factory.createEvaluator(out2);
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+ return new IScalarEvaluator() {
+ private IPointable out1 = new VoidPointable();
+ private IPointable out2 = new VoidPointable();
+ private IScalarEvaluator eval1 = arg1Factory.createScalarEvaluator(ctx);
+ private IScalarEvaluator eval2 = arg2Factory.createScalarEvaluator(ctx);
+ private byte[] resultData = new byte[1];
@Override
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
- out1.reset();
- eval1.evaluate(tuple);
- out2.reset();
- eval2.evaluate(tuple);
- int v1 = IntegerPointable.getInteger(out1.getByteArray(), 0);
- int v2 = IntegerPointable.getInteger(out2.getByteArray(), 0);
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+ eval1.evaluate(tuple, out1);
+ eval2.evaluate(tuple, out2);
+ int v1 = IntegerPointable.getInteger(out1.getByteArray(), out1.getStartOffset());
+ int v2 = IntegerPointable.getInteger(out2.getByteArray(), out2.getStartOffset());
boolean r = v1 == v2;
- try {
- dataout.writeBoolean(r);
- } catch (IOException ioe) {
- throw new AlgebricksException(ioe);
- }
+ resultData[0] = r ? (byte) 1 : (byte) 0;
+ result.set(resultData, 0, 1);
}
};
}
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java
index 1ebe9cf..340e7f5 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java
@@ -25,7 +25,7 @@
import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.examples.piglet.exceptions.PigletException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
public class PigletFunctionRegistry {
private static final Map<FunctionIdentifier, IPigletFunctionEvaluatorFactoryBuilder> builderMap;
@@ -35,7 +35,7 @@
temp.put(AlgebricksBuiltinFunctions.EQ, new IPigletFunctionEvaluatorFactoryBuilder() {
@Override
- public ICopyEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, ICopyEvaluatorFactory[] arguments) {
+ public IScalarEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, IScalarEvaluatorFactory[] arguments) {
return new IntegerEqFunctionEvaluatorFactory(arguments[0], arguments[1]);
}
});
@@ -43,7 +43,7 @@
builderMap = Collections.unmodifiableMap(temp);
}
- public static ICopyEvaluatorFactory createFunctionEvaluatorFactory(FunctionIdentifier fid, ICopyEvaluatorFactory[] args)
+ public static IScalarEvaluatorFactory createFunctionEvaluatorFactory(FunctionIdentifier fid, IScalarEvaluatorFactory[] args)
throws PigletException {
IPigletFunctionEvaluatorFactoryBuilder builder = builderMap.get(fid);
if (builder == null) {
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
index 9b2919e..01f1b88 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -32,7 +33,8 @@
private static final long serialVersionUID = 1L;
@Override
- public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException {
+ public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
return new IRunningAggregateEvaluator() {
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
index 41e2fee..08aea9a 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
@@ -23,11 +23,12 @@
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public interface IAggregateEvaluator {
+ /** should be called each time a new aggregate value is computed */
public void init() throws AlgebricksException;
public void step(IFrameTupleReference tuple) throws AlgebricksException;
- public void finishPartial(IPointable result) throws AlgebricksException;
-
public void finish(IPointable result) throws AlgebricksException;
-}
\ No newline at end of file
+
+ public void finishPartial(IPointable result) throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
index 8bdbed7..8a61280 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
@@ -25,4 +25,4 @@
public interface IAggregateEvaluatorFactory extends Serializable {
public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
-}
\ No newline at end of file
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
deleted file mode 100644
index 2222e0b..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyAggregateFunction {
- /** should be called each time a new aggregate value is computed */
- public void init() throws AlgebricksException;
-
- public void step(IFrameTupleReference tuple) throws AlgebricksException;
-
- public void finish() throws AlgebricksException;
-
- public void finishPartial() throws AlgebricksException;
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
deleted file mode 100644
index cbb6732..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
-
-public interface ICopyAggregateFunctionFactory extends Serializable {
- public ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws AlgebricksException;
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java
deleted file mode 100644
index 03480e8..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyEvaluator {
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException;
-}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
deleted file mode 100644
index a81f351..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
-
-public interface ICopyEvaluatorFactory extends Serializable {
- public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException;
-}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
deleted file mode 100644
index 19cab14..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyRunningAggregateFunction {
- public void init() throws AlgebricksException;
-
- public void step(IFrameTupleReference tuple) throws AlgebricksException;
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
deleted file mode 100644
index 1fe3595..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
-
-public interface ICopyRunningAggregateFunctionFactory extends Serializable {
- public ICopyRunningAggregateFunction createRunningAggregateFunction(IDataOutputProvider provider)
- throws AlgebricksException;
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
deleted file mode 100644
index 0959811..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-
-public interface ICopySerializableAggregateFunctionFactory extends Serializable {
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException;
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
deleted file mode 100644
index f4e3aea..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyUnnestingFunction {
- public void init(IFrameTupleReference tuple) throws AlgebricksException;
-
- public boolean step() throws AlgebricksException;
-
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
index 0fe86a8..c71b41d 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
@@ -21,7 +21,9 @@
import java.io.Serializable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public interface IRunningAggregateEvaluatorFactory extends Serializable {
- public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException;
-}
\ No newline at end of file
+ public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluator.java
similarity index 94%
rename from algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java
rename to algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluator.java
index 078a51d..473afbc 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluator.java
@@ -23,13 +23,12 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public interface ICopySerializableAggregateFunction {
+public interface ISerializedAggregateEvaluator {
/**
* initialize the space occupied by internal state
*
* @param state
* @throws AlgebricksException
- * @return length of the intermediate state
*/
public void init(DataOutput state) throws AlgebricksException;
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluatorFactory.java
similarity index 79%
rename from algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
rename to algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluatorFactory.java
index 1a09fcf..084f39d 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluatorFactory.java
@@ -21,8 +21,8 @@
import java.io.Serializable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
-public interface ICopyUnnestingFunctionFactory extends Serializable {
- public ICopyUnnestingFunction createUnnestingFunction(IDataOutputProvider provider) throws AlgebricksException;
+public interface ISerializedAggregateEvaluatorFactory extends Serializable {
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
index f29e65e..67aede4 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
@@ -26,4 +26,5 @@
public void init(IFrameTupleReference tuple) throws AlgebricksException;
public boolean step(IPointable result) throws AlgebricksException;
-}
\ No newline at end of file
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
index eef98b5..53ca8b5 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
@@ -25,4 +25,4 @@
public interface IUnnestingEvaluatorFactory extends Serializable {
public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
-}
\ No newline at end of file
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
index 5bb206c..1b7753c 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
@@ -18,16 +18,14 @@
*/
package org.apache.hyracks.algebricks.runtime.evaluators;
-import java.io.DataOutput;
-import java.io.IOException;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class ColumnAccessEvalFactory implements ICopyEvaluatorFactory {
+public class ColumnAccessEvalFactory implements IScalarEvaluatorFactory {
private static final long serialVersionUID = 1L;
@@ -43,21 +41,15 @@
}
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
- return new ICopyEvaluator() {
-
- private DataOutput out = output.getDataOutput();
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+ return new IScalarEvaluator() {
@Override
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
byte[] buffer = tuple.getFieldData(fieldIndex);
int start = tuple.getFieldStart(fieldIndex);
int length = tuple.getFieldLength(fieldIndex);
- try {
- out.write(buffer, start, length);
- } catch (IOException ioe) {
- throw new AlgebricksException(ioe);
- }
+ result.set(buffer, start, length);
}
};
}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
index b0eebd9..da7d42b 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
@@ -18,16 +18,14 @@
*/
package org.apache.hyracks.algebricks.runtime.evaluators;
-import java.io.DataOutput;
-import java.io.IOException;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class ConstantEvalFactory implements ICopyEvaluatorFactory {
+public class ConstantEvalFactory implements IScalarEvaluatorFactory {
private static final long serialVersionUID = 1L;
private byte[] value;
@@ -42,18 +40,12 @@
}
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
- return new ICopyEvaluator() {
-
- private DataOutput out = output.getDataOutput();
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+ return new IScalarEvaluator() {
@Override
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
- try {
- out.write(value, 0, value.length);
- } catch (IOException ioe) {
- throw new AlgebricksException(ioe);
- }
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+ result.set(value, 0, value.length);
}
};
}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
deleted file mode 100644
index 05229fc..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.evaluators;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public class ConstantEvaluatorFactory implements IScalarEvaluatorFactory {
- private static final long serialVersionUID = 1L;
-
- private byte[] value;
-
- public ConstantEvaluatorFactory(byte[] value) {
- this.value = value;
- }
-
- @Override
- public String toString() {
- return "Constant";
- }
-
- @Override
- public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
- return new IScalarEvaluator() {
- @Override
- public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
- result.set(value, 0, value.length);
- }
- };
- }
-
-}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index 82e5f50..fe42878 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -21,8 +21,8 @@
import java.io.DataOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -35,16 +35,16 @@
public class SerializableAggregatorDescriptorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
- private ICopySerializableAggregateFunctionFactory[] aggFactories;
+ private ISerializedAggregateEvaluatorFactory[] aggFactories;
- public SerializableAggregatorDescriptorFactory(ICopySerializableAggregateFunctionFactory[] aggFactories) {
+ public SerializableAggregatorDescriptorFactory(ISerializedAggregateEvaluatorFactory[] aggFactories) {
this.aggFactories = aggFactories;
}
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
- throws HyracksDataException {
+ throws HyracksDataException {
final int[] keys = keyFields;
/**
@@ -52,7 +52,7 @@
*/
return new IAggregatorDescriptor() {
private FrameTupleReference ftr = new FrameTupleReference();
- private ICopySerializableAggregateFunction[] aggs = new ICopySerializableAggregateFunction[aggFactories.length];
+ private ISerializedAggregateEvaluator[] aggs = new ISerializedAggregateEvaluator[aggFactories.length];
private int offsetFieldIndex = keys.length;
private int stateFieldLength[] = new int[aggFactories.length];
@@ -70,7 +70,7 @@
try {
int begin = tb.getSize();
if (aggs[i] == null) {
- aggs[i] = aggFactories[i].createAggregateFunction();
+ aggs[i] = aggFactories[i].createAggregateEvaluator(ctx);
}
aggs[i].init(output);
tb.addFieldEndOffset();
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index b7f11d8..2d5c929 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -24,8 +24,8 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -35,7 +35,8 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -48,12 +49,13 @@
private static final long serialVersionUID = 1L;
public static int NO_DEFAULT_BRANCH = -1;
- private final ICopyEvaluatorFactory[] evalFactories;
+ private final IScalarEvaluatorFactory[] evalFactories;
private final IBinaryBooleanInspector boolInspector;
private final int defaultBranchIndex;
- public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, ICopyEvaluatorFactory[] evalFactories,
- IBinaryBooleanInspector boolInspector, int defaultBranchIndex, RecordDescriptor rDesc) {
+ public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ IScalarEvaluatorFactory[] evalFactories, IBinaryBooleanInspector boolInspector, int defaultBranchIndex,
+ RecordDescriptor rDesc) {
super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length);
for (int i = 0; i < evalFactories.length; i++) {
recordDescriptors[i] = rDesc;
@@ -71,8 +73,8 @@
private final IFrameWriter[] writers = new IFrameWriter[outputArity];
private final boolean[] isOpen = new boolean[outputArity];
private final IFrame[] writeBuffers = new IFrame[outputArity];
- private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
- private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
+ private final IScalarEvaluator[] evals = new IScalarEvaluator[outputArity];
+ private final IPointable evalPointable = new VoidPointable();
private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
0);
private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc);
@@ -149,12 +151,12 @@
boolean found = false;
for (int j = 0; j < evals.length; j++) {
try {
- evalBuf.reset();
- evals[j].evaluate(frameTuple);
+ evals[j].evaluate(frameTuple, evalPointable);
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
- found = boolInspector.getBooleanValue(evalBuf.getByteArray(), 0, 1);
+ found = boolInspector.getBooleanValue(evalPointable.getByteArray(),
+ evalPointable.getStartOffset(), evalPointable.getLength());
if (found) {
copyAndAppendTuple(j);
break;
@@ -199,7 +201,7 @@
// Create evaluators for partitioning.
try {
for (int i = 0; i < evalFactories.length; i++) {
- evals[i] = evalFactories[i].createEvaluator(evalBuf);
+ evals[i] = evalFactories[i].createScalarEvaluator(ctx);
}
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 5a26f36..bb6cc73 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -100,7 +100,7 @@
int n = runningAggregates.length;
for (int i = 0; i < n; i++) {
try {
- raggs[i] = runningAggregates[i].createRunningAggregateEvaluator();
+ raggs[i] = runningAggregates[i].createRunningAggregateEvaluator(ctx);
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 2c04003..e9d4ec3 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -27,7 +27,7 @@
import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
-import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvalFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -73,7 +73,7 @@
this.positionWriter = positionWriter;
this.posOffsetEvalFactory = posOffsetEvalFactory;
if (this.posOffsetEvalFactory == null) {
- this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]);
+ this.posOffsetEvalFactory = new ConstantEvalFactory(new byte[5]);
}
}
@@ -88,7 +88,7 @@
return new AbstractOneInputOneOutputOneFramePushRuntime() {
private IPointable p = VoidPointable.FACTORY.createPointable();
- private IUnnestingEvaluator agg;
+ private IUnnestingEvaluator unnest;
private ArrayTupleBuilder tupleBuilder;
private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
@@ -98,7 +98,7 @@
writer.open();
initAccessAppendRef(ctx);
try {
- agg = unnestingFactory.createUnnestingEvaluator(ctx);
+ unnest = unnestingFactory.createUnnestingEvaluator(ctx);
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
@@ -118,7 +118,7 @@
}
int offset = IntegerPointable.getInteger(p.getByteArray(), p.getStartOffset());
try {
- agg.init(tRef);
+ unnest.init(tRef);
// assume that when unnesting the tuple, each step() call for each element
// in the tuple will increase the positionIndex, and the positionIndex will
// be reset when a new tuple is to be processed.
@@ -126,7 +126,7 @@
boolean goon = true;
do {
tupleBuilder.reset();
- if (!agg.step(p)) {
+ if (!unnest.step(p)) {
goon = false;
} else {
diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
index fd979fc..f7a97f4 100644
--- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
+++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
@@ -29,7 +29,6 @@
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class IntegerAddEvalFactory implements IScalarEvaluatorFactory {
@@ -52,7 +51,6 @@
private IScalarEvaluator evalLeft = evalLeftFactory.createScalarEvaluator(ctx);
private IScalarEvaluator evalRight = evalRightFactory.createScalarEvaluator(ctx);
- @SuppressWarnings("static-access")
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
evalLeft.evaluate(tuple, p);
diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
index ea415c8..66d3848 100644
--- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
+++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
@@ -27,7 +27,6 @@
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class IntegerEqualsEvalFactory implements IScalarEvaluatorFactory {
diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
index aebc406..46a7ab6 100644
--- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
+++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
@@ -27,7 +27,6 @@
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class IntegerGreaterThanEvalFactory implements IScalarEvaluatorFactory {
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
index e311fa6..7e834db 100644
--- a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
@@ -94,7 +94,8 @@
* Returns the character at the given byte offset. The caller is responsible for making sure that
* the provided offset is within bounds and points to the beginning of a valid UTF8 character.
*
- * @param offset - Byte offset
+ * @param offset
+ * - Byte offset
* @return Character at the given offset.
*/
public char charAt(int offset) {
@@ -157,6 +158,7 @@
UTF8StringUtil.toString(buffer, bytes, start);
}
+ @Override
public String toString() {
return new String(this.bytes, this.getCharStartOffset(), this.getUTF8Length(), Charset.forName("UTF-8"));
}
@@ -166,8 +168,8 @@
*/
public int ignoreCaseCompareTo(UTF8StringPointable other) {
- return UTF8StringUtil.lowerCaseCompareTo(this.getByteArray(), this.getStartOffset(),
- other.getByteArray(), other.getStartOffset());
+ return UTF8StringUtil.lowerCaseCompareTo(this.getByteArray(), this.getStartOffset(), other.getByteArray(),
+ other.getStartOffset());
}
public int find(UTF8StringPointable pattern, boolean ignoreCase) {
@@ -228,8 +230,9 @@
public static boolean startsWith(UTF8StringPointable src, UTF8StringPointable pattern, boolean ignoreCase) {
int utflen1 = src.getUTF8Length();
int utflen2 = pattern.getUTF8Length();
- if (utflen2 > utflen1)
+ if (utflen2 > utflen1) {
return false;
+ }
int s1Start = src.getMetaDataLength();
int s2Start = pattern.getMetaDataLength();
@@ -257,8 +260,9 @@
public static boolean endsWith(UTF8StringPointable src, UTF8StringPointable pattern, boolean ignoreCase) {
int len1 = src.getUTF8Length();
int len2 = pattern.getUTF8Length();
- if (len2 > len1)
+ if (len2 > len1) {
return false;
+ }
int s1Start = src.getMetaDataLength();
int s2Start = pattern.getMetaDataLength();
@@ -351,10 +355,7 @@
* @param out
* @throws IOException
*/
- public static void substrBefore(
- UTF8StringPointable src,
- UTF8StringPointable match,
- UTF8StringBuilder builder,
+ public static void substrBefore(UTF8StringPointable src, UTF8StringPointable match, UTF8StringBuilder builder,
GrowableArray out) throws IOException {
int byteOffset = find(src, match, false);
@@ -367,7 +368,7 @@
final int srcMetaLen = src.getMetaDataLength();
builder.reset(out, byteOffset);
- for (int idx = 0; idx < byteOffset; ) {
+ for (int idx = 0; idx < byteOffset;) {
builder.appendChar(src.charAt(srcMetaLen + idx));
idx += src.charSize(srcMetaLen + idx);
}
@@ -387,10 +388,7 @@
* @param builder
* @param out
*/
- public static void substrAfter(
- UTF8StringPointable src,
- UTF8StringPointable match,
- UTF8StringBuilder builder,
+ public static void substrAfter(UTF8StringPointable src, UTF8StringPointable match, UTF8StringBuilder builder,
GrowableArray out) throws IOException {
int byteOffset = find(src, match, false);
diff --git a/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java b/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
index 5a716b4..0e70273 100644
--- a/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
+++ b/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
@@ -24,17 +24,14 @@
/**
* Encodes positive integers in a variable-bytes format.
- *
* Each byte stores seven bits of the number. The first bit of each byte notifies if it is the last byte.
* Specifically, if the first bit is set, then we need to shift the current value by seven and
* continue to read the next byte util we meet a byte whose first byte is unset.
- *
* e.g. if the number is < 128, it will be stored using one byte and the byte value keeps as original.
* To store the number 255 (0xff) , it will be encoded as [0x81,0x7f]. To decode that value, it reads the 0x81
* to know that the current value is (0x81 & 0x7f)= 0x01, and the first bit tells that there are more bytes to
* be read. When it meets 0x7f, whose first flag is unset, it knows that it is the final byte to decode.
* Finally it will return ( 0x01 << 7) + 0x7f === 255.
- *
*/
public class VarLenIntEncoderDecoder {
// sometimes the dec number is easier to get the sense of how big it is.
@@ -75,11 +72,16 @@
public static int decode(byte[] srcBytes, int startPos) {
int sum = 0;
- while ((srcBytes[startPos] & CONTINUE_CHUNK) == CONTINUE_CHUNK) {
+ while (startPos < srcBytes.length && (srcBytes[startPos] & CONTINUE_CHUNK) == CONTINUE_CHUNK) {
sum = (sum + (srcBytes[startPos] & DECODE_MASK)) << 7;
startPos++;
}
- sum += srcBytes[startPos++];
+ if (startPos < srcBytes.length) {
+ sum += srcBytes[startPos];
+ } else {
+ throw new IllegalStateException("Corrupted string bytes: trying to access entry " + startPos
+ + " in a byte array of length " + srcBytes.length);
+ }
return sum;
}