Move to non-copy-based evaluator interfaces for scalar functions, aggregate functions, running aggregate functions and unnest functions.

Change-Id: I92a630550f3d45a7a5f00cfbc93e7b049b06330d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/614
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IExpressionRuntimeProvider.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IExpressionRuntimeProvider.java
index 3c0e1e9..c584b47 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IExpressionRuntimeProvider.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IExpressionRuntimeProvider.java
@@ -23,7 +23,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
@@ -36,7 +36,7 @@
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
             throws AlgebricksException;
 
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateFunctionFactory(
             AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
             JobGenContext context) throws AlgebricksException;
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java
index 0996b9c..7463896 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java
@@ -22,30 +22,30 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 
 public interface ILogicalExpressionJobGen {
 
-    public ICopyEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
+    public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
             IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException;
 
-    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
+    public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
             throws AlgebricksException;
 
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateFunctionFactory(
             AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
             JobGenContext context) throws AlgebricksException;
 
-    public ICopyRunningAggregateFunctionFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
+    public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
             throws AlgebricksException;
 
-    public ICopyUnnestingFunctionFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
+    public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
             throws AlgebricksException;
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
index 50e4f3d..02993f7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
@@ -22,27 +22,11 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class LogicalExpressionJobGenToExpressionRuntimeProviderAdapter implements IExpressionRuntimeProvider {
     private final ILogicalExpressionJobGen lejg;
@@ -54,20 +38,18 @@
     @Override
     public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
             IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
-        ICopyEvaluatorFactory cef = lejg.createEvaluatorFactory(expr, env, inputSchemas, context);
-        return new ScalarEvaluatorFactoryAdapter(cef);
+        return lejg.createEvaluatorFactory(expr, env, inputSchemas, context);
     }
 
     @Override
     public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
-            throws AlgebricksException {
-        ICopyAggregateFunctionFactory caff = lejg.createAggregateFunctionFactory(expr, env, inputSchemas, context);
-        return new AggregateFunctionFactoryAdapter(caff);
+                    throws AlgebricksException {
+        return lejg.createAggregateFunctionFactory(expr, env, inputSchemas, context);
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateFunctionFactory(
             AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
             JobGenContext context) throws AlgebricksException {
         return lejg.createSerializableAggregateFunctionFactory(expr, env, inputSchemas, context);
@@ -76,143 +58,14 @@
     @Override
     public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
-            throws AlgebricksException {
-        ICopyRunningAggregateFunctionFactory craff = lejg.createRunningAggregateFunctionFactory(expr, env,
-                inputSchemas, context);
-        return new RunningAggregateFunctionFactoryAdapter(craff);
+                    throws AlgebricksException {
+        return lejg.createRunningAggregateFunctionFactory(expr, env, inputSchemas, context);
     }
 
     @Override
     public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
-            throws AlgebricksException {
-        ICopyUnnestingFunctionFactory cuff = lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, context);
-        return new UnnestingFunctionFactoryAdapter(cuff);
-    }
-
-    public static final class ScalarEvaluatorFactoryAdapter implements IScalarEvaluatorFactory {
-        private static final long serialVersionUID = 1L;
-
-        private final ICopyEvaluatorFactory cef;
-
-        public ScalarEvaluatorFactoryAdapter(ICopyEvaluatorFactory cef) {
-            this.cef = cef;
-        }
-
-        @Override
-        public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
-            final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
-            final ICopyEvaluator ce = cef.createEvaluator(abvs);
-            return new IScalarEvaluator() {
-                @Override
-                public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
-                    abvs.reset();
-                    ce.evaluate(tuple);
-                    result.set(abvs);
-                }
-            };
-        }
-    }
-
-    public static final class AggregateFunctionFactoryAdapter implements IAggregateEvaluatorFactory {
-        private static final long serialVersionUID = 1L;
-
-        private final ICopyAggregateFunctionFactory caff;
-
-        public AggregateFunctionFactoryAdapter(ICopyAggregateFunctionFactory caff) {
-            this.caff = caff;
-        }
-
-        @Override
-        public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
-            final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
-            final ICopyAggregateFunction caf = caff.createAggregateFunction(abvs);
-            return new IAggregateEvaluator() {
-                @Override
-                public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                    caf.step(tuple);
-                }
-
-                @Override
-                public void init() throws AlgebricksException {
-                    abvs.reset();
-                    caf.init();
-                }
-
-                @Override
-                public void finishPartial(IPointable result) throws AlgebricksException {
-                    caf.finishPartial();
-                    result.set(abvs);
-                }
-
-                @Override
-                public void finish(IPointable result) throws AlgebricksException {
-                    caf.finish();
-                    result.set(abvs);
-                }
-
-            };
-        }
-    }
-
-    public static final class RunningAggregateFunctionFactoryAdapter implements IRunningAggregateEvaluatorFactory {
-        private static final long serialVersionUID = 1L;
-
-        private final ICopyRunningAggregateFunctionFactory craff;
-
-        public RunningAggregateFunctionFactoryAdapter(ICopyRunningAggregateFunctionFactory craff) {
-            this.craff = craff;
-        }
-
-        @Override
-        public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException {
-            final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
-            final ICopyRunningAggregateFunction craf = craff.createRunningAggregateFunction(abvs);
-            return new IRunningAggregateEvaluator() {
-                @Override
-                public void step(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
-                    abvs.reset();
-                    craf.step(tuple);
-                    result.set(abvs);
-                }
-
-                @Override
-                public void init() throws AlgebricksException {
-                    craf.init();
-                }
-            };
-        }
-    }
-
-    public static final class UnnestingFunctionFactoryAdapter implements IUnnestingEvaluatorFactory {
-        private static final long serialVersionUID = 1L;
-
-        private final ICopyUnnestingFunctionFactory cuff;
-
-        public UnnestingFunctionFactoryAdapter(ICopyUnnestingFunctionFactory cuff) {
-            this.cuff = cuff;
-        }
-
-        @Override
-        public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
-            final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
-            final ICopyUnnestingFunction cuf = cuff.createUnnestingFunction(abvs);
-            return new IUnnestingEvaluator() {
-                @Override
-                public boolean step(IPointable result) throws AlgebricksException {
-                    abvs.reset();
-                    if (cuf.step()) {
-                        result.set(abvs);
-                        return true;
-                    }
-                    return false;
-                }
-
-                @Override
-                public void init(IFrameTupleReference tuple) throws AlgebricksException {
-                    cuf.init(tuple);
-                }
-            };
-        }
+                    throws AlgebricksException {
+        return lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, context);
     }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 672e6d0..efb9681 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -56,7 +56,7 @@
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.aggreg.SerializableAggregatorDescriptorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -185,7 +185,7 @@
         IPartialAggregationTypeComputer partialAggregationTypeComputer = context.getPartialAggregationTypeComputer();
         List<Object> intermediateTypes = new ArrayList<Object>();
         int n = aggOp.getExpressions().size();
