[ASTERIXDB-2516][RT] Modifty range map function of parallel sort

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Currently, the range map function of parallel sort passes "null"
to get a comparator to sort the samples and produce the range map.
The comparator provider will enforce providing a type and prevent
passing "null". The range map function needs to be updated to get
the types of the sort fields and use them to get comparators.
- changed the output type of the local sampling function from
list of list of ANY to binary. The old type computer was removed.
- added null writer aggregate function that just produces null as
an aggregate value. this is needed in order to propagate the type
of the sort fields from the local step to the global step so that
the range map function can know the types of the sort fields.

Change-Id: I7edbb10906cc4464210af87a5b1630ba3aecbde0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3333
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index f42f0d5..a7d40b4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -363,8 +363,8 @@
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());
         physicalRewritesAllLevels.add(new CheckFullParallelSortRule());
-        physicalRewritesAllLevels
-                .add(new EnforceStructuralPropertiesRule(BuiltinFunctions.RANGE_MAP, BuiltinFunctions.LOCAL_SAMPLING));
+        physicalRewritesAllLevels.add(new EnforceStructuralPropertiesRule(BuiltinFunctions.RANGE_MAP,
+                BuiltinFunctions.LOCAL_SAMPLING, BuiltinFunctions.NULL_WRITER));
         physicalRewritesAllLevels.add(new RemoveSortInFeedIngestionRule());
         physicalRewritesAllLevels.add(new RemoveUnnecessarySortMergeExchange());
         physicalRewritesAllLevels.add(new PushProjectDownRule());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
index c5ff5d1..99412dd 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
@@ -742,7 +742,7 @@
                 unnestMapOp.setSourceLocation(dataSourceOp.getSourceLocation());
                 indexSearchOp = unnestMapOp;
             }
-
+            // TODO: shouldn't indexSearchOp execution mode be set to that of the input? the default is UNPARTITIONED
             indexSearchOp.getInputs().add(new MutableObject<>(inputOp));
 
             // Adds equivalence classes --- one equivalent class between a primary key
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 382b1b2..e2ba1e1 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -18,7 +18,11 @@
  */
 package org.apache.asterix.om.functions;
 
-import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.*;
+import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG;
+import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.INJECT_ORDER_ARGS;
+import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION;
+import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE;
+import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE;
 
 import java.util.Collections;
 import java.util.EnumSet;
@@ -84,7 +88,6 @@
 import org.apache.asterix.om.typecomputer.impl.IfNanOrInfTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.IfNullTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ListOfSamplesTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalAvgTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalSingleVarStatisticsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.MinMaxAggTypeComputer;
@@ -507,7 +510,7 @@
     public static final FunctionIdentifier LOCAL_SAMPLING =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sampling", FunctionIdentifier.VARARGS);
     public static final FunctionIdentifier RANGE_MAP =
-            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-range-map", 1);
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-range-map", FunctionIdentifier.VARARGS);
     public static final FunctionIdentifier STDDEV_POP =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-stddev_pop", 1);
     public static final FunctionIdentifier GLOBAL_STDDEV_POP =
@@ -548,6 +551,8 @@
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-intermediate-kurtosis", 1);
     public static final FunctionIdentifier LOCAL_KURTOSIS =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-kurtosis", 1);
+    public static final FunctionIdentifier NULL_WRITER =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-null-writer", 1);
 
     public static final FunctionIdentifier SCALAR_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "avg", 1);
     public static final FunctionIdentifier SCALAR_COUNT =
@@ -1759,7 +1764,7 @@
         addPrivateFunction(LOCAL_STDDEV_SAMP, LocalSingleVarStatisticsTypeComputer.INSTANCE, true);
         addFunction(STDDEV_SAMP, NullableDoubleTypeComputer.INSTANCE, true);
         addPrivateFunction(GLOBAL_STDDEV_SAMP, NullableDoubleTypeComputer.INSTANCE, true);
-        addPrivateFunction(LOCAL_SAMPLING, ListOfSamplesTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SAMPLING, ABinaryTypeComputer.INSTANCE, true);
         addPrivateFunction(RANGE_MAP, ABinaryTypeComputer.INSTANCE, true);
         addPrivateFunction(LOCAL_STDDEV_POP, LocalSingleVarStatisticsTypeComputer.INSTANCE, true);
         addFunction(STDDEV_POP, NullableDoubleTypeComputer.INSTANCE, true);
@@ -1776,6 +1781,7 @@
         addPrivateFunction(LOCAL_KURTOSIS, LocalSingleVarStatisticsTypeComputer.INSTANCE, true);
         addFunction(KURTOSIS, NullableDoubleTypeComputer.INSTANCE, true);
         addPrivateFunction(GLOBAL_KURTOSIS, NullableDoubleTypeComputer.INSTANCE, true);
+        addPrivateFunction(NULL_WRITER, PropagateTypeComputer.INSTANCE_NULLABLE, true);
 
         // SUM
         addFunction(SUM, NumericSumAggTypeComputer.INSTANCE, true);
@@ -2579,6 +2585,11 @@
         addIntermediateAgg(RANGE_MAP, RANGE_MAP);
         addGlobalAgg(RANGE_MAP, RANGE_MAP);
 
