[ASTERIXDB-3369][FUN] Implement SQL median()

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Implement SQL median() function. Each local partition will generate a sorted
run file. Those run files can be used to determine the median.

- add ability to generate unique ids at the joblet level in IHyracksJobletContext.
- median() will use the unique ids for file ids when generating local run files.
- references to the generated local run files are kept in JobFileState at Joblet level.
- use NetworkManager to receive file reading requests.
- make PartitionManager keep track of file requests.
- currently, median() will use the same sort memory configured for the cluster.
- pass the sort memory to the median() via the type inferer.
- account for the memory required by median() when calculating the
  required capacity of a query.
- median() used as a WINDOW function does not support ORDER BY and FRAME.
- refactor MaterializedPartition to PartitionFileReader.

Change-Id: Id8d03f42d54b5ed4cf316c31f0b4cce9dd7c1dc0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18210
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index 1d9c57b..037aca3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -1219,10 +1219,14 @@
         boolean isWin = BuiltinFunctions.isWindowFunction(fi);
         boolean isWinAgg = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
                 BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG);
-        boolean prohibitOrderClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
-                BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE);
-        boolean prohibitFrameClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
-                BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE);
+        boolean prohibitOrderClause = (isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+                BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE))
+                || (!isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+                        BuiltinFunctions.AggregateFunctionProperty.NO_ORDER_CLAUSE));
+        boolean prohibitFrameClause = (isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+                BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE))
+                || (!isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+                        BuiltinFunctions.AggregateFunctionProperty.NO_FRAME_CLAUSE));
         boolean allowRespectIgnoreNulls = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
                 BuiltinFunctions.WindowFunctionProperty.ALLOW_RESPECT_IGNORE_NULLS);
         boolean allowFromFirstLast = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 99678da..cb86e35 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -332,7 +332,7 @@
                     final AlgebricksAbsolutePartitionConstraint jobLocations =
                             getJobLocations(spec, nodeJobTracker, computationLocations);
                     final IClusterCapacity jobRequiredCapacity =
-                            ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
+                            ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf, compilerProperties);
                     spec.setRequiredClusterCapacity(jobRequiredCapacity);
                 }
             }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index a16ac84..9e77922 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -18,25 +18,42 @@
  */
 package org.apache.asterix.app.resource;
 
+import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class OperatorResourcesComputer {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     public static final int MIN_OPERATOR_CORES = 1;
     private static final long MAX_BUFFER_PER_CONNECTION = 1L;
 
     private final int numComputationPartitions;
     private final long frameSize;
+    private final ExpressionMemoryComputer exprMemoryComputer;
+    private final CompilerProperties compilerProperties;
 
-    public OperatorResourcesComputer(int numComputationPartitions, long frameSize) {
+    public OperatorResourcesComputer(int numComputationPartitions, long frameSize,
+            CompilerProperties compilerProperties) {
         this.numComputationPartitions = numComputationPartitions;
         this.frameSize = frameSize;
+        this.exprMemoryComputer = new ExpressionMemoryComputer();
+        this.compilerProperties = compilerProperties;
     }
 
     public int getOperatorRequiredCores(ILogicalOperator operator) {
@@ -52,10 +69,22 @@
             return getExchangeRequiredMemory((ExchangeOperator) operator);
         } else {
             IPhysicalOperator physOp = ((AbstractLogicalOperator) operator).getPhysicalOperator();
-            return getOperatorRequiredMemory(operator.getExecutionMode(), physOp.getLocalMemoryRequirements());
+            return getOperatorRequiredMemory(operator.getExecutionMode(), physOp.getLocalMemoryRequirements())
+                    + getOperatorExpressionsRequiredMemory(operator);
         }
     }
 
+    private long getOperatorExpressionsRequiredMemory(ILogicalOperator operator) {
+        exprMemoryComputer.reset(operator);
+        try {
+            operator.acceptExpressionTransform(exprMemoryComputer);
+        } catch (Throwable e) {
+            // ignore
+            LOGGER.warn("encountered error while computing operator expressions required memory", e);
+        }
+        return exprMemoryComputer.requiredMemory;
+    }
+
     private long getOperatorRequiredMemory(AbstractLogicalOperator.ExecutionMode opExecMode, long memorySize) {
         if (opExecMode == AbstractLogicalOperator.ExecutionMode.PARTITIONED
                 || opExecMode == AbstractLogicalOperator.ExecutionMode.LOCAL) {
@@ -78,4 +107,42 @@
         }
         return 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions * frameSize;
     }
+
+    class ExpressionMemoryComputer implements ILogicalExpressionReferenceTransform {
+
+        private long requiredMemory;
+        private ILogicalOperator operator;
+
+        public ExpressionMemoryComputer() {
+        }
+
+        @Override
+        public boolean transform(Mutable<ILogicalExpression> expression) throws AlgebricksException {
+            ILogicalExpression expr = expression.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                AbstractFunctionCallExpression funExpr = (AbstractFunctionCallExpression) expr;
+                if (funExpr.getKind() == AbstractFunctionCallExpression.FunctionKind.AGGREGATE) {
+                    if (isMedian(funExpr.getFunctionIdentifier())) {
+                        requiredMemory +=
+                                (compilerProperties.getSortMemorySize() * numCompute(operator.getExecutionMode()));
+                    }
+                }
+            }
+            return false;
+        }
+
+        private int numCompute(AbstractLogicalOperator.ExecutionMode executionMode) {
+            return (executionMode == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                    || executionMode == AbstractLogicalOperator.ExecutionMode.LOCAL) ? numComputationPartitions : 1;
+        }
+
+        private boolean isMedian(FunctionIdentifier funId) {
+            return BuiltinFunctions.LOCAL_SQL_MEDIAN.equals(funId) || BuiltinFunctions.SQL_MEDIAN.equals(funId);
+        }
+
+        public void reset(ILogicalOperator op) {
+            requiredMemory = 0;
+            operator = op;
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index ba11956..353bebf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.app.resource.OperatorResourcesComputer;
 import org.apache.asterix.app.resource.PlanStage;
 import org.apache.asterix.app.resource.PlanStagesGenerator;
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -38,25 +39,28 @@
     }
 
     /**
-     * Calculates the required cluster capacity from a given query plan, the computation locations,
-     * the operator memory budgets, and frame size.
+     * Calculates the required cluster capacity from a given query plan, the computation locations, the operator memory
+     * budgets, and frame size.
      *
      * @param plan,
-     *            a given query plan.
+     *         a given query plan.
      * @param computationLocations,
-     *            the partitions for computation.
+     *         the partitions for computation.
      * @param physicalOptimizationConfig,
-     *            a PhysicalOptimizationConfig.
+     *         a PhysicalOptimizationConfig.
+     * @param compilerProperties
      * @return the required cluster capacity for executing the query.
      * @throws AlgebricksException
-     *             if the query plan is malformed.
+     *         if the query plan is malformed.
      */
     public static IClusterCapacity getRequiredCapacity(ILogicalPlan plan,
             AlgebricksAbsolutePartitionConstraint computationLocations,
-            PhysicalOptimizationConfig physicalOptimizationConfig) throws AlgebricksException {
+            PhysicalOptimizationConfig physicalOptimizationConfig, CompilerProperties compilerProperties)
+            throws AlgebricksException {
         final int frameSize = physicalOptimizationConfig.getFrameSize();
         final List<PlanStage> planStages = getStages(plan);
-        return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, frameSize);
+        return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, frameSize,
+                compilerProperties);
     }
 
     public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException {
@@ -68,8 +72,9 @@
     }
 
     public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations,