-        ICopySerializableAggregateFunctionFactory[] aff = new ICopySerializableAggregateFunctionFactory[n];
+        ISerializedAggregateEvaluatorFactory[] aff = new ISerializedAggregateEvaluatorFactory[n];
         int i = 0;
         IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
         IVariableTypeEnvironment aggOpInputEnv = context.getTypeEnvironment(aggOp.getInputs().get(0).getValue());
@@ -223,7 +223,7 @@
         IBinaryHashFunctionFactory[] hashFunctionFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
                 gbyCols, aggOpInputEnv, context);
 
-        ICopySerializableAggregateFunctionFactory[] merges = new ICopySerializableAggregateFunctionFactory[n];
+        ISerializedAggregateEvaluatorFactory[] merges = new ISerializedAggregateEvaluatorFactory[n];
         List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
         IOperatorSchema[] localInputSchemas = new IOperatorSchema[1];
         localInputSchemas[0] = new OperatorSchemaImpl();
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java
index 1c3f9b8..e7d96ac 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java
@@ -41,11 +41,11 @@
 import org.apache.hyracks.algebricks.examples.piglet.exceptions.PigletException;
 import org.apache.hyracks.algebricks.examples.piglet.runtime.functions.PigletFunctionRegistry;
 import org.apache.hyracks.algebricks.examples.piglet.types.Type;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
 import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvalFactory;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -56,7 +56,7 @@
     private final UTF8StringSerializerDeserializer utf8SerDer = new UTF8StringSerializerDeserializer();
 
     @Override