+        addAgg(NULL_WRITER);
+        addLocalAgg(NULL_WRITER, NULL_WRITER);
+        addIntermediateAgg(NULL_WRITER, NULL_WRITER);
+        addGlobalAgg(NULL_WRITER, NULL_WRITER);
+
         // MIN
 
         addAgg(MIN);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ListOfSamplesTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ListOfSamplesTypeComputer.java
deleted file mode 100644
index 1ae72e4..0000000
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ListOfSamplesTypeComputer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.om.typecomputer.impl;
-
-import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
-import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
-import org.apache.asterix.om.types.AOrderedListType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-
-/**
- * List of samples type: [[ANY], [ANY],...]. Each inner list constitutes one sample. Inside the inner list (the sample),
- * each item (or field) has its type tag.
- */
-public class ListOfSamplesTypeComputer extends AbstractResultTypeComputer {
-
-    public static final AOrderedListType TYPE =
-            new AOrderedListType(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE, null);
-    public static final ListOfSamplesTypeComputer INSTANCE = new ListOfSamplesTypeComputer();
-
-    private ListOfSamplesTypeComputer() {
-    }
-
-    @Override
-    protected IAType getResultType(ILogicalExpression expr, IAType... strippedInputTypes) throws AlgebricksException {
-        return TYPE;
-    }
-}
diff --git a/asterixdb/asterix-om/src/test/java/org/apache/asterix/om/typecomputer/TypeComputerTest.java b/asterixdb/asterix-om/src/test/java/org/apache/asterix/om/typecomputer/TypeComputerTest.java
index 4733c2f..bd77580 100644
--- a/asterixdb/asterix-om/src/test/java/org/apache/asterix/om/typecomputer/TypeComputerTest.java
+++ b/asterixdb/asterix-om/src/test/java/org/apache/asterix/om/typecomputer/TypeComputerTest.java
@@ -48,6 +48,7 @@
 import org.reflections.scanners.SubTypesScanner;
 
 // Tests if all type computers can handle input type ANY properly.
+// TODO: this test should be fixed/updated/modified/enhanced
 public class TypeComputerTest {
 
     @Test
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/collections/NullWriterAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/collections/NullWriterAggregateDescriptor.java
new file mode 100644
index 0000000..019465f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/collections/NullWriterAggregateDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.collections;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.aggregates.std.AbstractAggregateFunction;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class NullWriterAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = NullWriterAggregateDescriptor::new;
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.NULL_WRITER;
+    }
+
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) {
+                return new AbstractAggregateFunction(sourceLoc) {
+
+                    @Override
+                    public void init() {
+                        // do nothing
+                    }
+
+                    @Override
+                    public void step(IFrameTupleReference tuple) {
+                        // do nothing
+                    }
+
+                    @Override
+                    public void finish(IPointable result) {
+                        PointableHelper.setNull(result);
+                    }
+
+                    @Override
+                    public void finishPartial(IPointable result) {
+                        finish(result);
+                    }
+                };
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
index 55d381d..25256c1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
@@ -20,16 +20,16 @@
 
 import java.io.IOException;
 
-import org.apache.asterix.builders.IAsterixListBuilder;
-import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ABinary;
+import org.apache.asterix.om.base.AMutableBinary;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.functions.IFunctionTypeInferer;
-import org.apache.asterix.om.typecomputer.impl.ListOfSamplesTypeComputer;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
 import org.apache.asterix.runtime.functions.FunctionTypeInferers;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -38,11 +38,15 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class LocalSamplingAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
@@ -78,80 +82,76 @@
             @Override
             public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx)
                     throws HyracksDataException {
-                return new LocalSamplingAggregateFunction(args, ctx, numSamples);
+                return new LocalSamplingAggregateFunction(args, ctx, numSamples, sourceLoc);
             }
         };
     }
 