-            int frameSize) {
-        final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, frameSize);
+            int frameSize, CompilerProperties compilerProperties) {
+        final OperatorResourcesComputer computer =
+                new OperatorResourcesComputer(computationLocations, frameSize, compilerProperties);
         final IClusterCapacity clusterCapacity = new ClusterCapacity();
         final long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max()
                 .orElseThrow(IllegalStateException::new);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index b0de85e..77d0e60 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -340,7 +340,7 @@
             }
         }
         final IClusterCapacity clusterCapacity =
-                ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM, FRAME_SIZE);
+                ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM, FRAME_SIZE, null);
         Assert.assertEquals(clusterCapacity.getAggregatedMemoryByteSize(), expectedMemory);
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
index 218ef97..f8dbaad 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
@@ -60,7 +60,7 @@
     public void failedJobPartitionRequestTest() throws Exception {
         final NodeControllerService nc1 = integrationUtil.ncs[0];
         final NodeControllerService nc2 = integrationUtil.ncs[1];
-        final JobId failedJob = new JobId(-1);
+        final JobId failedJob = new JobId(10);
         nc2.getPartitionManager().jobCompleted(failedJob, JobStatus.FAILURE);
         final NetworkAddress localNetworkAddress = nc2.getNetworkManager().getPublicNetworkAddress();
         final InetSocketAddress nc2Address =
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 325c238..5ed4621 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -286,4 +286,9 @@
             return AlgebricksConfig.QUERY_PLAN_SHAPE_DEFAULT;
         return queryPlanShapeMode;
     }
+
+    public int getSortMemoryFrames() {
+        int numFrames = (int) getSortMemorySize() / getFrameSize();
+        return Math.max(numFrames, OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_SORT);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index ae70475..9307469 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -34,7 +34,7 @@
 
 public class OptimizationConfUtil {
 
-    private static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
+    public static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
     private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
     private static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
     private static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 4db68c3..bb05ba6 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -95,6 +95,7 @@
 import org.apache.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.Int64ArrayToStringTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalAvgTypeComputer;
+import org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalSingleVarStatisticsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.MinMaxAggTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.MissingIfTypeComputer;
@@ -551,6 +552,8 @@
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-intermediate-avg", 1);
     public static final FunctionIdentifier LOCAL_AVG =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-avg", 1);
+    public static final FunctionIdentifier MEDIAN =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-median", 1);
     public static final FunctionIdentifier FIRST_ELEMENT =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-first-element", 1);
     public static final FunctionIdentifier LOCAL_FIRST_ELEMENT =
@@ -628,6 +631,8 @@
     public static final FunctionIdentifier SCALAR_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sum", 1);
     public static final FunctionIdentifier SCALAR_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "max", 1);
     public static final FunctionIdentifier SCALAR_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "min", 1);
+    public static final FunctionIdentifier SCALAR_MEDIAN =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "median", 1);
     public static final FunctionIdentifier SCALAR_FIRST_ELEMENT =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "first-element", 1);
     public static final FunctionIdentifier SCALAR_LOCAL_FIRST_ELEMENT =
@@ -802,6 +807,14 @@
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-global-sql-avg", 1);
     public static final FunctionIdentifier LOCAL_SQL_AVG =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sql-avg", 1);
+    public static final FunctionIdentifier SQL_MEDIAN =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sql-median", 1);
+    public static final FunctionIdentifier LOCAL_SQL_MEDIAN =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sql-median", 1);
+    public static final FunctionIdentifier INTERMEDIATE_SQL_MEDIAN =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-intermediate-sql-median", 1);
+    public static final FunctionIdentifier GLOBAL_SQL_MEDIAN =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-global-sql-median", 1);
     public static final FunctionIdentifier SQL_STDDEV_SAMP =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sql-stddev_samp", 1);
     public static final FunctionIdentifier INTERMEDIATE_SQL_STDDEV_SAMP =
@@ -869,6 +882,8 @@
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-max", 1);
     public static final FunctionIdentifier SCALAR_SQL_MIN =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-min", 1);
+    public static final FunctionIdentifier SCALAR_SQL_MEDIAN =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-median", 1);
     public static final FunctionIdentifier SCALAR_SQL_STDDEV_SAMP =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-stddev_samp", 1);
     public static final FunctionIdentifier SCALAR_SQL_STDDEV_POP =
@@ -2143,6 +2158,13 @@
         addPrivateFunction(GLOBAL_SQL_UNION_MBR, ARectangleTypeComputer.INSTANCE, true);
         addFunction(SCALAR_SQL_UNION_MBR, ARectangleTypeComputer.INSTANCE, true);
 