-    public ICopyEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
+    public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
             IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
         switch (expr.getExpressionTag()) {
             case CONSTANT: {
@@ -92,12 +92,12 @@
                 ScalarFunctionCallExpression sfce = (ScalarFunctionCallExpression) expr;
 
                 List<Mutable<ILogicalExpression>> argExprs = sfce.getArguments();
-                ICopyEvaluatorFactory argEvalFactories[] = new ICopyEvaluatorFactory[argExprs.size()];
+                IScalarEvaluatorFactory argEvalFactories[] = new IScalarEvaluatorFactory[argExprs.size()];
                 for (int i = 0; i < argEvalFactories.length; ++i) {
                     Mutable<ILogicalExpression> er = argExprs.get(i);
                     argEvalFactories[i] = createEvaluatorFactory(er.getValue(), env, inputSchemas, context);
                 }
-                ICopyEvaluatorFactory funcEvalFactory;
+                IScalarEvaluatorFactory funcEvalFactory;
                 try {
                     funcEvalFactory = PigletFunctionRegistry.createFunctionEvaluatorFactory(
                             sfce.getFunctionIdentifier(), argEvalFactories);
@@ -117,28 +117,28 @@
     }
 
     @Override
-    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
+    public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
             throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateFunctionFactory(
             AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
             JobGenContext context) throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public ICopyRunningAggregateFunctionFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
+    public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
             throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public ICopyUnnestingFunctionFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
+    public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
             throws AlgebricksException {
         throw new UnsupportedOperationException();
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java
index c6624eb..6af6177 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java
@@ -19,8 +19,8 @@
 package org.apache.hyracks.algebricks.examples.piglet.runtime.functions;
 
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 
 public interface IPigletFunctionEvaluatorFactoryBuilder {
-    public ICopyEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, ICopyEvaluatorFactory[] arguments);
+    public IScalarEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, IScalarEvaluatorFactory[] arguments);
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java
index 534fc33..2e10438 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java
@@ -18,53 +18,45 @@
  */
 package org.apache.hyracks.algebricks.examples.piglet.runtime.functions;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
-public class IntegerEqFunctionEvaluatorFactory implements ICopyEvaluatorFactory {
+public class IntegerEqFunctionEvaluatorFactory implements IScalarEvaluatorFactory {
     private static final long serialVersionUID = 1L;
 
-    private final ICopyEvaluatorFactory arg1Factory;
+    private final IScalarEvaluatorFactory arg1Factory;
 
-    private final ICopyEvaluatorFactory arg2Factory;
+    private final IScalarEvaluatorFactory arg2Factory;
 
-    public IntegerEqFunctionEvaluatorFactory(ICopyEvaluatorFactory arg1Factory, ICopyEvaluatorFactory arg2Factory) {
+    public IntegerEqFunctionEvaluatorFactory(IScalarEvaluatorFactory arg1Factory, IScalarEvaluatorFactory arg2Factory) {
         this.arg1Factory = arg1Factory;
         this.arg2Factory = arg2Factory;
     }
 
     @Override
-    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
-        return new ICopyEvaluator() {
-            private DataOutput dataout = output.getDataOutput();
-            private ArrayBackedValueStorage out1 = new ArrayBackedValueStorage();
-            private ArrayBackedValueStorage out2 = new ArrayBackedValueStorage();
-            private ICopyEvaluator eval1 = arg1Factory.createEvaluator(out1);
-            private ICopyEvaluator eval2 = arg2Factory.createEvaluator(out2);
+    public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+        return new IScalarEvaluator() {
+            private IPointable out1 = new VoidPointable();
+            private IPointable out2 = new VoidPointable();
+            private IScalarEvaluator eval1 = arg1Factory.createScalarEvaluator(ctx);
+            private IScalarEvaluator eval2 = arg2Factory.createScalarEvaluator(ctx);
+            private byte[] resultData = new byte[1];
 
             @Override
-            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
-                out1.reset();
-                eval1.evaluate(tuple);
-                out2.reset();
-                eval2.evaluate(tuple);
-                int v1 = IntegerPointable.getInteger(out1.getByteArray(), 0);
-                int v2 = IntegerPointable.getInteger(out2.getByteArray(), 0);
+            public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+                eval1.evaluate(tuple, out1);
+                eval2.evaluate(tuple, out2);
+                int v1 = IntegerPointable.getInteger(out1.getByteArray(), out1.getStartOffset());
+                int v2 = IntegerPointable.getInteger(out2.getByteArray(), out2.getStartOffset());
                 boolean r = v1 == v2;
-                try {
-                    dataout.writeBoolean(r);
-                } catch (IOException ioe) {
-                    throw new AlgebricksException(ioe);
-                }
+                resultData[0] = r ? (byte) 1 : (byte) 0;
+                result.set(resultData, 0, 1);
             }
         };
     }
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java
index 1ebe9cf..340e7f5 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java
@@ -25,7 +25,7 @@
 import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.examples.piglet.exceptions.PigletException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 
 public class PigletFunctionRegistry {
     private static final Map<FunctionIdentifier, IPigletFunctionEvaluatorFactoryBuilder> builderMap;
@@ -35,7 +35,7 @@
 
         temp.put(AlgebricksBuiltinFunctions.EQ, new IPigletFunctionEvaluatorFactoryBuilder() {
             @Override
-            public ICopyEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, ICopyEvaluatorFactory[] arguments) {
+            public IScalarEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, IScalarEvaluatorFactory[] arguments) {
                 return new IntegerEqFunctionEvaluatorFactory(arguments[0], arguments[1]);
             }
         });
@@ -43,7 +43,7 @@
         builderMap = Collections.unmodifiableMap(temp);
     }
 
-    public static ICopyEvaluatorFactory createFunctionEvaluatorFactory(FunctionIdentifier fid, ICopyEvaluatorFactory[] args)
+    public static IScalarEvaluatorFactory createFunctionEvaluatorFactory(FunctionIdentifier fid, IScalarEvaluatorFactory[] args)
             throws PigletException {
         IPigletFunctionEvaluatorFactoryBuilder builder = builderMap.get(fid);
         if (builder == null) {
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
index 9b2919e..01f1b88 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -32,7 +33,8 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException {
+    public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+            throws AlgebricksException {
         final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
         return new IRunningAggregateEvaluator() {
 
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
index 41e2fee..08aea9a 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
@@ -23,11 +23,12 @@
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public interface IAggregateEvaluator {
+    /** should be called each time a new aggregate value is computed */
     public void init() throws AlgebricksException;
 
     public void step(IFrameTupleReference tuple) throws AlgebricksException;
 
-    public void finishPartial(IPointable result) throws AlgebricksException;
-
     public void finish(IPointable result) throws AlgebricksException;
-}
\ No newline at end of file
+
+    public void finishPartial(IPointable result) throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
index 8bdbed7..8a61280 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
@@ -25,4 +25,4 @@
 
 public interface IAggregateEvaluatorFactory extends Serializable {
     public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
-}
\ No newline at end of file
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
deleted file mode 100644
index 2222e0b..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyAggregateFunction {
-    /** should be called each time a new aggregate value is computed */
-    public void init() throws AlgebricksException;
-
-    public void step(IFrameTupleReference tuple) throws AlgebricksException;
-
-    public void finish() throws AlgebricksException;
-
-    public void finishPartial() throws AlgebricksException;
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
deleted file mode 100644
index cbb6732..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
-
-public interface ICopyAggregateFunctionFactory extends Serializable {
-    public ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws AlgebricksException;
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java
deleted file mode 100644
index 03480e8..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyEvaluator {
-    public void evaluate(IFrameTupleReference tuple) throws AlgebricksException;
-}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
deleted file mode 100644
index a81f351..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
-
-public interface ICopyEvaluatorFactory extends Serializable {
-    public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException;
-}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
deleted file mode 100644
index 19cab14..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyRunningAggregateFunction {
-    public void init() throws AlgebricksException;
-
-    public void step(IFrameTupleReference tuple) throws AlgebricksException;
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
deleted file mode 100644
index 1fe3595..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
-
-public interface ICopyRunningAggregateFunctionFactory extends Serializable {
-    public ICopyRunningAggregateFunction createRunningAggregateFunction(IDataOutputProvider provider)
-            throws AlgebricksException;
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
deleted file mode 100644
index 0959811..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-
-public interface ICopySerializableAggregateFunctionFactory extends Serializable {
-    public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException;
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
deleted file mode 100644
index f4e3aea..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyUnnestingFunction {
-    public void init(IFrameTupleReference tuple) throws AlgebricksException;
-
-    public boolean step() throws AlgebricksException;
-
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
index 0fe86a8..c71b41d 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
@@ -21,7 +21,9 @@
 import java.io.Serializable;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public interface IRunningAggregateEvaluatorFactory extends Serializable {
-    public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException;
-}
\ No newline at end of file
+    public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+            throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluator.java
similarity index 94%
rename from algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java
rename to algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluator.java
index 078a51d..473afbc 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluator.java
@@ -23,13 +23,12 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public interface ICopySerializableAggregateFunction {
+public interface ISerializedAggregateEvaluator {
     /**
      * initialize the space occupied by internal state
      *
      * @param state
      * @throws AlgebricksException
-     * @return length of the intermediate state
      */
     public void init(DataOutput state) throws AlgebricksException;
 
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluatorFactory.java
similarity index 79%
rename from algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
rename to algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluatorFactory.java
index 1a09fcf..084f39d 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ISerializedAggregateEvaluatorFactory.java
@@ -21,8 +21,8 @@
 import java.io.Serializable;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
-public interface ICopyUnnestingFunctionFactory extends Serializable {
-    public ICopyUnnestingFunction createUnnestingFunction(IDataOutputProvider provider) throws AlgebricksException;
+public interface ISerializedAggregateEvaluatorFactory extends Serializable {
+    public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
 }
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
index f29e65e..67aede4 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
@@ -26,4 +26,5 @@
     public void init(IFrameTupleReference tuple) throws AlgebricksException;
 
     public boolean step(IPointable result) throws AlgebricksException;
-}
\ No newline at end of file
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
index eef98b5..53ca8b5 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
@@ -25,4 +25,4 @@
 
 public interface IUnnestingEvaluatorFactory extends Serializable {
     public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
-}
\ No newline at end of file
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
index 5bb206c..1b7753c 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
@@ -18,16 +18,14 @@
  */
 package org.apache.hyracks.algebricks.runtime.evaluators;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class ColumnAccessEvalFactory implements ICopyEvaluatorFactory {
+public class ColumnAccessEvalFactory implements IScalarEvaluatorFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -43,21 +41,15 @@
     }
 
     @Override
-    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
-        return new ICopyEvaluator() {
-
-            private DataOutput out = output.getDataOutput();
+    public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+        return new IScalarEvaluator() {
 
             @Override
-            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+            public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
                 byte[] buffer = tuple.getFieldData(fieldIndex);
                 int start = tuple.getFieldStart(fieldIndex);
                 int length = tuple.getFieldLength(fieldIndex);
-                try {
-                    out.write(buffer, start, length);
-                } catch (IOException ioe) {
-                    throw new AlgebricksException(ioe);
-                }
+                result.set(buffer, start, length);
             }
         };
     }
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
index b0eebd9..da7d42b 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
@@ -18,16 +18,14 @@
  */
 package org.apache.hyracks.algebricks.runtime.evaluators;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class ConstantEvalFactory implements ICopyEvaluatorFactory {
+public class ConstantEvalFactory implements IScalarEvaluatorFactory {
     private static final long serialVersionUID = 1L;
 
     private byte[] value;
@@ -42,18 +40,12 @@
     }
 
     @Override
-    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
-        return new ICopyEvaluator() {
-
-            private DataOutput out = output.getDataOutput();
+    public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+        return new IScalarEvaluator() {
 
             @Override
-            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
-                try {
-                    out.write(value, 0, value.length);
-                } catch (IOException ioe) {
-                    throw new AlgebricksException(ioe);
-                }
+            public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+                result.set(value, 0, value.length);
             }
         };
     }
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
deleted file mode 100644
index 05229fc..0000000
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.evaluators;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public class ConstantEvaluatorFactory implements IScalarEvaluatorFactory {
-    private static final long serialVersionUID = 1L;
-
-    private byte[] value;
-
-    public ConstantEvaluatorFactory(byte[] value) {
-        this.value = value;
-    }
-
-    @Override
-    public String toString() {
-        return "Constant";
-    }
-
-    @Override
-    public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
-        return new IScalarEvaluator() {
-            @Override
-            public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
-                result.set(value, 0, value.length);
-            }
-        };
-    }
-
-}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index 82e5f50..fe42878 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -21,8 +21,8 @@
 import java.io.DataOutput;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -35,16 +35,16 @@
 
 public class SerializableAggregatorDescriptorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
     private static final long serialVersionUID = 1L;
-    private ICopySerializableAggregateFunctionFactory[] aggFactories;
+    private ISerializedAggregateEvaluatorFactory[] aggFactories;
 
-    public SerializableAggregatorDescriptorFactory(ICopySerializableAggregateFunctionFactory[] aggFactories) {
+    public SerializableAggregatorDescriptorFactory(ISerializedAggregateEvaluatorFactory[] aggFactories) {
         this.aggFactories = aggFactories;
     }
 
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         final int[] keys = keyFields;
 
         /**
@@ -52,7 +52,7 @@
          */
         return new IAggregatorDescriptor() {
             private FrameTupleReference ftr = new FrameTupleReference();
-            private ICopySerializableAggregateFunction[] aggs = new ICopySerializableAggregateFunction[aggFactories.length];
+            private ISerializedAggregateEvaluator[] aggs = new ISerializedAggregateEvaluator[aggFactories.length];
             private int offsetFieldIndex = keys.length;
             private int stateFieldLength[] = new int[aggFactories.length];
 
@@ -70,7 +70,7 @@
                     try {
                         int begin = tb.getSize();
                         if (aggs[i] == null) {
-                            aggs[i] = aggFactories[i].createAggregateFunction();
+                            aggs[i] = aggFactories[i].createAggregateEvaluator(ctx);
                         }
                         aggs[i].init(output);
                         tb.addFieldEndOffset();
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index b7f11d8..2d5c929 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -24,8 +24,8 @@
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -35,7 +35,8 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -48,12 +49,13 @@
     private static final long serialVersionUID = 1L;
     public static int NO_DEFAULT_BRANCH = -1;
 
-    private final ICopyEvaluatorFactory[] evalFactories;
+    private final IScalarEvaluatorFactory[] evalFactories;
     private final IBinaryBooleanInspector boolInspector;
     private final int defaultBranchIndex;
 
-    public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, ICopyEvaluatorFactory[] evalFactories,
-            IBinaryBooleanInspector boolInspector, int defaultBranchIndex, RecordDescriptor rDesc) {
+    public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            IScalarEvaluatorFactory[] evalFactories, IBinaryBooleanInspector boolInspector, int defaultBranchIndex,
+            RecordDescriptor rDesc) {
         super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length);
         for (int i = 0; i < evalFactories.length; i++) {
             recordDescriptors[i] = rDesc;
@@ -71,8 +73,8 @@
             private final IFrameWriter[] writers = new IFrameWriter[outputArity];
             private final boolean[] isOpen = new boolean[outputArity];
             private final IFrame[] writeBuffers = new IFrame[outputArity];
-            private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
-            private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
+            private final IScalarEvaluator[] evals = new IScalarEvaluator[outputArity];
+            private final IPointable evalPointable = new VoidPointable();
             private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
                     0);
             private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc);
@@ -149,12 +151,12 @@
                     boolean found = false;
                     for (int j = 0; j < evals.length; j++) {
                         try {
-                            evalBuf.reset();
-                            evals[j].evaluate(frameTuple);
+                            evals[j].evaluate(frameTuple, evalPointable);
                         } catch (AlgebricksException e) {
                             throw new HyracksDataException(e);
                         }
-                        found = boolInspector.getBooleanValue(evalBuf.getByteArray(), 0, 1);
+                        found = boolInspector.getBooleanValue(evalPointable.getByteArray(),
+                                evalPointable.getStartOffset(), evalPointable.getLength());
                         if (found) {
                             copyAndAppendTuple(j);
                             break;
@@ -199,7 +201,7 @@
                 // Create evaluators for partitioning.
                 try {
                     for (int i = 0; i < evalFactories.length; i++) {
-                        evals[i] = evalFactories[i].createEvaluator(evalBuf);
+                        evals[i] = evalFactories[i].createScalarEvaluator(ctx);
                     }
                 } catch (AlgebricksException e) {
                     throw new HyracksDataException(e);
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 5a26f36..bb6cc73 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -100,7 +100,7 @@
                     int n = runningAggregates.length;
                     for (int i = 0; i < n; i++) {
                         try {
-                            raggs[i] = runningAggregates[i].createRunningAggregateEvaluator();
+                            raggs[i] = runningAggregates[i].createRunningAggregateEvaluator(ctx);
                         } catch (AlgebricksException ae) {
                             throw new HyracksDataException(ae);
                         }
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 2c04003..e9d4ec3 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
-import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvalFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -73,7 +73,7 @@
         this.positionWriter = positionWriter;
         this.posOffsetEvalFactory = posOffsetEvalFactory;
         if (this.posOffsetEvalFactory == null) {
-            this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]);
+            this.posOffsetEvalFactory = new ConstantEvalFactory(new byte[5]);
         }
     }
 
@@ -88,7 +88,7 @@
 
         return new AbstractOneInputOneOutputOneFramePushRuntime() {
             private IPointable p = VoidPointable.FACTORY.createPointable();
-            private IUnnestingEvaluator agg;
+            private IUnnestingEvaluator unnest;
             private ArrayTupleBuilder tupleBuilder;
 
             private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
@@ -98,7 +98,7 @@
                 writer.open();
                 initAccessAppendRef(ctx);
                 try {
-                    agg = unnestingFactory.createUnnestingEvaluator(ctx);
+                    unnest = unnestingFactory.createUnnestingEvaluator(ctx);
                 } catch (AlgebricksException ae) {
                     throw new HyracksDataException(ae);
                 }
@@ -118,7 +118,7 @@
                     }
                     int offset = IntegerPointable.getInteger(p.getByteArray(), p.getStartOffset());
                     try {
-                        agg.init(tRef);
+                        unnest.init(tRef);
                         // assume that when unnesting the tuple, each step() call for each element
                         // in the tuple will increase the positionIndex, and the positionIndex will
                         // be reset when a new tuple is to be processed.
@@ -126,7 +126,7 @@
                         boolean goon = true;
                         do {
                             tupleBuilder.reset();
-                            if (!agg.step(p)) {
+                            if (!unnest.step(p)) {
                                 goon = false;
                             } else {
 
diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
index fd979fc..f7a97f4 100644
--- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
+++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
@@ -29,7 +29,6 @@
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class IntegerAddEvalFactory implements IScalarEvaluatorFactory {
 
@@ -52,7 +51,6 @@
             private IScalarEvaluator evalLeft = evalLeftFactory.createScalarEvaluator(ctx);
             private IScalarEvaluator evalRight = evalRightFactory.createScalarEvaluator(ctx);
 
-            @SuppressWarnings("static-access")
             @Override
             public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
                 evalLeft.evaluate(tuple, p);
diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
index ea415c8..66d3848 100644
--- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
+++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class IntegerEqualsEvalFactory implements IScalarEvaluatorFactory {
 
diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
index aebc406..46a7ab6 100644
--- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
+++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class IntegerGreaterThanEvalFactory implements IScalarEvaluatorFactory {
 
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
index e311fa6..7e834db 100644
--- a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
@@ -94,7 +94,8 @@
      * Returns the character at the given byte offset. The caller is responsible for making sure that
      * the provided offset is within bounds and points to the beginning of a valid UTF8 character.
      *
-     * @param offset - Byte offset
+     * @param offset
+     *            - Byte offset
      * @return Character at the given offset.
      */
     public char charAt(int offset) {
@@ -157,6 +158,7 @@
         UTF8StringUtil.toString(buffer, bytes, start);
     }
 
+    @Override
     public String toString() {
         return new String(this.bytes, this.getCharStartOffset(), this.getUTF8Length(), Charset.forName("UTF-8"));
     }
@@ -166,8 +168,8 @@
      */
 
     public int ignoreCaseCompareTo(UTF8StringPointable other) {
-        return UTF8StringUtil.lowerCaseCompareTo(this.getByteArray(), this.getStartOffset(),
-                other.getByteArray(), other.getStartOffset());
+        return UTF8StringUtil.lowerCaseCompareTo(this.getByteArray(), this.getStartOffset(), other.getByteArray(),
+                other.getStartOffset());
     }
 
     public int find(UTF8StringPointable pattern, boolean ignoreCase) {
@@ -228,8 +230,9 @@
     public static boolean startsWith(UTF8StringPointable src, UTF8StringPointable pattern, boolean ignoreCase) {
         int utflen1 = src.getUTF8Length();
         int utflen2 = pattern.getUTF8Length();
-        if (utflen2 > utflen1)
+        if (utflen2 > utflen1) {
             return false;
+        }
 
         int s1Start = src.getMetaDataLength();
         int s2Start = pattern.getMetaDataLength();
@@ -257,8 +260,9 @@
     public static boolean endsWith(UTF8StringPointable src, UTF8StringPointable pattern, boolean ignoreCase) {
         int len1 = src.getUTF8Length();
         int len2 = pattern.getUTF8Length();
-        if (len2 > len1)
+        if (len2 > len1) {
             return false;
+        }
 
         int s1Start = src.getMetaDataLength();
         int s2Start = pattern.getMetaDataLength();
@@ -351,10 +355,7 @@
      * @param out
      * @throws IOException
      */
-    public static void substrBefore(
-            UTF8StringPointable src,
-            UTF8StringPointable match,
-            UTF8StringBuilder builder,
+    public static void substrBefore(UTF8StringPointable src, UTF8StringPointable match, UTF8StringBuilder builder,
             GrowableArray out) throws IOException {
 
         int byteOffset = find(src, match, false);
@@ -367,7 +368,7 @@
         final int srcMetaLen = src.getMetaDataLength();
 
         builder.reset(out, byteOffset);
-        for (int idx = 0; idx < byteOffset; ) {
+        for (int idx = 0; idx < byteOffset;) {
             builder.appendChar(src.charAt(srcMetaLen + idx));
             idx += src.charSize(srcMetaLen + idx);
         }
@@ -387,10 +388,7 @@
      * @param builder
      * @param out
      */
-    public static void substrAfter(
-            UTF8StringPointable src,
-            UTF8StringPointable match,
-            UTF8StringBuilder builder,
+    public static void substrAfter(UTF8StringPointable src, UTF8StringPointable match, UTF8StringBuilder builder,
             GrowableArray out) throws IOException {
 
         int byteOffset = find(src, match, false);
diff --git a/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java b/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
index 5a716b4..0e70273 100644
--- a/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
+++ b/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
@@ -24,17 +24,14 @@
 
 /**
  * Encodes positive integers in a variable-bytes format.
- *
  * Each byte stores seven bits of the number. The first bit of each byte notifies if it is the last byte.
  * Specifically, if the first bit is set, then we need to shift the current value by seven and
  * continue to read the next byte util we meet a byte whose first byte is unset.
- *
  * e.g. if the number is < 128, it will be stored using one byte and the byte value keeps as original.
  * To store the number 255 (0xff) , it will be encoded as [0x81,0x7f]. To decode that value, it reads the 0x81
  * to know that the current value is (0x81 & 0x7f)= 0x01, and the first bit tells that there are more bytes to
  * be read. When it meets 0x7f, whose first flag is unset, it knows that it is the final byte to decode.
  * Finally it will return ( 0x01 << 7) + 0x7f === 255.
- *
  */
 public class VarLenIntEncoderDecoder {
     // sometimes the dec number is easier to get the sense of how big it is.
@@ -75,11 +72,16 @@
 
     public static int decode(byte[] srcBytes, int startPos) {
         int sum = 0;
-        while ((srcBytes[startPos] & CONTINUE_CHUNK) == CONTINUE_CHUNK) {
+        while (startPos < srcBytes.length && (srcBytes[startPos] & CONTINUE_CHUNK) == CONTINUE_CHUNK) {
             sum = (sum + (srcBytes[startPos] & DECODE_MASK)) << 7;
             startPos++;
         }
-        sum += srcBytes[startPos++];
+        if (startPos < srcBytes.length) {
+            sum += srcBytes[startPos];
+        } else {
+            throw new IllegalStateException("Corrupted string bytes: trying to access entry " + startPos
+                    + " in a byte array of length " + srcBytes.length);
+        }
         return sum;
     }