-    private class LocalSamplingAggregateFunction implements IAggregateEvaluator {
+    private static class LocalSamplingAggregateFunction extends AbstractAggregateFunction {
+        @SuppressWarnings("unchecked")
+        private ISerializerDeserializer<ABinary> binarySerde =
+                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABINARY);
+        private final AMutableBinary binary = new AMutableBinary(null, 0, 0);
+        private final ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
+        private final ArrayBackedValueStorage rangeMapBits = new ArrayBackedValueStorage();
+        private final IPointable inputFieldValue = new VoidPointable();
         private final int numSamplesRequired;
-        private final ArrayBackedValueStorage storage;
-        private final IAsterixListBuilder listOfSamplesBuilder;
-        private final IAsterixListBuilder oneSampleBuilder;
         private final IScalarEvaluator[] sampledFieldsEval;
-        private final IPointable inputFieldValue;
-        private int numSamplesTaken;
+        private int numSamples;
 
         /**
          * @param args the fields that constitute a sample, e.g., $$1, $$2
          * @param context Hyracks task
+         * @param numSamples number of samples to take
+         * @param srcLoc source location
          * @throws HyracksDataException
          */
         private LocalSamplingAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
-                int numSamplesRequired) throws HyracksDataException {
-            storage = new ArrayBackedValueStorage();
-            inputFieldValue = new VoidPointable();
+                int numSamples, SourceLocation srcLoc) throws HyracksDataException {
+            super(srcLoc);
             sampledFieldsEval = new IScalarEvaluator[args.length];
             for (int i = 0; i < args.length; i++) {
                 sampledFieldsEval[i] = args[i].createScalarEvaluator(context);
             }
-            oneSampleBuilder = new OrderedListBuilder();
-            listOfSamplesBuilder = new OrderedListBuilder();
-            listOfSamplesBuilder.reset(ListOfSamplesTypeComputer.TYPE);
-            this.numSamplesRequired = numSamplesRequired > 0 ? numSamplesRequired
-                    : (int) CompilerProperties.Option.COMPILER_SORT_SAMPLES.defaultValue();
+            this.numSamplesRequired =
+                    numSamples > 0 ? numSamples : (int) CompilerProperties.Option.COMPILER_SORT_SAMPLES.defaultValue();
         }
 
         @Override
         public void init() throws HyracksDataException {
-            numSamplesTaken = 0;
-            listOfSamplesBuilder.reset(ListOfSamplesTypeComputer.TYPE);
+            numSamples = 0;
+            rangeMapBits.reset();
+            // write a dummy integer at the beginning to be filled later with the actual number of samples taken
+            IntegerSerializerDeserializer.write(0, rangeMapBits.getDataOutput());
         }
 
         /**
          * Receives data stream one tuple at a time from a data source and records samples.
          * @param tuple one sample
-         * @throws HyracksDataException
+         * @throws HyracksDataException IO exception
          */
         @Override
         public void step(IFrameTupleReference tuple) throws HyracksDataException {
-            if (numSamplesTaken >= numSamplesRequired) {
+            if (numSamples >= numSamplesRequired) {
                 return;
             }
-            // start over for a new sample
-            oneSampleBuilder.reset((AbstractCollectionType) ListOfSamplesTypeComputer.TYPE.getItemType());
-
-            for (IScalarEvaluator fieldEval : sampledFieldsEval) {
-                // add fields to make up one sample
-                fieldEval.evaluate(tuple, inputFieldValue);
-                oneSampleBuilder.addItem(inputFieldValue);
+            for (int i = 0; i < sampledFieldsEval.length; i++) {
+                sampledFieldsEval[i].evaluate(tuple, inputFieldValue);
+                IntegerSerializerDeserializer.write(inputFieldValue.getLength(), rangeMapBits.getDataOutput());
+                rangeMapBits.append(inputFieldValue);
             }
-            // prepare the sample to add it to the list of samples
-            storage.reset();
-            oneSampleBuilder.write(storage.getDataOutput(), true);
-            listOfSamplesBuilder.addItem(storage);
-            numSamplesTaken++;
+            numSamples++;
         }
 
         /**
          * Sends the list of samples to the global range-map generator.
-         * @param result list of samples
-         * @throws HyracksDataException
+         * @param result will store the list of samples
+         * @throws HyracksDataException IO exception
          */
         @Override
         public void finish(IPointable result) throws HyracksDataException {
             storage.reset();
-            if (numSamplesTaken == 0) {
+            if (numSamples == 0) {
                 // empty partition? then send system null as an indication of empty partition.
                 try {
                     storage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
@@ -160,7 +160,9 @@
                     throw HyracksDataException.create(e);
                 }
             } else {
-                listOfSamplesBuilder.write(storage.getDataOutput(), true);
+                IntegerPointable.setInteger(rangeMapBits.getByteArray(), rangeMapBits.getStartOffset(), numSamples);
+                binary.setValue(rangeMapBits.getByteArray(), rangeMapBits.getStartOffset(), rangeMapBits.getLength());
+                binarySerde.serialize(binary, storage.getDataOutput());
                 result.set(storage);
             }
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
index bf8cf49..b174d07 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
@@ -26,13 +26,17 @@
 import java.util.List;
 
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ABinary;
+import org.apache.asterix.om.base.AMutableBinary;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.functions.IFunctionTypeInferer;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
-import org.apache.asterix.runtime.evaluators.common.ListAccessor;
 import org.apache.asterix.runtime.functions.FunctionTypeInferers;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
@@ -41,8 +45,12 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -74,9 +82,10 @@
  */
 public class RangeMapAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
-    private boolean[] ascendingFlags;
-    private int numOfPartitions;
+    private boolean[] ascFlags;
+    private int numPartitions;
     private int numOrderFields;
+    private IAType[] argsTypes;
 
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         @Override
@@ -100,13 +109,14 @@
      * determine how many split points to pick out of the samples. It also needs to know the ascending/descending of
      * each sort field so that it can sort the samples accordingly first and then pick the (number of partitions - 1)
      * split points out of the sorted samples.
-     * @param states states[0]: number of partitions, states[1]: ascending flags
+     * @param states states[0]: number of partitions, states[1]: ascending flags, states[2]: inputs types
      */
     @Override
     public void setImmutableStates(Object... states) {
-        numOfPartitions = (int) states[0];
-        ascendingFlags = (boolean[]) states[1];
-        numOrderFields = ascendingFlags.length;
+        numPartitions = (int) states[0];
+        ascFlags = (boolean[]) states[1];
+        numOrderFields = ascFlags.length;
+        argsTypes = (IAType[]) states[2];
     }
 
     @Override
@@ -117,40 +127,34 @@
             @Override
             public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx)
                     throws HyracksDataException {
-                return new RangeMapFunction(args, ctx, ascendingFlags, numOfPartitions, numOrderFields);
+                return new RangeMapFunction(args, ctx, ascFlags, numPartitions, numOrderFields, sourceLoc, argsTypes);
             }
         };
     }
 
-    private class RangeMapFunction implements IAggregateEvaluator {
+    private static class RangeMapFunction extends AbstractAggregateFunction {
+        @SuppressWarnings("unchecked")
+        private ISerializerDeserializer<ABinary> binarySerde =
+                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABINARY);
+        private final AMutableBinary binary = new AMutableBinary(null, 0, 0);
+        private final List<List<byte[]>> finalSamples = new ArrayList<>();
+        private final ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
+        private final IPointable input = new VoidPointable();
+        private final ByteArrayPointable rangeMapPointable = new ByteArrayPointable();
         private final IScalarEvaluator localSamplesEval;
-        private final IPointable localSamples;
-        private final List<List<byte[]>> finalSamples;
         private final Comparator<List<byte[]>> comparator;
         private final int numOfPartitions;
         private final int numOrderByFields;
-        private final ListAccessor listOfSamples;
-        private final ListAccessor oneSample;
-        private final IPointable oneSamplePointable;
-        private final ArrayBackedValueStorage oneSampleStorage;
-        private final IPointable field;
-        private final ArrayBackedValueStorage storage;
 
         @SuppressWarnings("unchecked")
         private RangeMapFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean[] ascending,
-                int numOfPartitions, int numOrderByFields) throws HyracksDataException {
-            localSamples = new VoidPointable();
-            localSamplesEval = args[0].createScalarEvaluator(context);
-            finalSamples = new ArrayList<>();
-            comparator = createComparator(ascending);
+                int numOfPartitions, int numOrderByFields, SourceLocation sourceLocation, IAType[] argsTypes)
+                throws HyracksDataException {
+            super(sourceLocation);
+            this.localSamplesEval = args[0].createScalarEvaluator(context);
+            this.comparator = createComparator(ascending, argsTypes);
             this.numOfPartitions = numOfPartitions;
             this.numOrderByFields = numOrderByFields;
-            listOfSamples = new ListAccessor();
-            oneSample = new ListAccessor();
-            oneSamplePointable = new VoidPointable();
-            oneSampleStorage = new ArrayBackedValueStorage();
-            field = new VoidPointable();
-            storage = new ArrayBackedValueStorage();
         }
 
         @Override
@@ -161,41 +165,29 @@
         /**
          * Receives the local samples and appends them to the final list of samples.
          * @param tuple the partition's samples
-         * @throws HyracksDataException
+         * @throws HyracksDataException IO Exception
          */
         @Override
         public void step(IFrameTupleReference tuple) throws HyracksDataException {
             // check if empty stream (system_null), i.e. partition is empty, so no samples
-            localSamplesEval.evaluate(tuple, localSamples);
-            byte tag = localSamples.getByteArray()[localSamples.getStartOffset()];
-            if (tag == ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG) {
+            localSamplesEval.evaluate(tuple, input);
+            if (input.getByteArray()[input.getStartOffset()] == ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG) {
                 return;
             }
-            // deserialize the samples received from the local partition
-            listOfSamples.reset(localSamples.getByteArray(), localSamples.getStartOffset());
-            int numberOfSamples = listOfSamples.size();
-
-            // "sample" & "addedSample" are lists to support multi-column instead of one value, i.e. <3,"dept">
-            List<byte[]> addedSample;
-            int numberOfFields;
-            // add the samples to the final samples
-            try {
-                for (int i = 0; i < numberOfSamples; i++) {
-                    oneSampleStorage.reset();
-                    listOfSamples.getOrWriteItem(i, oneSamplePointable, oneSampleStorage);
-                    oneSample.reset(oneSamplePointable.getByteArray(), oneSamplePointable.getStartOffset());
-                    numberOfFields = oneSample.size();
-                    addedSample = new ArrayList<>(numberOfFields);
-                    for (int j = 0; j < numberOfFields; j++) {
-                        storage.reset();
-                        oneSample.getOrWriteItem(j, field, storage);
-                        addedSample.add(Arrays.copyOfRange(field.getByteArray(), field.getStartOffset(),
-                                field.getStartOffset() + field.getLength()));
-                    }
-                    finalSamples.add(addedSample);
+            rangeMapPointable.set(input.getByteArray(), input.getStartOffset() + 1, input.getLength() - 1);
+            byte[] rangeMapBytes = rangeMapPointable.getByteArray();
+            int pointer = rangeMapPointable.getContentStartOffset();
+            int numSamples = IntegerPointable.getInteger(rangeMapBytes, pointer);
+            pointer += Integer.BYTES; // eat the 4 bytes of the integer (number of samples)
+            for (int i = 0; i < numSamples; i++) {
+                List<byte[]> oneSample = new ArrayList<>(numOrderByFields);
+                for (int j = 0; j < numOrderByFields; j++) {
+                    int valueLength = IntegerPointable.getInteger(rangeMapBytes, pointer);
+                    pointer += Integer.BYTES; // eat the 4 bytes of the integer and move to the value
+                    oneSample.add(Arrays.copyOfRange(rangeMapBytes, pointer, pointer + valueLength));
+                    pointer += valueLength; // eat the length of the value and move to the next pair length:value
                 }
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
+                finalSamples.add(oneSample);
             }
         }
 
@@ -203,7 +195,7 @@
          * Produces the range map out of the collected samples from each partition. The final list of samples is sorted
          * first. Then, we select the split points by dividing the samples evenly.
          * @param result contains the serialized range map.
-         * @throws HyracksDataException
+         * @throws HyracksDataException IO Exception
          */
         @Override
         public void finish(IPointable result) throws HyracksDataException {
@@ -247,8 +239,7 @@
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
             }
-
-            serializeRangemap(numOrderByFields, storage.getByteArray(), endOffsets, result);
+            serializeRangeMap(numOrderByFields, storage.getByteArray(), endOffsets, result);
         }
 
         @Override
@@ -258,18 +249,17 @@
 
         /**
          * Creates the comparator that sorts all the collected samples before picking split points.
-         * @param ascending ascending or descending flag for each sort column.
+         * @param asc ascending or descending flag for each sort column.
+         * @param types types of inputs to range map function produced by the local step and holding sort fields types
          * @return the described comparator
          */
-        private Comparator<List<byte[]>> createComparator(boolean[] ascending) {
-            // create the generic comparator for each sort field
-            IBinaryComparator[] fieldsComparators = new IBinaryComparator[ascending.length];
-            for (int i = 0; i < ascending.length; i++) {
-                // TODO(ali): this is temporary
+        private static Comparator<List<byte[]>> createComparator(boolean[] asc, IAType[] types) {
+            // create the generic comparator for each sort field, sort fields types start at index 1
+            IBinaryComparator[] fieldsComparators = new IBinaryComparator[asc.length];
+            for (int i = 0, fieldIdx = 1; fieldIdx < types.length; i++, fieldIdx++) {
                 fieldsComparators[i] = BinaryComparatorFactoryProvider.INSTANCE
-                        .getBinaryComparatorFactory(null, null, ascending[i]).createBinaryComparator();
+                        .getBinaryComparatorFactory(types[fieldIdx], types[fieldIdx], asc[i]).createBinaryComparator();
             }
-
             return (splitPoint1, splitPoint2) -> {
                 try {
                     // two split points must have the same num of fields
@@ -299,16 +289,18 @@
          * @param splitValues the serialized split values stored one after the other
          * @param endOffsets the end offsets of each split value
          * @param result where the range map object is serialized
-         * @throws HyracksDataException
+         * @throws HyracksDataException IO Exception
          */
-        private void serializeRangemap(int numberFields, byte[] splitValues, int[] endOffsets, IPointable result)
+        private void serializeRangeMap(int numberFields, byte[] splitValues, int[] endOffsets, IPointable result)
                 throws HyracksDataException {
             ArrayBackedValueStorage serRangeMap = new ArrayBackedValueStorage();
             IntegerSerializerDeserializer.write(numberFields, serRangeMap.getDataOutput());
             ByteArraySerializerDeserializer.write(splitValues, serRangeMap.getDataOutput());
             IntArraySerializerDeserializer.write(endOffsets, serRangeMap.getDataOutput());
-
-            result.set(serRangeMap.getByteArray(), serRangeMap.getStartOffset(), serRangeMap.getLength());
+            binary.setValue(serRangeMap.getByteArray(), serRangeMap.getStartOffset(), serRangeMap.getLength());
+            storage.reset();
+            binarySerde.serialize(binary, storage.getDataOutput());
+            result.set(storage);
         }
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 8c884ca..488ee76 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.runtime.aggregates.collections.LastElementAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.collections.ListifyAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.collections.LocalFirstElementAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.collections.NullWriterAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarAvgAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarAvgDistinctAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarCountAggregateDescriptor;
@@ -623,6 +624,7 @@
         fc.add(GlobalSkewnessAggregateDescriptor.FACTORY);
         fc.add(EmptyStreamAggregateDescriptor.FACTORY);
         fc.add(NonEmptyStreamAggregateDescriptor.FACTORY);
+        fc.add(NullWriterAggregateDescriptor.FACTORY);
 
         // serializable aggregates
         fc.add(SerializableCountAggregateDescriptor.FACTORY);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
index 44e3eb7..4be71e3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
@@ -85,23 +85,18 @@
         public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context,
                 CompilerProperties compilerProps) throws AlgebricksException {
             AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
-            IAType[] argsTypes = new IAType[fce.getArguments().size()];
-            int i = 0;
-            for (Mutable<ILogicalExpression> arg : fce.getArguments()) {
-                argsTypes[i] = TypeComputeUtils.getActualType((IAType) context.getType(arg.getValue()));
-                i++;
-            }
-            fd.setImmutableStates((Object[]) argsTypes);
+            fd.setImmutableStates((Object[]) getArgumentsTypes(fce, context));
         }
     };
 
     public static final IFunctionTypeInferer SET_SORTING_PARAMETERS = new IFunctionTypeInferer() {
         @Override
-        public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context,
+        public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment ctx,
                 CompilerProperties compilerProps) throws AlgebricksException {
-            AbstractFunctionCallExpression funCallExpr = (AbstractFunctionCallExpression) expr;
-            Object[] sortingParameters = funCallExpr.getOpaqueParameters();
-            fd.setImmutableStates(sortingParameters[0], sortingParameters[1]);
+            // sets the type of the input range map produced by the local sampling expression and types of sort fields
+            AbstractFunctionCallExpression funExp = (AbstractFunctionCallExpression) expr;
+            Object[] sortingParameters = funExp.getOpaqueParameters();
+            fd.setImmutableStates(sortingParameters[0], sortingParameters[1], getArgumentsTypes(funExp, ctx));
         }
     };
 
@@ -331,4 +326,15 @@
             fd.setImmutableStates((Object[]) argRecordTypes);
         }
     }
+
+    private static IAType[] getArgumentsTypes(AbstractFunctionCallExpression funExp, IVariableTypeEnvironment ctx)
+            throws AlgebricksException {
+        IAType[] argsTypes = new IAType[funExp.getArguments().size()];
+        int i = 0;
+        for (Mutable<ILogicalExpression> arg : funExp.getArguments()) {
+            argsTypes[i] = TypeComputeUtils.getActualType((IAType) ctx.getType(arg.getValue()));
+            i++;
+        }
+        return argsTypes;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
index 1c7da34..aae4864 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class VariableReferenceExpression extends AbstractLogicalExpression {
     private int tupleRef;
@@ -44,6 +45,11 @@
         this(0, variable);
     }
 
+    public VariableReferenceExpression(LogicalVariable variable, SourceLocation sourceLoc) {
+        this(variable);
+        this.sourceLoc = sourceLoc;
+    }
+
     public LogicalVariable getVariableReference() {
         return variable;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index bff7431..f66a6b8 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -114,10 +114,13 @@
     private PhysicalOptimizationConfig physicalOptimizationConfig;
     private final FunctionIdentifier rangeMapFunction;
     private final FunctionIdentifier localSamplingFun;
+    private final FunctionIdentifier typePropagatingFun;
 
-    public EnforceStructuralPropertiesRule(FunctionIdentifier rangeMapFunction, FunctionIdentifier localSamplingFun) {
+    public EnforceStructuralPropertiesRule(FunctionIdentifier rangeMapFunction, FunctionIdentifier localSamplingFun,
+            FunctionIdentifier typePropagatingFun) {
         this.rangeMapFunction = rangeMapFunction;
         this.localSamplingFun = localSamplingFun;
+        this.typePropagatingFun = typePropagatingFun;
     }
 
     @Override
@@ -711,9 +714,9 @@
     private IPhysicalOperator createDynamicRangePartitionExchangePOperator(AbstractLogicalOperator parentOp,
             IOptimizationContext ctx, INodeDomain targetDomain, List<OrderColumn> partitioningColumns, int childIndex)
             throws AlgebricksException {
-        SourceLocation sourceLoc = parentOp.getSourceLocation();
+        SourceLocation srcLoc = parentOp.getSourceLocation();
         // #1. create the replicate operator and add it above the source op feeding parent operator
-        ReplicateOperator replicateOp = createReplicateOperator(parentOp.getInputs().get(childIndex), ctx, sourceLoc);
+        ReplicateOperator replicateOp = createReplicateOperator(parentOp.getInputs().get(childIndex), ctx, srcLoc);
 
         // these two exchange ops are needed so that the parents of replicate stay the same during later optimizations.
         // This is because replicate operator has references to its parents. If any later optimizations add new parents,
@@ -724,40 +727,34 @@
         MutableObject<ILogicalOperator> exchToLocalAggRef = new MutableObject<>(exchToLocalAgg);
         MutableObject<ILogicalOperator> exchToForwardRef = new MutableObject<>(exchToForward);
 
-        // add the exchange--to-forward at output 0, the exchange-to-local-aggregate at output 1
+        // add the exchange-to-forward at output 0, the exchange-to-local-aggregate at output 1
         replicateOp.getOutputs().add(exchToForwardRef);
         replicateOp.getOutputs().add(exchToLocalAggRef);
         // materialize the data to be able to re-read the data again after sampling is done
         replicateOp.getOutputMaterializationFlags()[0] = true;
 
         // #2. create the aggregate operators and their sampling functions
-        // $$samplingResultVar = local_samplingFun($$partitioning_column)
-        // $$rangeMapResultVar = global_rangeMapFun($$samplingResultVar)
-        List<LogicalVariable> samplingResultVar = new ArrayList<>(1);
+        List<LogicalVariable> localVars = new ArrayList<>();
         List<LogicalVariable> rangeMapResultVar = new ArrayList<>(1);
-        List<Mutable<ILogicalExpression>> samplingFun = new ArrayList<>(1);
+        List<Mutable<ILogicalExpression>> localFuns = new ArrayList<>();
         List<Mutable<ILogicalExpression>> rangeMapFun = new ArrayList<>(1);
-
-        createAggregateFunction(ctx, samplingResultVar, samplingFun, rangeMapResultVar, rangeMapFun,
-                targetDomain.cardinality(), partitioningColumns, sourceLoc);
-
-        AggregateOperator localAggOp =
-                createAggregate(samplingResultVar, false, samplingFun, exchToLocalAggRef, ctx, sourceLoc);
+        createAggregateFunction(ctx, localVars, localFuns, rangeMapResultVar, rangeMapFun, targetDomain.cardinality(),
+                partitioningColumns, srcLoc);
+        AggregateOperator localAggOp = createAggregate(localVars, false, localFuns, exchToLocalAggRef, ctx, srcLoc);
         MutableObject<ILogicalOperator> localAgg = new MutableObject<>(localAggOp);
-        AggregateOperator globalAggOp = createAggregate(rangeMapResultVar, true, rangeMapFun, localAgg, ctx, sourceLoc);
+        AggregateOperator globalAggOp = createAggregate(rangeMapResultVar, true, rangeMapFun, localAgg, ctx, srcLoc);
         MutableObject<ILogicalOperator> globalAgg = new MutableObject<>(globalAggOp);
 
         // #3. create the forward operator
         String rangeMapKey = UUID.randomUUID().toString();
         LogicalVariable rangeMapVar = rangeMapResultVar.get(0);
-        ForwardOperator forward = createForward(rangeMapKey, rangeMapVar, exchToForwardRef, globalAgg, ctx, sourceLoc);
+        ForwardOperator forward = createForward(rangeMapKey, rangeMapVar, exchToForwardRef, globalAgg, ctx, srcLoc);
         MutableObject<ILogicalOperator> forwardRef = new MutableObject<>(forward);
 
         // replace the old input of parentOp requiring the range partitioning with the new forward op
         parentOp.getInputs().set(childIndex, forwardRef);
         parentOp.recomputeSchema();
         ctx.computeAndSetTypeEnvironmentForOperator(parentOp);
-
         return new RangePartitionExchangePOperator(partitioningColumns, rangeMapKey, targetDomain);
     }
 
@@ -780,56 +777,61 @@
      * will be used when creating the corresponding aggregate operators.
      * @param context used to get new variables which will be assigned the samples & the range map
      * @param localResultVariables the variable to which the stats (e.g. samples) info is assigned
-     * @param localAggFunctions the local sampling expression is added to this list
-     * @param globalResultVariables the variable to which the range map is assigned
-     * @param globalAggFunctions the expression generating a range map is added to this list
+     * @param localAggFunctions the local sampling expression and columns expressions are added to this list
+     * @param globalResultVariable the variable to which the range map is assigned
+     * @param globalAggFunction the expression generating a range map is added to this list
      * @param numPartitions passed to the expression generating a range map to know how many split points are needed
-     * @param partFields the fields based on which the partitioner partitions the tuples, also sampled fields
+     * @param partitionFields the fields based on which the partitioner partitions the tuples, also sampled fields
      * @param sourceLocation source location
      */
     private void createAggregateFunction(IOptimizationContext context, List<LogicalVariable> localResultVariables,
-            List<Mutable<ILogicalExpression>> localAggFunctions, List<LogicalVariable> globalResultVariables,
-            List<Mutable<ILogicalExpression>> globalAggFunctions, int numPartitions, List<OrderColumn> partFields,
+            List<Mutable<ILogicalExpression>> localAggFunctions, List<LogicalVariable> globalResultVariable,
+            List<Mutable<ILogicalExpression>> globalAggFunction, int numPartitions, List<OrderColumn> partitionFields,
             SourceLocation sourceLocation) {
-        // prepare the arguments of the local sampling function: sampled fields
-        List<Mutable<ILogicalExpression>> sampledFields = new ArrayList<>(partFields.size());
-        partFields.forEach(f -> {
-            AbstractLogicalExpression sampledField = new VariableReferenceExpression(f.getColumn());
-            sampledField.setSourceLocation(sourceLocation);
-            sampledFields.add(new MutableObject<>(sampledField));
-        });
-
-        // local info
+        // prepare the arguments to the local sampling function: sampled fields (e.g. $col1, $col2)
+        // local info: local agg [$1, $2, $3] = [local-sampling-fun($col1, $col2), type_expr($col1), type_expr($col2)]
+        // global info: global agg [$RM] = [global-range-map($1, $2, $3)]
         IFunctionInfo samplingFun = context.getMetadataProvider().lookupFunction(localSamplingFun);
-        AbstractFunctionCallExpression samplingExp =
-                new AggregateFunctionCallExpression(samplingFun, false, sampledFields);
-        samplingExp.setSourceLocation(sourceLocation);
-        LogicalVariable samplingResultVar = context.newVar();
-        localResultVariables.add(samplingResultVar);
-        localAggFunctions.add(new MutableObject<>(samplingExp));
-        Object[] samplingParam = { context.getPhysicalOptimizationConfig().getSortSamples() };
-        samplingExp.setOpaqueParameters(samplingParam);
-
-        // prepare the argument of the global range map generator function: the result of the local function
-        List<Mutable<ILogicalExpression>> arg = new ArrayList<>(1);
-        AbstractLogicalExpression samplingResultVarExp = new VariableReferenceExpression(samplingResultVar);
-        samplingResultVarExp.setSourceLocation(sourceLocation);
-        arg.add(new MutableObject<>(samplingResultVarExp));
-
-        // global info
-        IFunctionInfo rangeMapFun = context.getMetadataProvider().lookupFunction(rangeMapFunction);
-        AbstractFunctionCallExpression rangeMapExp = new AggregateFunctionCallExpression(rangeMapFun, true, arg);
-        rangeMapExp.setSourceLocation(sourceLocation);
-        globalResultVariables.add(context.newVar());
-        globalAggFunctions.add(new MutableObject<>(rangeMapExp));
-
+        List<Mutable<ILogicalExpression>> fields = new ArrayList<>(partitionFields.size());
+        List<Mutable<ILogicalExpression>> argsToRM = new ArrayList<>(1 + partitionFields.size());
+        AbstractFunctionCallExpression expr = new AggregateFunctionCallExpression(samplingFun, false, fields);
+        expr.setSourceLocation(sourceLocation);
+        expr.setOpaqueParameters(new Object[] { context.getPhysicalOptimizationConfig().getSortSamples() });
+        // add the sampling function to the list of the local functions
+        LogicalVariable localOutVariable = context.newVar();
+        localResultVariables.add(localOutVariable);
+        localAggFunctions.add(new MutableObject<>(expr));
+        // add the local result variable as input to the global range map function
+        AbstractLogicalExpression varExprRef = new VariableReferenceExpression(localOutVariable, sourceLocation);
+        argsToRM.add(new MutableObject<>(varExprRef));
         int i = 0;
-        boolean[] ascendingFlags = new boolean[partFields.size()];
-        for (OrderColumn column : partFields) {
-            ascendingFlags[i] = column.getOrder() == OrderOperator.IOrder.OrderKind.ASC;
+        boolean[] ascendingFlags = new boolean[partitionFields.size()];
+        IFunctionInfo typeFun = context.getMetadataProvider().lookupFunction(typePropagatingFun);
+        for (OrderColumn field : partitionFields) {
+            // add the field to the "fields" which is the input to the local sampling function
+            varExprRef = new VariableReferenceExpression(field.getColumn(), sourceLocation);
+            fields.add(new MutableObject<>(varExprRef));
+            // add the same field as input to the corresponding local function propagating the type of the field
+            expr = new AggregateFunctionCallExpression(typeFun, false,
+                    Collections.singletonList(new MutableObject<>(varExprRef)));
+            // add the type propagating function to the list of the local functions
+            localOutVariable = context.newVar();
+            localResultVariables.add(localOutVariable);
+            localAggFunctions.add(new MutableObject<>(expr));
+            // add the local result variable as input to the global range map function
+            varExprRef = new VariableReferenceExpression(localOutVariable, sourceLocation);
+            argsToRM.add(new MutableObject<>(varExprRef));
+            ascendingFlags[i] = field.getOrder() == OrderOperator.IOrder.OrderKind.ASC;
             i++;
         }
+        IFunctionInfo rangeMapFun = context.getMetadataProvider().lookupFunction(rangeMapFunction);
+        AggregateFunctionCallExpression rangeMapExp = new AggregateFunctionCallExpression(rangeMapFun, true, argsToRM);
+        rangeMapExp.setStepOneAggregate(samplingFun);
+        rangeMapExp.setStepTwoAggregate(rangeMapFun);
+        rangeMapExp.setSourceLocation(sourceLocation);
         rangeMapExp.setOpaqueParameters(new Object[] { numPartitions, ascendingFlags });
+        globalResultVariable.add(context.newVar());
+        globalAggFunction.add(new MutableObject<>(rangeMapExp));
     }
 
     /**
@@ -874,11 +876,10 @@
 
     private static ForwardOperator createForward(String rangeMapKey, LogicalVariable rangeMapVariable,
             MutableObject<ILogicalOperator> exchangeOpFromReplicate, MutableObject<ILogicalOperator> globalAggInput,
-            IOptimizationContext context, SourceLocation sourceLocation) throws AlgebricksException {
-        AbstractLogicalExpression rangeMapExpression = new VariableReferenceExpression(rangeMapVariable);
-        rangeMapExpression.setSourceLocation(sourceLocation);
+            IOptimizationContext context, SourceLocation sourceLoc) throws AlgebricksException {
+        AbstractLogicalExpression rangeMapExpression = new VariableReferenceExpression(rangeMapVariable, sourceLoc);
         ForwardOperator forwardOperator = new ForwardOperator(rangeMapKey, new MutableObject<>(rangeMapExpression));
-        forwardOperator.setSourceLocation(sourceLocation);
+        forwardOperator.setSourceLocation(sourceLoc);
         forwardOperator.setPhysicalOperator(new ForwardPOperator());
         forwardOperator.getInputs().add(exchangeOpFromReplicate);
         forwardOperator.getInputs().add(globalAggInput);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
index 24c5cae..49eea0a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.marshalling.ByteArraySerializerDeserializer;
@@ -46,6 +47,7 @@
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
+// TODO(ali): forward operator should probably be moved to asterix layer
 public class ForwardOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final long serialVersionUID = 1L;
     private static final int FORWARD_DATA_ACTIVITY_ID = 0;
@@ -173,8 +175,10 @@
             byte[] rangeMap = frameTupleReference.getFieldData(0);
             int offset = frameTupleReference.getFieldStart(0);
             int length = frameTupleReference.getFieldLength(0);
-
-            ByteArrayInputStream rangeMapIn = new ByteArrayInputStream(rangeMap, offset, length);
+            ByteArrayPointable pointable = new ByteArrayPointable();
+            pointable.set(rangeMap, offset + 1, length - 1);
+            ByteArrayInputStream rangeMapIn = new ByteArrayInputStream(pointable.getByteArray(),
+                    pointable.getContentStartOffset(), pointable.getContentLength());
             DataInputStream dataInputStream = new DataInputStream(rangeMapIn);
             numFields = IntegerSerializerDeserializer.read(dataInputStream);
             splitValues = ByteArraySerializerDeserializer.read(dataInputStream);
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IntSerDeUtils.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IntSerDeUtils.java
index 05e694c..821707f 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IntSerDeUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IntSerDeUtils.java
@@ -37,7 +37,7 @@
      *            value to write to {@code bytes[offset]}
      */
     public static void putInt(byte[] bytes, int offset, int value) {
-
+        // TODO: there is another implementation in IntegerPointable
         bytes[offset++] = (byte) (value >> 24);
         bytes[offset++] = (byte) (value >> 16);
         bytes[offset++] = (byte) (value >> 8);