+        addFunction(SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+        addFunction(SCALAR_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+        addFunction(SCALAR_SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SQL_MEDIAN, LocalMedianTypeComputer.INSTANCE, true);
+        addPrivateFunction(INTERMEDIATE_SQL_MEDIAN, LocalMedianTypeComputer.INSTANCE, true);
+        addPrivateFunction(GLOBAL_SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+
         addPrivateFunction(SERIAL_AVG, NullableDoubleTypeComputer.INSTANCE, true);
         addPrivateFunction(SERIAL_COUNT, AInt64TypeComputer.INSTANCE, true);
         addPrivateFunction(SERIAL_GLOBAL_AVG, NullableDoubleTypeComputer.INSTANCE, true);
@@ -3169,6 +3191,25 @@
         addDistinctAgg(SQL_SUM_DISTINCT, SQL_SUM);
         addScalarAgg(SQL_SUM_DISTINCT, SCALAR_SQL_SUM_DISTINCT);
 
+        // SQL MEDIAN
+        addAgg(SQL_MEDIAN);
+        addAgg(LOCAL_SQL_MEDIAN);
+        addAgg(GLOBAL_SQL_MEDIAN);
+
+        addLocalAgg(SQL_MEDIAN, LOCAL_SQL_MEDIAN);
+
+        addIntermediateAgg(SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN);
+        addIntermediateAgg(LOCAL_SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN);
+        addIntermediateAgg(GLOBAL_SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN);
+
+        addGlobalAgg(SQL_MEDIAN, GLOBAL_SQL_MEDIAN);
+
+        addScalarAgg(MEDIAN, SCALAR_MEDIAN);
+        addScalarAgg(SQL_MEDIAN, SCALAR_SQL_MEDIAN);
+
+        registerAggFunctionProperties(SCALAR_SQL_MEDIAN, AggregateFunctionProperty.NO_FRAME_CLAUSE,
+                AggregateFunctionProperty.NO_ORDER_CLAUSE);
+
         // SPATIAL AGGREGATES
 
         addAgg(ST_UNION_AGG);
@@ -3203,6 +3244,13 @@
     interface BuiltinFunctionProperty {
     }
 
+    public enum AggregateFunctionProperty implements BuiltinFunctionProperty {
+        /** Whether the order clause is prohibited */
+        NO_ORDER_CLAUSE,
+        /** Whether the frame clause is prohibited */
+        NO_FRAME_CLAUSE
+    }
+
     public enum WindowFunctionProperty implements BuiltinFunctionProperty {
         /** Whether the order clause is prohibited */
         NO_ORDER_CLAUSE,
@@ -3365,6 +3413,11 @@
         registeredFunctions.put(functionInfo.getFunctionIdentifier(), functionInfo);
     }
 
+    private static <T extends Enum<T> & BuiltinFunctionProperty> void registerAggFunctionProperties(
+            FunctionIdentifier fid, AggregateFunctionProperty... properties) {
+        registerFunctionProperties(fid, AggregateFunctionProperty.class, properties);
+    }
+
     private static <T extends Enum<T> & BuiltinFunctionProperty> void registerFunctionProperties(FunctionIdentifier fid,
             Class<T> propertyClass, T[] properties) {
         if (properties == null) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java
new file mode 100644
index 0000000..3411ef4
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.typecomputer.impl;
+
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+
+public class LocalMedianTypeComputer implements IResultTypeComputer {
+
+    public static final LocalMedianTypeComputer INSTANCE = new LocalMedianTypeComputer();
+
+    public static final ARecordType REC_TYPE = new ARecordType(null,
+            new String[] { "count", "handle", "address", "port" },
+            new IAType[] { BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.ASTRING, BuiltinType.AINT32 }, false);
+
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
+            IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
+        return REC_TYPE;
+    }
+}
diff --git a/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java b/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java
index a857981..07f2021 100644
--- a/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java
+++ b/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.om.typecomputer.impl.ClosedRecordConstructorResultType;
 import org.apache.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalAvgTypeComputer;
+import org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalSingleVarStatisticsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.NullableDoubleTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.OpenRecordConstructorResultType;
@@ -190,6 +191,7 @@
         differentBehaviorFunctions.add(RecordRemoveFieldsTypeComputer.class.getSimpleName());
         differentBehaviorFunctions.add(ClosedRecordConstructorResultType.class.getSimpleName());
         differentBehaviorFunctions.add(LocalAvgTypeComputer.class.getSimpleName());
+        differentBehaviorFunctions.add(LocalMedianTypeComputer.class.getSimpleName());
         differentBehaviorFunctions.add(BooleanOnlyTypeComputer.class.getSimpleName());
         //        differentBehaviorFunctions.add("AMissingTypeComputer"); // TODO What type computer is this?
         differentBehaviorFunctions.add(NullableDoubleTypeComputer.class.getSimpleName());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..5c83a5e
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.scalar;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.runtime.aggregates.std.SqlMedianAggregateDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlMedianAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlMedianAggregateDescriptor();
+        }
+
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return FunctionTypeInferers.MEDIAN_MEMORY;
+        }
+    };
+
+    private ScalarSqlMedianAggregateDescriptor() {
+        super(SqlMedianAggregateDescriptor.FACTORY);
+    }
+
+    @Override
+    public void setImmutableStates(Object... states) {
+        super.setImmutableStates(states);
+        aggFuncDesc.setImmutableStates(states);
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SCALAR_SQL_MEDIAN;
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java
new file mode 100644
index 0000000..8ff7ed5
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import java.util.List;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrameTupleAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.partitions.JobFileState;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
+import org.apache.hyracks.dataflow.std.sort.Algorithm;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import org.apache.hyracks.dataflow.std.sort.ISorter;
+
+public abstract class AbstractLocalMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+    protected final ExternalSortRunGenerator runsGenerator;
+    protected final IFrameTupleAppender appender;
+    private final ArrayTupleBuilder tupleBuilder;
+    private final int numFrames;
+    private ExternalSortRunMerger runsMerger;
+
+    public AbstractLocalMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc, int numFrames) throws HyracksDataException {
+        super(args, context, sourceLoc);
+        this.numFrames = numFrames;
+        appender = new FrameTupleAppender(frame);
+        tupleBuilder = new ArrayTupleBuilder(1);
+        runsGenerator = new ExternalSortRunGenerator(context.getTaskContext(), new int[] { 0 },
+                new INormalizedKeyComputerFactory[] { doubleNkComputerFactory },
+                new IBinaryComparatorFactory[] { doubleComparatorFactory }, recordDesc, Algorithm.MERGE_SORT,
+                EnumFreeSlotPolicy.LAST_FIT, numFrames, Integer.MAX_VALUE);
+
+    }
+
+    @Override
+    public void init() throws HyracksDataException {
+        super.init();
+        appender.reset(frame, true);
+        runsGenerator.open();
+        runsGenerator.getSorter().reset();
+    }
+
+    protected void processDataValue(IFrameTupleReference tuple) throws HyracksDataException {
+        eval.evaluate(tuple, inputVal);
+        byte[] data = inputVal.getByteArray();
+        int start = inputVal.getStartOffset();
+        ATypeTag tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[start]);
+        if (ATypeHierarchy.getTypeDomain(tag) == ATypeHierarchy.Domain.NUMERIC) {
+            count++;
+            aDouble.setValue(ATypeHierarchy.getDoubleValue(MEDIAN, 0, data, start));
+            tupleBuilder.reset();
+            tupleBuilder.addField(doubleSerde, aDouble);
+            FrameUtils.appendToWriter(runsGenerator, appender, tupleBuilder.getFieldEndOffsets(),
+                    tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
+        }
+    }
+
+    protected void finishLocalPartial(IPointable result) throws HyracksDataException {
+        if (count == 0) {
+            setPartialResult(result, -1, "", -1);
+            return;
+        }
+        if (appender.getTupleCount() > 0) {
+            appender.write(runsGenerator, true);
+        }
+        // close to sort the in-memory data or write out sorted data to run files
+        runsGenerator.close();
+
+        IHyracksTaskContext taskCtx = ctx.getTaskContext();
+        IHyracksJobletContext jobletCtx = taskCtx.getJobletContext();
+        NetworkAddress netAddress = ((NodeControllerService) jobletCtx.getServiceContext().getControllerService())
+                .getNetworkManager().getPublicNetworkAddress();
+        FileReference fileRef = writeRunFile(taskCtx, jobletCtx);
+        long fileId = jobletCtx.nextUniqueId();
+        taskCtx.setStateObject(new JobFileState(fileRef, jobletCtx.getJobId(), fileId));
+        setPartialResult(result, fileId, netAddress.getAddress(), netAddress.getPort());
+    }
+
+    private FileReference writeRunFile(IHyracksTaskContext taskCtx, IHyracksJobletContext jobletCtx)
+            throws HyracksDataException {
+        List<GeneratedRunFileReader> runs = runsGenerator.getRuns();
+        FileReference managedFile;
+        if (runs.isEmpty()) {
+            managedFile = jobletCtx.createManagedWorkspaceFile(MEDIAN);
+            writeMemoryDataToRunFile(managedFile, taskCtx);
+        } else if (runs.size() == 1) {
+            managedFile = runs.get(0).getFile();
+        } else {
+            managedFile = jobletCtx.createManagedWorkspaceFile(MEDIAN);
+            mergeRunsToRunFile(managedFile, taskCtx, runs);
+        }
+        return managedFile;
+    }
+
+    private void mergeRunsToRunFile(FileReference managedFile, IHyracksTaskContext taskCtx,
+            List<GeneratedRunFileReader> runs) throws HyracksDataException {
+        createOrResetRunsMerger(runs);
+        RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+        IFrameWriter wrappingWriter = runsMerger.prepareFinalMergeResultWriter(runFileWriter);
+        try {
+            wrappingWriter.open();
+            runsMerger.process(wrappingWriter);
+        } finally {
+            wrappingWriter.close();
+        }
+    }
+
+    protected RunFileWriter writeMemoryDataToRunFile(FileReference managedFile, IHyracksTaskContext taskCtx)
+            throws HyracksDataException {
+        RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+        try {
+            runFileWriter.open();
+            ISorter sorter = runsGenerator.getSorter();
+            if (sorter.hasRemaining()) {
+                sorter.flush(runFileWriter);
+            }
+        } finally {
+            runFileWriter.close();
+        }
+        return runFileWriter;
+    }
+
+    private void createOrResetRunsMerger(List<GeneratedRunFileReader> runs) {
+        if (runsMerger == null) {
+            IBinaryComparator[] comparators =
+                    new IBinaryComparator[] { doubleComparatorFactory.createBinaryComparator() };
+            INormalizedKeyComputer nmkComputer = doubleNkComputerFactory.createNormalizedKeyComputer();
+            runsMerger = new ExternalSortRunMerger(ctx.getTaskContext(), runs, new int[] { 0 }, comparators,
+                    nmkComputer, recordDesc, numFrames, Integer.MAX_VALUE);
+        } else {
+            runsMerger.reset(runs);
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
new file mode 100644
index 0000000..a695c23
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.NormalizedKeyComputerFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.comm.channels.FileNetworkInputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.net.NetworkManager;
+import org.apache.hyracks.control.nc.partitions.JobFileState;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.collectors.InputChannelFrameReader;
+import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+public abstract class AbstractMedianAggregateFunction extends AbstractAggregateFunction {
+
+    protected static final String MEDIAN = "median";
+    private static final int COUNT_FIELD_ID = 0;
+    private static final int HANDLE_FIELD_ID = 1;
+    private static final int ADDRESS_FIELD_ID = 2;
+    private static final int PORT_FIELD_ID = 3;
+
+    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AString> stringSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AInt64> longSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AInt32> intSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+    @SuppressWarnings("unchecked")
+    protected final ISerializerDeserializer<ADouble> doubleSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+    protected final IBinaryComparatorFactory doubleComparatorFactory =
+            BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(ATypeTag.DOUBLE, true);
+    protected final INormalizedKeyComputerFactory doubleNkComputerFactory =
+            NormalizedKeyComputerFactoryProvider.INSTANCE.getNormalizedKeyComputerFactory(BuiltinType.ADOUBLE, true);
+    protected final RecordDescriptor recordDesc = new RecordDescriptor(new ISerializerDeserializer[] { doubleSerde },
+            new ITypeTraits[] { TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ADOUBLE) });
+
+    protected final AMutableString aString = new AMutableString("");
+    protected final AMutableInt64 aInt64 = new AMutableInt64(0);
+    protected final AMutableInt32 aInt32 = new AMutableInt32(0);
+    protected final AMutableDouble aDouble = new AMutableDouble(0);
+    protected final IPointable inputVal = new VoidPointable();
+    private final FrameTupleReference ftr = new FrameTupleReference();
+    private final FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
+    protected final List<IFrameReader> readers = new ArrayList<>();
+
+    protected final IScalarEvaluator eval;
+    protected final IEvaluatorContext ctx;
+    protected final IFrame frame;
+    protected long count;
+    private List<IFrame> inFrames;
+    private List<PartialResult> partialResults;
+    private RecordBuilder recBuilder;
+
+    public AbstractMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(sourceLoc);
+        ctx = context;
+        eval = args[0].createScalarEvaluator(context);
+        frame = new VSizeFrame(context.getTaskContext());
+    }
+
+    @Override
+    public void init() throws HyracksDataException {
+        if (partialResults == null) {
+            partialResults = new ArrayList<>();
+        }
+        if (recBuilder == null) {
+            recBuilder = new RecordBuilder();
+            recBuilder.reset(LocalMedianTypeComputer.REC_TYPE);
+        }
+        count = 0;
+        partialResults.clear();
+        recBuilder.init();
+    }
+
+    protected void processPartialResults(IFrameTupleReference tuple) throws HyracksDataException {
+        eval.evaluate(tuple, inputVal);
+        byte[] serBytes = inputVal.getByteArray();
+        int offset = inputVal.getStartOffset();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[offset]);
+        if (typeTag == ATypeTag.OBJECT) {
+            long handleCount = AInt64SerializerDeserializer.getLong(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, COUNT_FIELD_ID, 0, false));
+            if (handleCount == 0) {
+                return;
+            }
+            count += handleCount;
+
+            long fileId = AInt64SerializerDeserializer.getLong(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, HANDLE_FIELD_ID, 0, false));
+
+            String address = UTF8StringUtil.toString(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, ADDRESS_FIELD_ID, 0, false));
+
+            int port = AInt32SerializerDeserializer.getInt(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, PORT_FIELD_ID, 0, false));
+
+            partialResults.add(new PartialResult(fileId, handleCount, address, port));
+        } else {
+            throw new UnsupportedItemTypeException(sourceLoc, MEDIAN, serBytes[offset]);
+        }
+    }
+
+    protected void finishPartialResult(IPointable result) throws HyracksDataException {
+        if (count == 0) {
+            setPartialResult(result, -1, "", -1);
+            return;
+        }
+
+        IHyracksTaskContext taskCtx = ctx.getTaskContext();
+        IHyracksJobletContext jobletCtx = taskCtx.getJobletContext();
+        RunMergingFrameReader merger = createRunsMergingFrameReader();
+        FileReference managedFile = jobletCtx.createManagedWorkspaceFile(MEDIAN);
+        RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+        merger.open();
+        runFileWriter.open();
+        try {
+            while (merger.nextFrame(frame)) {
+                runFileWriter.nextFrame(frame.getBuffer());
+            }
+        } finally {
+            runFileWriter.close();
+            merger.close();
+        }
+
+        NetworkAddress netAddress = ((NodeControllerService) jobletCtx.getServiceContext().getControllerService())
+                .getNetworkManager().getPublicNetworkAddress();
+
+        long fileId = jobletCtx.nextUniqueId();
+        taskCtx.setStateObject(new JobFileState(managedFile, jobletCtx.getJobId(), fileId));
+        setPartialResult(result, fileId, netAddress.getAddress(), netAddress.getPort());
+    }
+
+    protected void finishFinalResult(IPointable result) throws HyracksDataException {
+        if (count == 0) {
+            PointableHelper.setNull(result);
+            return;
+        }
+        try {
+            double medianVal = findMedian();
+            resultStorage.reset();
+            aDouble.setValue(medianVal);
+            doubleSerde.serialize(aDouble, resultStorage.getDataOutput());
+            result.set(resultStorage);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private double findMedian() throws HyracksDataException {
+        RunMergingFrameReader merger = createRunsMergingFrameReader();
+        return getMedian(merger);
+    }
+
+    protected RunMergingFrameReader createRunsMergingFrameReader() throws HyracksDataException {
+        IHyracksTaskContext taskCtx = ctx.getTaskContext();
+        IHyracksJobletContext jobletCtx = taskCtx.getJobletContext();
+        INCServiceContext serviceCtx = jobletCtx.getServiceContext();
+        NetworkManager netManager = ((NodeControllerService) serviceCtx.getControllerService()).getNetworkManager();
+        List<IFrame> inFrames = getInFrames(partialResults.size(), taskCtx);
+        readers.clear();
+        for (PartialResult partialResult : partialResults) {
+            IFrameReader inputChannelReader = createInputChannel(netManager, taskCtx,
+                    new NetworkAddress(partialResult.address, partialResult.port), jobletCtx.getJobId().getId(),
+                    partialResult.fileId);
+            readers.add(inputChannelReader);
+        }
+        return new RunMergingFrameReader(taskCtx, readers, inFrames, new int[] { 0 },
+                new IBinaryComparator[] { doubleComparatorFactory.createBinaryComparator() },
+                doubleNkComputerFactory.createNormalizedKeyComputer(), recordDesc);
+    }
+
+    private double getMedian(RunMergingFrameReader merger) throws HyracksDataException {
+        boolean isOdd = count % 2 != 0;
+        long medianPosition = isOdd ? count / 2 : (count - 1) / 2;
+        long currentTupleCount = 0;
+        double medianVal = -1;
+        merger.open();
+        try {
+            while (merger.nextFrame(frame)) {
+                fta.reset(frame.getBuffer());
+                int tupleCount = fta.getTupleCount();
+                if (currentTupleCount + tupleCount > medianPosition) {
+                    int firstMedian = (int) (medianPosition - currentTupleCount);
+                    ftr.reset(fta, firstMedian);
+                    medianVal = ADoubleSerializerDeserializer.getDouble(ftr.getFieldData(0), ftr.getFieldStart(0) + 1);
+                    if (!isOdd) {
+                        if (firstMedian + 1 < tupleCount) {
+                            // second median is in the same frame
+                            ftr.reset(fta, firstMedian + 1);
+                        } else {
+                            // second median is in the next frame
+                            merger.nextFrame(frame);
+                            fta.reset(frame.getBuffer());
+                            ftr.reset(fta, 0);
+                        }
+                        medianVal =
+                                (ADoubleSerializerDeserializer.getDouble(ftr.getFieldData(0), ftr.getFieldStart(0) + 1)
+                                        + medianVal) / 2;
+                    }
+                    break;
+                }
+                currentTupleCount += tupleCount;
+            }
+        } finally {
+            merger.close();
+        }
+        return medianVal;
+    }
+
+    protected void setPartialResult(IPointable result, long fileId, String address, int port)
+            throws HyracksDataException {
+        try {
+            resultStorage.reset();
+            aInt64.setValue(count);
+            longSerde.serialize(aInt64, resultStorage.getDataOutput());
+            recBuilder.addField(COUNT_FIELD_ID, resultStorage);
+
+            resultStorage.reset();
+            aInt64.setValue(fileId);
+            longSerde.serialize(aInt64, resultStorage.getDataOutput());
+            recBuilder.addField(HANDLE_FIELD_ID, resultStorage);
+
+            resultStorage.reset();
+            aString.setValue(address);
+            stringSerde.serialize(aString, resultStorage.getDataOutput());
+            recBuilder.addField(ADDRESS_FIELD_ID, resultStorage);
+
+            resultStorage.reset();
+            aInt32.setValue(port);
+            intSerde.serialize(aInt32, resultStorage.getDataOutput());
+            recBuilder.addField(PORT_FIELD_ID, resultStorage);
+
+            resultStorage.reset();
+            recBuilder.write(resultStorage.getDataOutput(), true);
+            result.set(resultStorage);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    protected List<IFrame> getInFrames(int size, IHyracksTaskContext taskCtx) throws HyracksDataException {
+        if (inFrames == null) {
+            inFrames = new ArrayList<>(size);
+        }
+        int k = 0;
+        for (int inFramesSize = inFrames.size(); k < size && k < inFramesSize; k++) {
+            inFrames.get(k).reset();
+        }
+        for (; k < size; k++) {
+            inFrames.add(new VSizeFrame(taskCtx));
+        }
+        return inFrames;
+    }
+
+    private IFrameReader createInputChannel(NetworkManager netManager, IHyracksTaskContext taskContext,
+            NetworkAddress networkAddress, long jobId, long fileId) throws HyracksDataException {
+        FileNetworkInputChannel FileNetworkInputChannel =
+                new FileNetworkInputChannel(netManager, getSocketAddress(networkAddress), jobId, fileId);
+        InputChannelFrameReader channelFrameReader = new InputChannelFrameReader(FileNetworkInputChannel);
+        FileNetworkInputChannel.registerMonitor(channelFrameReader);
+        FileNetworkInputChannel.open(taskContext);
+        return channelFrameReader;
+    }
+
+    private static SocketAddress getSocketAddress(NetworkAddress netAddress) throws HyracksDataException {
+        try {
+            return new InetSocketAddress(InetAddress.getByAddress(netAddress.lookupIpAddress()), netAddress.getPort());
+        } catch (UnknownHostException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static class PartialResult {
+
+        long fileId;
+        String address;
+        int port;
+        long count;
+
+        PartialResult(long fileId, long count, String address, int port) {
+            this.fileId = fileId;
+            this.count = count;
+            this.address = address;
+            this.port = port;
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..faf5272
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class GlobalSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = GlobalSqlMedianAggregateDescriptor::new;
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.GLOBAL_SQL_MEDIAN;
+    }
+
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+                    throws HyracksDataException {
+                return new GlobalSqlMedianAggregateFunction(args, ctx, sourceLoc);
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java
new file mode 100644
index 0000000..c9ee8d8
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class GlobalSqlMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+    public GlobalSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        processPartialResults(tuple);
+    }
+
+    @Override
+    public void finish(IPointable result) throws HyracksDataException {
+        finishFinalResult(result);
+    }
+
+    @Override
+    public void finishPartial(IPointable result) throws HyracksDataException {
+        finishPartialResult(result);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..532cb5f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class IntermediateSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = IntermediateSqlMedianAggregateDescriptor::new;
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.INTERMEDIATE_SQL_MEDIAN;
+    }
+
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+                    throws HyracksDataException {
+                return new IntermediateSqlMedianAggregateFunction(args, ctx, sourceLoc);
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java
new file mode 100644
index 0000000..d78c17f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class IntermediateSqlMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+    public IntermediateSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        processPartialResults(tuple);
+    }
+
+    @Override
+    public void finish(IPointable result) throws HyracksDataException {
+        finishPartialResult(result);
+    }
+
+    @Override
+    public void finishPartial(IPointable result) throws HyracksDataException {
+        finishPartialResult(result);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..50f36ce
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class LocalSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlMedianAggregateDescriptor();
+        }
+
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return FunctionTypeInferers.MEDIAN_MEMORY;
+        }
+    };
+
+    private int numFrames = 0;
+
+    @Override
+    public void setImmutableStates(Object... states) {
+        this.numFrames = (int) states[0];
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.LOCAL_SQL_MEDIAN;
+    }
+
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+                    throws HyracksDataException {
+                return new LocalSqlMedianAggregateFunction(args, ctx, sourceLoc, numFrames);
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java
new file mode 100644
index 0000000..5400ca2
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class LocalSqlMedianAggregateFunction extends AbstractLocalMedianAggregateFunction {
+
+    public LocalSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc, int numFrames) throws HyracksDataException {
+        super(args, context, sourceLoc, numFrames);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        processDataValue(tuple);
+    }
+
+    @Override
+    public void finishPartial(IPointable result) throws HyracksDataException {
+        finishLocalPartial(result);
+    }
+
+    @Override
+    public void finish(IPointable result) throws HyracksDataException {
+        finishLocalPartial(result);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..b98e389
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlMedianAggregateDescriptor();
+        }
+
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return FunctionTypeInferers.MEDIAN_MEMORY;
+        }
+    };
+
+    private int numFrames = 0;
+
+    @Override
+    public void setImmutableStates(Object... states) {
+        this.numFrames = (int) states[0];
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SQL_MEDIAN;
+    }
+
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+                    throws HyracksDataException {
+                return new SqlMedianAggregateFunction(args, ctx, sourceLoc, numFrames);
+            }
+        };
+    }
+
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java
new file mode 100644
index 0000000..1dbbfe6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
+
+public class SqlMedianAggregateFunction extends AbstractLocalMedianAggregateFunction {
+
+    public SqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc, int numFrames) throws HyracksDataException {
+        super(args, context, sourceLoc, numFrames);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        processDataValue(tuple);
+    }
+
+    @Override
+    public void finish(IPointable result) throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            appender.write(runsGenerator, true);
+        }
+        // close to sort the in-memory data or write out sorted data to run files
+        runsGenerator.close();
+        super.finishFinalResult(result);
+    }
+
+    @Override
+    public void finishPartial(IPointable result) throws HyracksDataException {
+        finishLocalPartial(result);
+    }
+
+    @Override
+    protected RunMergingFrameReader createRunsMergingFrameReader() throws HyracksDataException {
+        IHyracksTaskContext taskCtx = ctx.getTaskContext();
+        List<GeneratedRunFileReader> runs = runsGenerator.getRuns();
+        readers.clear();
+        if (runs.isEmpty()) {
+            //TODO: no need to write memory to run file, should just read the sorted data out of the sorter
+            FileReference managedFile = taskCtx.createManagedWorkspaceFile(MEDIAN);
+            RunFileWriter runFileWriter = writeMemoryDataToRunFile(managedFile, taskCtx);
+            GeneratedRunFileReader deleteOnCloseReader = runFileWriter.createDeleteOnCloseReader();
+            readers.add(deleteOnCloseReader);
+        } else {
+            readers.addAll(runs);
+        }
+
+        List<IFrame> inFrames = getInFrames(readers.size(), taskCtx);
+        return new RunMergingFrameReader(taskCtx, readers, inFrames, new int[] { 0 },
+                new IBinaryComparator[] { doubleComparatorFactory.createBinaryComparator() },
+                doubleNkComputerFactory.createNormalizedKeyComputer(), recordDesc);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 4b418b6..2b8d4e2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -54,6 +54,7 @@
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlKurtosisDistinctAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMaxAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMaxDistinctAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMedianAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMinAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMinDistinctAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlSkewnessAggregateDescriptor;
@@ -156,6 +157,7 @@
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlAvgAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlKurtosisAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.GlobalSqlMedianAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlMinAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlSkewnessAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlStddevAggregateDescriptor;
@@ -178,6 +180,7 @@
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlAvgAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlKurtosisAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMedianAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMinAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlSkewnessAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlStddevAggregateDescriptor;
@@ -202,6 +205,7 @@
 import org.apache.asterix.runtime.aggregates.std.LocalSqlAvgAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.LocalSqlKurtosisAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.LocalSqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.LocalSqlMedianAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.LocalSqlMinAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.LocalSqlSkewnessAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.LocalSqlStddevAggregateDescriptor;
@@ -224,6 +228,7 @@
 import org.apache.asterix.runtime.aggregates.std.SqlCountAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.SqlKurtosisAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.SqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.SqlMedianAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.SqlMinAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.SqlSkewnessAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.SqlStddevAggregateDescriptor;
@@ -816,6 +821,10 @@
         fc.add(LocalSqlMinAggregateDescriptor.FACTORY);
         fc.add(IntermediateSqlMinAggregateDescriptor.FACTORY);
         fc.add(GlobalSqlMinAggregateDescriptor.FACTORY);
+        fc.add(SqlMedianAggregateDescriptor.FACTORY);
+        fc.add(LocalSqlMedianAggregateDescriptor.FACTORY);
+        fc.add(IntermediateSqlMedianAggregateDescriptor.FACTORY);
+        fc.add(GlobalSqlMedianAggregateDescriptor.FACTORY);
         fc.add(SqlStddevAggregateDescriptor.FACTORY);
         fc.add(LocalSqlStddevAggregateDescriptor.FACTORY);
         fc.add(IntermediateSqlStddevAggregateDescriptor.FACTORY);
@@ -891,6 +900,7 @@
         fc.add(ScalarSqlMaxDistinctAggregateDescriptor.FACTORY);
         fc.add(ScalarSqlMinAggregateDescriptor.FACTORY);
         fc.add(ScalarSqlMinDistinctAggregateDescriptor.FACTORY);
+        fc.add(ScalarSqlMedianAggregateDescriptor.FACTORY);
         fc.add(ScalarSqlStddevAggregateDescriptor.FACTORY);
         fc.add(ScalarSqlStddevDistinctAggregateDescriptor.FACTORY);
         fc.add(ScalarSqlStddevPopAggregateDescriptor.FACTORY);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
index bc763bd..7d484b6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
@@ -116,6 +116,9 @@
         }
     };
 
+    public static final IFunctionTypeInferer MEDIAN_MEMORY =
+            (expr, fd, context, compilerProps) -> fd.setImmutableStates(compilerProps.getSortMemoryFrames());
+
     public static final class CastTypeInferer implements IFunctionTypeInferer {
         @Override
         public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context,
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 4d324f4..422782e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -44,4 +44,6 @@
     Class<?> loadClass(String className) throws HyracksException;
 
     ClassLoader getClassLoader() throws HyracksException;
+
+    long nextUniqueId();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
new file mode 100644
index 0000000..542fda1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.comm.channels;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import org.apache.hyracks.api.channels.IInputChannel;
+import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FileNetworkInputChannel implements IInputChannel {
+
+    private static final int NUM_READ_BUFFERS = 1;
+    public static final long FILE_CHANNEL_CODE = -1;
+
+    private final IChannelConnectionFactory netManager;
+    private final SocketAddress remoteAddress;
+    private final long jobId;
+    private final long fileId;
+    private final Queue<ByteBuffer> fullQueue;
+    private final int nBuffers;
+    private IChannelControlBlock ccb;
+    private IInputChannelMonitor monitor;
+    private Object attachment;
+
+    public FileNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, long jobId,
+            long fileId) {
+        this.netManager = netManager;
+        this.remoteAddress = remoteAddress;
+        this.jobId = jobId;
+        this.fileId = fileId;
+        this.fullQueue = new ArrayDeque<>(NUM_READ_BUFFERS);
+        this.nBuffers = NUM_READ_BUFFERS;
+    }
+
+    @Override
+    public void registerMonitor(IInputChannelMonitor monitor) {
+        this.monitor = monitor;
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public synchronized ByteBuffer getNextBuffer() {
+        return fullQueue.poll();
+    }
+
+    @Override
+    public void recycleBuffer(ByteBuffer buffer) {
+        buffer.clear();
+        ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+    }
+
+    @Override
+    public void open(IHyracksCommonContext ctx) throws HyracksDataException {
+        try {
+            ccb = netManager.connect(remoteAddress);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+        ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+        ccb.getWriteInterface().setEmptyBufferAcceptor(WriteEmptyBufferAcceptor.INSTANCE);
+        ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers,
+                ctx.getInitialFrameSize());
+
+        ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkInputChannel.INITIAL_MESSAGE_SIZE);
+        writeBuffer.putLong(FILE_CHANNEL_CODE);
+        writeBuffer.putLong(jobId);
+        writeBuffer.putLong(fileId);
+        writeBuffer.flip();
+
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+
+    }
+
+    @Override
+    public void fail() {
+        // do nothing (covered by job lifecycle)
+    }
+
+    private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            fullQueue.add(buffer);
+            monitor.notifyDataAvailability(FileNetworkInputChannel.this, 1);
+        }
+
+        @Override
+        public void close() {
+            monitor.notifyEndOfStream(FileNetworkInputChannel.this);
+        }
+
+        @Override
+        public void error(int ecode) {
+            monitor.notifyFailure(FileNetworkInputChannel.this, ecode);
+        }
+    }
+
+    private static class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+
+        static final WriteEmptyBufferAcceptor INSTANCE = new WriteEmptyBufferAcceptor();
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            // do nothing
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 53bb7cd..5ae81fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -37,7 +37,7 @@
 public class NetworkInputChannel implements IInputChannel {
     private static final Logger LOGGER = LogManager.getLogger();
 
-    static final int INITIAL_MESSAGE_SIZE = 20;
+    public static final int INITIAL_MESSAGE_SIZE = 24;
 
     private final IChannelConnectionFactory netManager;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index f179c36..a96dfe5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -108,6 +108,7 @@
     private final String jobStartTimeZoneId;
 
     private final long maxWarnings;
+    private final AtomicLong uniqueIds;
 
     public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId,
             INCServiceContext serviceCtx, ActivityClusterGraph acg,
@@ -140,6 +141,7 @@
         this.jobStartTime = jobStartTime;
         this.jobStartTimeZoneId = jobStartTimeZoneId;
         this.maxWarnings = acg.getMaxWarnings();
+        this.uniqueIds = new AtomicLong();
     }
 
     @Override
@@ -161,6 +163,11 @@
     }
 
     @Override
+    public long nextUniqueId() {
+        return uniqueIds.getAndIncrement();
+    }
+
+    @Override
     public long getJobStartTime() {
         return jobStartTime;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 6876618..468a969 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -31,8 +31,12 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.comm.channels.FileNetworkInputChannel;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
+import org.apache.hyracks.comm.channels.NetworkInputChannel;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.partitions.PartitionFileReaderUtil;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
@@ -47,8 +51,6 @@
 
     private static final int MAX_CONNECTION_ATTEMPTS = 5;
 
-    static final int INITIAL_MESSAGE_SIZE = 20;
-
     private final PartitionManager partitionManager;
 
     private final int nBuffers;
@@ -113,7 +115,8 @@
         @Override
         public void channelOpened(ChannelControlBlock channel) {
             channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
-            channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+            channel.getReadInterface().getEmptyBufferAcceptor()
+                    .accept(ByteBuffer.allocate(NetworkInputChannel.INITIAL_MESSAGE_SIZE));
         }
     }
 
@@ -128,12 +131,13 @@
 
         @Override
         public void accept(ByteBuffer buffer) {
-            PartitionId pid = readInitialMessage(buffer);
-            if (LOGGER.isTraceEnabled()) {
-                LOGGER.trace("Received initial partition request: " + pid + " on channel: " + ccb);
-            }
             noc = new NetworkOutputChannel(ccb, nBuffers);
-            partitionManager.registerPartitionRequest(pid, noc);
+            long id = buffer.getLong();
+            if (id == FileNetworkInputChannel.FILE_CHANNEL_CODE) {
+                handleFileRequest(buffer);
+            } else {
+                handlePartitionRequest(buffer, id);
+            }
         }
 
         @Override
@@ -147,16 +151,37 @@
                 noc.abort(ecode);
             }
         }
+
+        private void handlePartitionRequest(ByteBuffer buffer, long jid) {
+            PartitionId pid = readInitialMessage(buffer, jid);
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace("Received initial partition request: " + pid + " on channel: " + ccb);
+            }
+            partitionManager.registerPartitionRequest(pid, noc);
+        }
+
+        private void handleFileRequest(ByteBuffer buffer) {
+            JobId jobId = new JobId(buffer.getLong());
+            long fileId = buffer.getLong();
+            if (partitionManager.registerFileRequest(jobId, noc)) {
+                writeFileToChannel(partitionManager.getNodeControllerService(), noc, jobId, fileId);
+            }
+        }
     }
 
-    private static PartitionId readInitialMessage(ByteBuffer buffer) {
-        JobId jobId = new JobId(buffer.getLong());
+    private static PartitionId readInitialMessage(ByteBuffer buffer, long jid) {
+        JobId jobId = new JobId(jid);
         ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
         int senderIndex = buffer.getInt();
         int receiverIndex = buffer.getInt();
         return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
     }
 
+    private static void writeFileToChannel(NodeControllerService ncs, NetworkOutputChannel noc, JobId jobId,
+            long fileId) {
+        PartitionFileReaderUtil.writeFileToChannel(ncs, noc, jobId, fileId);
+    }
+
     public MuxDemuxPerformanceCounters getPerformanceCounters() {
         return md.getPerformanceCounters();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java
new file mode 100644
index 0000000..e4e6f09
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.partitions;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.dataflow.state.IStateObject;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+
+public class JobFileState implements IStateObject {
+
+    private final FileReference fileRef;
+    private final JobId jobId;
+    private final JobFileId jobFileId;
+
+    public JobFileState(FileReference fileRef, JobId jobId, long fileId) {
+        this.fileRef = fileRef;
+        this.jobFileId = new JobFileId(fileId);
+        this.jobId = jobId;
+    }
+
+    @Override
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public Object getId() {
+        return jobFileId;
+    }
+
+    @Override
+    public long getMemoryOccupancy() {
+        return 0;
+    }
+
+    @Override
+    public void toBytes(DataOutput out) throws IOException {
+
+    }
+
+    @Override
+    public void fromBytes(DataInput in) throws IOException {
+
+    }
+
+    public FileReference getFileRef() {
+        return fileRef;
+    }
+
+    public static class JobFileId {
+
+        private final long fileId;
+
+        public JobFileId(long fileId) {
+            this.fileId = fileId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof JobFileId)) {
+                return false;
+            }
+            JobFileId otherFileId = (JobFileId) o;
+            return otherFileId.fileId == fileId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Long.hashCode(fileId);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
index 3e36946..218e557 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -18,14 +18,11 @@
  */
 package org.apache.hyracks.control.nc.partitions;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.Executor;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.partitions.IPartition;
 
@@ -60,45 +57,7 @@
 
     @Override
     public void writeTo(final IFrameWriter writer) {
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    if (partitionFile == null) {
-                        writer.open();
-                        writer.close();
-                        return;
-                    }
-                    IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
-                            IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-                    try {
-                        writer.open();
-                        try {
-                            long offset = 0;
-                            ByteBuffer buffer = ctx.allocateFrame();
-                            while (true) {
-                                buffer.clear();
-                                long size = ioManager.syncRead(fh, offset, buffer);
-                                if (size < 0) {
-                                    break;
-                                } else if (size < buffer.capacity()) {
-                                    throw new HyracksDataException("Premature end of file");
-                                }
-                                offset += size;
-                                buffer.flip();
-                                writer.nextFrame(buffer);
-                            }
-                        } finally {
-                            writer.close();
-                        }
-                    } finally {
-                        ioManager.close(fh);
-                    }
-                } catch (HyracksDataException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
+        executor.execute(new PartitionFileReader(ctx, partitionFile, ioManager, writer));
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
new file mode 100644
index 0000000..a0d8dcd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+
+public class PartitionFileReader implements Runnable {
+
+    private final IHyracksCommonContext ctx;
+    private final FileReference partitionFile;
+    private final IIOManager ioManager;
+    private final IFrameWriter writer;
+
+    public PartitionFileReader(IHyracksCommonContext ctx, FileReference partitionFile, IIOManager ioManager,
+            IFrameWriter writer) {
+        this.ctx = ctx;
+        this.partitionFile = partitionFile;
+        this.ioManager = ioManager;
+        this.writer = writer;
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (partitionFile == null) {
+                writer.open();
+                writer.close();
+                return;
+            }
+            IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+            try {
+                writer.open();
+                try {
+                    long offset = 0;
+                    ByteBuffer buffer = ctx.allocateFrame();
+                    while (true) {
+                        buffer.clear();
+                        long size = ioManager.syncRead(fh, offset, buffer);
+                        if (size < 0) {
+                            break;
+                        } else if (size < buffer.capacity()) {
+                            throw new HyracksDataException("Premature end of file");
+                        }
+                        offset += size;
+                        buffer.flip();
+                        writer.nextFrame(buffer);
+                    }
+                } finally {
+                    writer.close();
+                }
+            } finally {
+                ioManager.close(fh);
+            }
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
new file mode 100644
index 0000000..2675c27
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.partitions;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hyracks.api.dataflow.state.IStateObject;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.control.nc.Joblet;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
+
+public class PartitionFileReaderUtil {
+
+    private PartitionFileReaderUtil() {
+    }
+
+    public static void writeFileToChannel(NodeControllerService ncs, NetworkOutputChannel noc, JobId jobId,
+            long fileId) {
+        Joblet joblet = ncs.getJobletMap().get(jobId);
+        if (joblet == null) {
+            noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+            return;
+        }
+        IStateObject stateObject = joblet.getEnvironment().getStateObject(new JobFileState.JobFileId(fileId));
+        if (!(stateObject instanceof JobFileState)) {
+            noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+            return;
+        }
+        JobFileState fileState = (JobFileState) stateObject;
+        FileReference fileRef = fileState.getFileRef();
+        if (!fileRef.getFile().exists()) {
+            noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+            return;
+        }
+        ExecutorService executor = ncs.getExecutor();
+        noc.setFrameSize(joblet.getInitialFrameSize());
+        executor.execute(new PartitionFileReader(joblet, fileRef, ncs.getIoManager(), noc));
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index c082b71..927dd7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -58,6 +58,8 @@
 
     private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<>();
 
+    private final Map<JobId, List<NetworkOutputChannel>> fileRequests = new HashMap<>();
+
     private final Cache<JobId, JobId> failedJobsCache;
 
     public PartitionManager(NodeControllerService ncs) {
@@ -122,10 +124,24 @@
         }
     }
 
+    public synchronized boolean registerFileRequest(JobId jid, NetworkOutputChannel noc) {
+        if (failedJobsCache.getIfPresent(jid) != null) {
+            noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+            return false;
+        }
+        List<NetworkOutputChannel> netOutChannels = fileRequests.computeIfAbsent(jid, k -> new ArrayList<>());
+        netOutChannels.add(noc);
+        return true;
+    }
+
     public IWorkspaceFileFactory getFileFactory() {
         return fileFactory;
     }
 
+    public NodeControllerService getNodeControllerService() {
+        return ncs;
+    }
+
     public void close() {
         deallocatableRegistry.close();
     }
@@ -175,6 +191,10 @@
     }
 
     private List<NetworkOutputChannel> removePendingRequests(JobId jobId, JobStatus status) {
+        List<NetworkOutputChannel> jobFileRequests = null;
+        if (!fileRequests.isEmpty()) {
+            jobFileRequests = fileRequests.remove(jobId);
+        }
         if (status != JobStatus.FAILURE) {
             return Collections.emptyList();
         }
@@ -189,6 +209,9 @@
                 requestsIterator.remove();
             }
         }
+        if (jobFileRequests != null && !jobFileRequests.isEmpty()) {
+            pendingRequests.addAll(jobFileRequests);
+        }
         return pendingRequests;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index 50cacb2..7051e1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -115,6 +115,10 @@
         return size;
     }
 
+    public FileReference getFile() {
+        return file;
+    }
+
     public void setDeleteAfterClose(boolean deleteAfterClose) {
         this.deleteAfterClose = deleteAfterClose;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index 46e3eec..bf4cbd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -20,6 +20,7 @@
 
 import java.nio.ByteBuffer;
 import java.time.ZoneId;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -42,6 +43,7 @@
     private final WorkspaceFileFactory fileFactory;
     private final long jobStartTime;
     private final String jobStartTimeZoneId;
+    private final AtomicLong ids;
 
     TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
         this.serviceContext = serviceContext;
@@ -50,6 +52,7 @@
         this.frameManger = new FrameManager(frameSize);
         this.jobStartTime = System.currentTimeMillis();
         this.jobStartTimeZoneId = ZoneId.systemDefault().getId();
+        this.ids = new AtomicLong();
     }
 
     @Override
@@ -151,4 +154,8 @@
         return this.getClass().getClassLoader();
     }
 
+    @Override
+    public long nextUniqueId() {
+        return ids.getAndIncrement();
+    }
 }