Merge branch 'gerrit/trinity' into 'master'

Change-Id: I1850f4af7fcaca4649d503cbe64f6f33a853a524
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 983f99c..28549f3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -29,7 +29,6 @@
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.JobRun;
-import org.apache.hyracks.util.StorageUtil;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -124,14 +123,14 @@
         putTime(json, jobRun.getCreateTime(), "jobCreateTime", dateTime);
         putTime(json, jobRun.getStartTime(), "jobStartTime", dateTime);
         putTime(json, jobRun.getEndTime(), "jobEndTime", dateTime);
-        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(jobRun.getQueueWaitTimeInMillis()) + "s");
+        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(jobRun.getQueueWaitTimeInMillis()));
     }
 
     private static void putJobRequiredResources(ObjectNode json, JobRun jobRun) {
         IClusterCapacity jobCapacity = jobRun.getJobSpecification().getRequiredClusterCapacity();
         if (jobCapacity != null) {
             json.put("jobRequiredCPUs", jobCapacity.getAggregatedCores());
-            json.put("jobRequiredMemory", StorageUtil.toHumanReadableSize(jobCapacity.getAggregatedMemoryByteSize()));
+            json.put("jobRequiredMemory", jobCapacity.getAggregatedMemoryByteSize());
         }
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index af119d6..02f1f0e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -1227,10 +1227,14 @@
         boolean isWin = BuiltinFunctions.isWindowFunction(fi);
         boolean isWinAgg = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
                 BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG);
-        boolean prohibitOrderClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
-                BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE);
-        boolean prohibitFrameClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
-                BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE);
+        boolean prohibitOrderClause = (isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+                BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE))
+                || (!isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+                        BuiltinFunctions.AggregateFunctionProperty.NO_ORDER_CLAUSE));
+        boolean prohibitFrameClause = (isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+                BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE))
+                || (!isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+                        BuiltinFunctions.AggregateFunctionProperty.NO_FRAME_CLAUSE));
         boolean allowRespectIgnoreNulls = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
                 BuiltinFunctions.WindowFunctionProperty.ALLOW_RESPECT_IGNORE_NULLS);
         boolean allowFromFirstLast = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 27be120..914a625 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -532,6 +532,7 @@
                     -Xdebug
                     -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=${debug.suspend.flag}
                     -Xloggc:"${project.build.directory}/surefire-reports/SqlppExecutionTest-%p-gc.log" -XX:+PrintGC -XX:+PrintGCDateStamps -XX:GCLogFileSize=10M
+                    --add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED
                   </argLine>
                 </configuration>
                 <goals>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 16f922f..47498ea 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -341,7 +341,7 @@
                     final AlgebricksAbsolutePartitionConstraint jobLocations =
                             getJobLocations(spec, nodeJobTracker, computationLocations);
                     final IClusterCapacity jobRequiredCapacity =
-                            ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
+                            ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf, compilerProperties);
                     addRuntimeMemoryOverhead(jobRequiredCapacity, compilerProperties);
                     spec.setRequiredClusterCapacity(jobRequiredCapacity);
                 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index a16ac84..9e77922 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -18,25 +18,42 @@
  */
 package org.apache.asterix.app.resource;
 
+import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class OperatorResourcesComputer {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     public static final int MIN_OPERATOR_CORES = 1;
     private static final long MAX_BUFFER_PER_CONNECTION = 1L;
 
     private final int numComputationPartitions;
     private final long frameSize;
+    private final ExpressionMemoryComputer exprMemoryComputer;
+    private final CompilerProperties compilerProperties;
 
-    public OperatorResourcesComputer(int numComputationPartitions, long frameSize) {
+    public OperatorResourcesComputer(int numComputationPartitions, long frameSize,
+            CompilerProperties compilerProperties) {
         this.numComputationPartitions = numComputationPartitions;
         this.frameSize = frameSize;
+        this.exprMemoryComputer = new ExpressionMemoryComputer();
+        this.compilerProperties = compilerProperties;
     }
 
     public int getOperatorRequiredCores(ILogicalOperator operator) {
@@ -52,10 +69,22 @@
             return getExchangeRequiredMemory((ExchangeOperator) operator);
         } else {
             IPhysicalOperator physOp = ((AbstractLogicalOperator) operator).getPhysicalOperator();
-            return getOperatorRequiredMemory(operator.getExecutionMode(), physOp.getLocalMemoryRequirements());
+            return getOperatorRequiredMemory(operator.getExecutionMode(), physOp.getLocalMemoryRequirements())
+                    + getOperatorExpressionsRequiredMemory(operator);
         }
     }
 
+    private long getOperatorExpressionsRequiredMemory(ILogicalOperator operator) {
+        exprMemoryComputer.reset(operator);
+        try {
+            operator.acceptExpressionTransform(exprMemoryComputer);
+        } catch (Throwable e) {
+            // best-effort: on failure, log and return whatever memory was accumulated before the error
+            LOGGER.warn("encountered error while computing operator expressions required memory", e);
+        }
+        return exprMemoryComputer.requiredMemory;
+    }
+
     private long getOperatorRequiredMemory(AbstractLogicalOperator.ExecutionMode opExecMode, long memorySize) {
         if (opExecMode == AbstractLogicalOperator.ExecutionMode.PARTITIONED
                 || opExecMode == AbstractLogicalOperator.ExecutionMode.LOCAL) {
@@ -78,4 +107,42 @@
         }
         return 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions * frameSize;
     }
+
+    class ExpressionMemoryComputer implements ILogicalExpressionReferenceTransform {
+
+        private long requiredMemory;
+        private ILogicalOperator operator;
+
+        public ExpressionMemoryComputer() {
+        }
+
+        @Override
+        public boolean transform(Mutable<ILogicalExpression> expression) throws AlgebricksException {
+            ILogicalExpression expr = expression.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                AbstractFunctionCallExpression funExpr = (AbstractFunctionCallExpression) expr;
+                if (funExpr.getKind() == AbstractFunctionCallExpression.FunctionKind.AGGREGATE) {
+                    if (isMedian(funExpr.getFunctionIdentifier())) {
+                        requiredMemory +=
+                                (compilerProperties.getSortMemorySize() * numCompute(operator.getExecutionMode()));
+                    }
+                }
+            }
+            return false;
+        }
+
+        private int numCompute(AbstractLogicalOperator.ExecutionMode executionMode) {
+            return (executionMode == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                    || executionMode == AbstractLogicalOperator.ExecutionMode.LOCAL) ? numComputationPartitions : 1;
+        }
+
+        private boolean isMedian(FunctionIdentifier funId) {
+            return BuiltinFunctions.LOCAL_SQL_MEDIAN.equals(funId) || BuiltinFunctions.SQL_MEDIAN.equals(funId);
+        }
+
+        public void reset(ILogicalOperator op) {
+            requiredMemory = 0;
+            operator = op;
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index ba11956..353bebf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.app.resource.OperatorResourcesComputer;
 import org.apache.asterix.app.resource.PlanStage;
 import org.apache.asterix.app.resource.PlanStagesGenerator;
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -38,25 +39,28 @@
     }
 
     /**
-     * Calculates the required cluster capacity from a given query plan, the computation locations,
-     * the operator memory budgets, and frame size.
+     * Calculates the required cluster capacity from a given query plan, the computation locations, the operator memory
+     * budgets, and frame size.
      *
      * @param plan,
-     *            a given query plan.
+     *         a given query plan.
      * @param computationLocations,
-     *            the partitions for computation.
+     *         the partitions for computation.
      * @param physicalOptimizationConfig,
-     *            a PhysicalOptimizationConfig.
+     *         a PhysicalOptimizationConfig.
+     * @param compilerProperties the compiler properties passed on to the operator resources computer.
      * @return the required cluster capacity for executing the query.
      * @throws AlgebricksException
-     *             if the query plan is malformed.
+     *         if the query plan is malformed.
      */
     public static IClusterCapacity getRequiredCapacity(ILogicalPlan plan,
             AlgebricksAbsolutePartitionConstraint computationLocations,
-            PhysicalOptimizationConfig physicalOptimizationConfig) throws AlgebricksException {
+            PhysicalOptimizationConfig physicalOptimizationConfig, CompilerProperties compilerProperties)
+            throws AlgebricksException {
         final int frameSize = physicalOptimizationConfig.getFrameSize();
         final List<PlanStage> planStages = getStages(plan);
-        return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, frameSize);
+        return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, frameSize,
+                compilerProperties);
     }
 
     public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException {
@@ -68,8 +72,9 @@
     }
 
     public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations,
-            int frameSize) {
-        final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, frameSize);
+            int frameSize, CompilerProperties compilerProperties) {
+        final OperatorResourcesComputer computer =
+                new OperatorResourcesComputer(computationLocations, frameSize, compilerProperties);
         final IClusterCapacity clusterCapacity = new ClusterCapacity();
         final long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max()
                 .orElseThrow(IllegalStateException::new);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index bc9e6d3..c09cb34 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -340,7 +340,7 @@
             }
         }
         final IClusterCapacity clusterCapacity =
-                ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM, FRAME_SIZE);
+                ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM, FRAME_SIZE, null);
         Assert.assertEquals(clusterCapacity.getAggregatedMemoryByteSize(), expectedMemory);
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
index 218ef97..f8dbaad 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
@@ -60,7 +60,7 @@
     public void failedJobPartitionRequestTest() throws Exception {
         final NodeControllerService nc1 = integrationUtil.ncs[0];
         final NodeControllerService nc2 = integrationUtil.ncs[1];
-        final JobId failedJob = new JobId(-1);
+        final JobId failedJob = new JobId(10);
         nc2.getPartitionManager().jobCompleted(failedJob, JobStatus.FAILURE);
         final NetworkAddress localNetworkAddress = nc2.getNetworkManager().getPublicNetworkAddress();
         final InetSocketAddress nc2Address =
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.01.ddl.sqlpp
new file mode 100644
index 0000000..8739d27
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.01.ddl.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE openType AS {id: int};
+CREATE DATASET large_ds(openType) primary key id;
+CREATE DATASET odd_ds(openType) primary key id;
+CREATE DATASET even_ds(openType) primary key id;
+CREATE DATASET empty_ds(openType) primary key id;
+CREATE DATASET one_item_ds(openType) primary key id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.02.update.sqlpp
new file mode 100644
index 0000000..5f3816b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.02.update.sqlpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+UPSERT INTO one_item_ds {"id": 1, "m": 9};
+
+UPSERT INTO large_ds
+(FROM range(0, 5) as v
+ SELECT v AS id, v AS m, v % 15 AS g
+);
+
+UPSERT INTO large_ds
+(FROM range(6, 1000000) as v
+ SELECT v AS id, round_half_to_even(random(8) * 100, 1) AS m, v % 15 AS g
+);
+
+UPSERT INTO large_ds( [{"id": 1000001, "m": null, "g": 1}, {"id": 1000002, "g": 7}] );
+
+UPSERT INTO odd_ds
+(FROM range(1, 15) as v
+ SELECT v AS id, 513 % v AS m
+);
+
+UPSERT INTO even_ds
+(FROM range(1, 14) as v
+ SELECT v AS id, 513 % v AS m
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.03.query.sqlpp
new file mode 100644
index 0000000..9668a90
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.03.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT median(m) AS med FROM odd_ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.04.query.sqlpp
new file mode 100644
index 0000000..5b52dcd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.04.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT median(m) AS med FROM even_ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.05.query.sqlpp
new file mode 100644
index 0000000..9773b9f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.05.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT median(v) AS med
+FROM [0,1,0,1,3,3,2,1,0,3,7,9,6,9,3] AS v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.06.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.06.query.sqlpp
new file mode 100644
index 0000000..68b7be0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.06.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT median(v) AS med
+FROM [0,1,0,1,3,3,2,1,0,3,7,9,6,9] AS v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.07.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.07.query.sqlpp
new file mode 100644
index 0000000..95dc135
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.07.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT median(m) AS med
+FROM empty_ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.08.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.08.query.sqlpp
new file mode 100644
index 0000000..daac19b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.08.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT median(m) AS med
+FROM one_item_ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.09.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.09.query.sqlpp
new file mode 100644
index 0000000..9927bab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.09.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT median(m) AS med
+FROM large_ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.10.query.sqlpp
new file mode 100644
index 0000000..e4b1d11
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.10.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT g, median(m) AS med
+FROM large_ds
+GROUP BY g
+ORDER BY g;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.11.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.11.query.sqlpp
new file mode 100644
index 0000000..4880e98
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.11.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.sortmemory` "130KB";
+SELECT median(m) AS med
+FROM large_ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.99.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.99.ddl.sqlpp
new file mode 100644
index 0000000..36b2bab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql/median/median.99.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.03.adm
new file mode 100644
index 0000000..6d291ab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.03.adm
@@ -0,0 +1 @@
+{ "med": 3.0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.04.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.04.adm
new file mode 100644
index 0000000..9506c5b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.04.adm
@@ -0,0 +1 @@
+{ "med": 2.5 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.05.adm
new file mode 100644
index 0000000..6d291ab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.05.adm
@@ -0,0 +1 @@
+{ "med": 3.0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.06.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.06.adm
new file mode 100644
index 0000000..9506c5b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.06.adm
@@ -0,0 +1 @@
+{ "med": 2.5 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.07.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.07.adm
new file mode 100644
index 0000000..dd133f3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.07.adm
@@ -0,0 +1 @@
+{ "med": null }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.08.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.08.adm
new file mode 100644
index 0000000..a4a8fa5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.08.adm
@@ -0,0 +1 @@
+{ "med": 9.0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.09.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.09.adm
new file mode 100644
index 0000000..8f23892
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.09.adm
@@ -0,0 +1 @@
+{ "med": 50.1 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.10.adm
new file mode 100644
index 0000000..d5f5227
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.10.adm
@@ -0,0 +1,15 @@
+{ "med": 50.0, "g": 0 }
+{ "med": 49.8, "g": 1 }
+{ "med": 50.0, "g": 2 }
+{ "med": 50.2, "g": 3 }
+{ "med": 50.3, "g": 4 }
+{ "med": 50.0, "g": 5 }
+{ "med": 50.1, "g": 6 }
+{ "med": 50.1, "g": 7 }
+{ "med": 50.2, "g": 8 }
+{ "med": 50.1, "g": 9 }
+{ "med": 50.3, "g": 10 }
+{ "med": 49.8, "g": 11 }
+{ "med": 49.9, "g": 12 }
+{ "med": 49.9, "g": 13 }
+{ "med": 50.2, "g": 14 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.11.adm
new file mode 100644
index 0000000..8f23892
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql/median/median.11.adm
@@ -0,0 +1 @@
+{ "med": 50.1 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index edd25bf..042516e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -311,6 +311,11 @@
         return queryPlanShapeMode;
     }
 
+    /** Sort memory budget in frames (never below the sort minimum); divides before narrowing to int. */
+    public int getSortMemoryFrames() {
+        return Math.max((int) (getSortMemorySize() / getFrameSize()), OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_SORT);
+    }
+
     public boolean isColumnFilter() {
         return accessor.getBoolean(Option.COMPILER_COLUMN_FILTER);
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index ad31281..c83b86e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -39,7 +39,7 @@
 
 public class OptimizationConfUtil {
 
-    private static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
+    public static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
     private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
     private static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
     private static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
diff --git a/asterixdb/asterix-doc/src/main/markdown/builtins/9_aggregate_sql.md b/asterixdb/asterix-doc/src/main/markdown/builtins/9_aggregate_sql.md
index 0657fb0..755fd39 100644
--- a/asterixdb/asterix-doc/src/main/markdown/builtins/9_aggregate_sql.md
+++ b/asterixdb/asterix-doc/src/main/markdown/builtins/9_aggregate_sql.md
@@ -48,8 +48,8 @@
         * or, a `missing` value.
  * Return Value:
     * a `bigint` value representing the number of non-null and non-missing items in the given collection,
-    * `null` is returned if the input is `null` or `missing`,
-    * any other non-array and non-multiset input value will cause an error.
+    * `0` is returned if the input is `null` or `missing`,
+    * `0` is returned if the input is not an array or a multiset.
 
  * Example:
 
@@ -77,8 +77,8 @@
     * a `double` value representing the average of the non-null and non-missing numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if the given collection does not contain any non-null and non-missing items,
-    * any other non-array and non-multiset input value will cause a type error,
-    * any other non-numeric value in the input collection will cause a type error.
+    * `null` is returned if the input is not an array or a multiset,
+    * any other non-numeric value in the input collection will be ignored.
 
  * Example:
 
@@ -107,8 +107,8 @@
       items.
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if the given collection does not contain any non-null and non-missing items,
-    * any other non-array and non-multiset input value will cause a type error,
-    * any other non-numeric value in the input collection will cause a type error.
+    * `null` is returned if the input is not an array or a multiset,
+    * any other non-numeric value in the input collection will be ignored.
 
  * Example:
 
@@ -136,8 +136,8 @@
       type promotion order (`tinyint`-> `smallint`->`integer`->`bigint`->`float`->`double`) among numeric items.
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if the given collection does not contain any non-null and non-missing items,
-    * multiple incomparable items in the input array or multiset will cause a type error,
-    * any other non-array and non-multiset input value will cause a type error.
+    * `null` is returned if there are incomparable items in the input array or multiset,
+    * `null` is returned if the input is not an array or a multiset.
 
  * Example:
 
@@ -165,8 +165,8 @@
       type promotion order (`tinyint`-> `smallint`->`integer`->`bigint`->`float`->`double`) among numeric items.
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if the given collection does not contain any non-null and non-missing items,
-    * multiple incomparable items in the input array or multiset will cause a type error,
-    * any other non-array and non-multiset input value will cause a type error.
+    * `null` is returned if there are incomparable items in the input array or multiset,
+    * `null` is returned if the input is not an array or a multiset.
 
  * Example:
 
@@ -177,6 +177,44 @@
         3.4
 
 
+### array_median ###
+ * Syntax:
+
+        array_median(num_collection)
+
+ * Gets the median value of the numeric items in the given collection, ignoring null, missing, and non-numeric items.
+
+   The function starts by sorting the numeric items.
+
+     - If there is an odd number of numeric items, the function returns the item that is exactly in the middle of the range: that is, it has the same number of items before and after.
+     - If there is an even number of numeric items, the function returns the mean of the two items that are exactly in the middle of the range.
+
+ * Note: You cannot use the `DISTINCT` keyword with this function, or with the `median` aggregation pseudo-function.
+   The `median` aggregation pseudo-function does support the `FILTER` clause.
+   There is no `strict_median` function corresponding to this function.
+ * Arguments:
+    * `num_collection` could be:
+        * an `array` or `multiset` of numbers,
+        * or, a `null` value,
+        * or, a `missing` value.
+ * Clauses: When used as a window function, this function supports the [Window Partition Clause](manual.html#Window_partition_clause), but not the [Window Order Clause](manual.html#Window_order_clause) or the [Window Frame Clause](manual.html#Window_frame_clause).
+ * Return Value:
+    * a `double` value representing the median of the numeric items in the given collection,
+    * `null` is returned if the input is `null` or `missing`,
+    * `null` is returned if the given collection does not contain any numeric items,
+    * `null` is returned if the input is not an array or a multiset,
+    * any other non-numeric value in the input collection will be ignored.
+ * Example:
+
+       { "v1": array_median( [1.2, 2.3, 3.4, 0, null, missing] ),
+         "v2": array_median( [1.2, 2.3, 3.4, 4.5, 0, null, missing] ) };
+
+ * The expected result is:
+
+       { "v1": 1.75,
+         "v2": 2.3 }
+
+
 ### array_stddev_samp ###
 
  * Syntax:
@@ -193,7 +231,7 @@
     * a `double` value representing the sample standard deviation of the non-null and non-missing numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if the given collection does not contain any non-null and non-missing items,
-    * any other non-array and non-multiset input value will cause a type error,
+    * `null` is returned if the input is not an array or a multiset,
     * any other non-numeric value in the input collection will cause a type error.
 
  * Example:
@@ -220,7 +258,7 @@
     * a `double` value representing the population standard deviation of the non-null and non-missing numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if the given collection does not contain any non-null and non-missing items,
-    * any other non-array and non-multiset input value will cause a type error,
+    * `null` is returned if the input is not an array or a multiset,
     * any other non-numeric value in the input collection will cause a type error.
 
  * Example:
@@ -247,7 +285,7 @@
     * a `double` value representing the sample variance of the non-null and non-missing numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if the given collection does not contain any non-null and non-missing items,
-    * any other non-array and non-multiset input value will cause a type error,
+    * `null` is returned if the input is not an array or a multiset,
     * any other non-numeric value in the input collection will cause a type error.
 
  * Example:
@@ -274,7 +312,7 @@
     * a `double` value representing the population variance of the non-null and non-missing numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if the given collection does not contain any non-null and non-missing items,
-    * any other non-array and non-multiset input value will cause a type error,
+    * `null` is returned if the input is not an array or a multiset,
     * any other non-numeric value in the input collection will cause a type error.
 
  * Example:
@@ -301,7 +339,7 @@
     * a `double` value representing the skewness of the non-null and non-missing numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if the given collection does not contain any non-null and non-missing items,
-    * any other non-array and non-multiset input value will cause a type error,
+    * `null` is returned if the input is not an array or a multiset,
     * any other non-numeric value in the input collection will cause a type error.
 
  * Example:
@@ -328,7 +366,7 @@
     * a `double` value representing the kurtosis from a normal distribution of the non-null and non-missing numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if the given collection does not contain any non-null and non-missing items,
-    * any other non-array and non-multiset input value will cause a type error,
+    * `null` is returned if the input is not an array or a multiset,
     * any other non-numeric value in the input collection will cause a type error.
 
  * Example:
@@ -352,7 +390,8 @@
         * or a `missing` value.
  * Return Value:
     * a `bigint` value representing the number of items in the given collection,
-    * `null` is returned if the input is `null` or `missing`.
+    * `0` is returned if the input is `null` or `missing`,
+    * `0` is returned if the input is not an array or a multiset.
 
  * Example:
 
@@ -377,7 +416,8 @@
     * a `double` value representing the average of the numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if there is a `null` or `missing` in the input collection,
-    * any other non-numeric value in the input collection will cause a type error.
+    * `null` is returned if the input is not an array or a multiset,
+    * `null` is returned if there are any other non-numeric values in the input collection.
 
  * Example:
 
@@ -404,7 +444,8 @@
       items.
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if there is a `null` or `missing` in the input collection,
-    * any other non-numeric value in the input collection will cause a type error.
+    * `null` is returned if the input is not an array or a multiset,
+    * `null` is returned if there are any other non-numeric values in the input collection.
 
  * Example:
 
@@ -431,8 +472,8 @@
       (`tinyint`-> `smallint`->`integer`->`bigint`->`float`->`double`) among numeric items.
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if there is a `null` or `missing` in the input collection,
-    * multiple incomparable items in the input array or multiset will cause a type error,
-    * any other non-array and non-multiset input value will cause a type error.
+    * `null` is returned if there are incomparable items in the input array or multiset,
+    * `null` is returned if the input is not an array or a multiset.
 
  * Example:
 
@@ -460,8 +501,8 @@
       (`tinyint`-> `smallint`->`integer`->`bigint`->`float`->`double`) among numeric items.
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if there is a `null` or `missing` in the input collection,
-    * multiple incomparable items in the input array or multiset will cause a type error,
-    * any other non-array and non-multiset input value will cause a type error.
+    * `null` is returned if there are incomparable items in the input array or multiset,
+    * `null` is returned if the input is not an array or a multiset.
 
  * Example:
 
@@ -536,6 +577,7 @@
     * a `double` value representing the sample variance of the numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if there is a `null` or `missing` in the input collection,
+    * `null` is returned if the input is not an array or a multiset,
     * any other non-numeric value in the input collection will cause a type error.
 
  * Example:
@@ -561,6 +603,7 @@
     * a `double` value representing the population variance of the numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if there is a `null` or `missing` in the input collection,
+    * `null` is returned if the input is not an array or a multiset,
     * any other non-numeric value in the input collection will cause a type error.
 
  * Example:
@@ -586,6 +629,7 @@
     * a `double` value representing the skewness of the numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if there is a `null` or `missing` in the input collection,
+    * `null` is returned if the input is not an array or a multiset,
     * any other non-numeric value in the input collection will cause a type error.
 
  * Example:
@@ -611,6 +655,7 @@
     * a `double` value representing the kurtosis from a normal distribution of the numbers in the given collection,
     * `null` is returned if the input is `null` or `missing`,
     * `null` is returned if there is a `null` or `missing` in the input collection,
+    * `null` is returned if the input is not an array or a multiset,
     * any other non-numeric value in the input collection will cause a type error.
 
  * Example:
diff --git a/asterixdb/asterix-doc/src/main/markdown/sqlpp/3_query.md b/asterixdb/asterix-doc/src/main/markdown/sqlpp/3_query.md
index b8dc3bf..06a49ef 100644
--- a/asterixdb/asterix-doc/src/main/markdown/sqlpp/3_query.md
+++ b/asterixdb/asterix-doc/src/main/markdown/sqlpp/3_query.md
@@ -1241,20 +1241,21 @@
 
 For example, `SELECT COUNT(*) FROM customers` simply returns the total number of customers, whereas `SELECT COUNT(rating) FROM customers` returns the number of customers who have known ratings (that is, their ratings are not `null` or `missing`).
 
-Because the aggregation pseudo-functions sometimes restructure their operands, they can be used only in query blocks where (explicit or implicit) grouping is being done. Therefore the pseudo-functions cannot operate directly on arrays or multisets. For operating directly on JSON collections, SQL++ provides a set of ordinary functions for computing aggregations. Each ordinary aggregation function (except the ones corresponding to `COUNT` and `ARRAY_AGG`) has two versions: one that ignores `null` and `missing` values and one that returns `null` if a `null` or `missing` value is encountered anywhere in the collection. The names of the aggregation functions are as follows:
+Because the aggregation pseudo-functions sometimes restructure their operands, they can be used only in query blocks where (explicit or implicit) grouping is being done. Therefore the pseudo-functions cannot operate directly on arrays or multisets. For operating directly on JSON collections, SQL++ provides a set of ordinary functions for computing aggregations. Each ordinary aggregation function (except as noted below) has two versions: one that ignores `null` and `missing` values, and one that returns `null` if a `null` or `missing` value is encountered anywhere in the collection. The names of the aggregation functions are as follows:
 
 | Aggregation pseudo-function; operates on groups only | Ordinary function: Ignores NULL or MISSING values | Ordinary function: Returns NULL if NULL or MISSING are encountered|
 |----------|----------|--------|
-|SUM| ARRAY_SUM| STRICT_SUM |
-| AVG |ARRAY_MAX| STRICT_MAX |
-| MAX | ARRAY_MIN| STRICT_MIN |
-| MIN | ARRAY_AVG| STRICT_AVG |
+| SUM | ARRAY_SUM| STRICT_SUM |
+| AVG | ARRAY_AVG| STRICT_AVG |
+| MAX | ARRAY_MAX| STRICT_MAX |
+| MIN | ARRAY_MIN| STRICT_MIN |
 | COUNT |ARRAY_COUNT|STRICT_COUNT (see exception below) |
+| MEDIAN | ARRAY_MEDIAN | |
 |STDDEV_SAMP|ARRAY_STDDEV_SAMP| STRICT_STDDEV_SAMP |
 |STDDEV_POP|ARRAY_STDDEV_POP| STRICT_STDDEV_POP |
 |VAR_SAMP|ARRAY_VAR_SAMP| STRICT_VAR_SAMP |
 |VAR_POP|ARRAY_VAR_POP| STRICT_VAR_POP |
-|SKEWENESS|ARRAY_SKEWNESS| STRICT_SKEWNESS |
+|SKEWNESS|ARRAY_SKEWNESS| STRICT_SKEWNESS |
 |KURTOSIS|ARRAY_KURTOSIS| STRICT_KURTOSIS |
 | |ARRAY_AGG| |
 
diff --git a/asterixdb/asterix-doc/src/main/markdown/sqlpp/4_windowfunctions.md b/asterixdb/asterix-doc/src/main/markdown/sqlpp/4_windowfunctions.md
index d6e40e7..c41e8a3 100644
--- a/asterixdb/asterix-doc/src/main/markdown/sqlpp/4_windowfunctions.md
+++ b/asterixdb/asterix-doc/src/main/markdown/sqlpp/4_windowfunctions.md
@@ -108,7 +108,7 @@
 
 The *window order clause* determines how tuples are ordered within each partition. The window function works on tuples in the order specified by this clause.
 
-This clause may be used with any [window function](builtins.html#WindowFunctions), or any [aggregate function](builtins.html#AggregateFunctions) used as a window function.
+This clause may be used with any [window function](builtins.html#WindowFunctions), and most [aggregate functions](builtins.html#AggregateFunctions) &mdash; refer to the descriptions of individual functions for more details.
 
 This clause is optional. If omitted, all tuples are considered peers, i.e. their order is tied. When tuples in the window partition are tied, each window function behaves differently.
 
@@ -130,7 +130,7 @@
 ##### WindowFrameClause
 ![](../images/diagrams/WindowFrameClause.png)
 
-The *window frame clause* defines the window frame. It can be used with all [aggregate functions](builtins.html#AggregateFunctions) and some [window functions](builtins.html#WindowFunctions) &mdash; refer to the descriptions of individual functions for more details.  It is optional and allowed only when the [window order clause](#Window_order_clause) is present.
+The *window frame clause* defines the window frame. It can be used with some [window functions](builtins.html#WindowFunctions) and most [aggregate functions](builtins.html#AggregateFunctions) &mdash; refer to the descriptions of individual functions for more details. It is optional and allowed only when the [window order clause](#Window_order_clause) is present.
 
 * If this clause is omitted and there is no [window order clause](#Window_order_clause), the window frame is the entire partition.
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index f2ede19..7a1bdad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -38,6 +38,7 @@
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.util.LogRedactionUtil;
 
+import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -49,6 +50,7 @@
     // Configuration
     private final String bucket;
     private final S3Client s3Client;
+    private ResponseInputStream<?> s3InStream;
     private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
 
     public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths,
@@ -84,7 +86,8 @@
         int retries = 0;
         while (retries < MAX_RETRIES) {
             try {
-                in = s3Client.getObject(request);
+                s3InStream = s3Client.getObject(request);
+                in = s3InStream;
                 break;
             } catch (NoSuchKeyException ex) {
                 LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket "
@@ -116,6 +119,9 @@
     @Override
     public void close() throws IOException {
         if (in != null) {
+            if (s3InStream != null) {
+                s3InStream.abort();
+            }
             CleanupUtils.close(in, null);
         }
         if (s3Client != null) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 255af53..c65cf38 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -95,6 +95,7 @@
 import org.apache.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.Int64ArrayToStringTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalAvgTypeComputer;
+import org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalSingleVarStatisticsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.MinMaxAggTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.MissingIfTypeComputer;
@@ -429,6 +430,7 @@
     public static final FunctionIdentifier GLOBAL_AVG = FunctionConstants.newAsterix("agg-global-avg", 1);
     public static final FunctionIdentifier INTERMEDIATE_AVG = FunctionConstants.newAsterix("agg-intermediate-avg", 1);
     public static final FunctionIdentifier LOCAL_AVG = FunctionConstants.newAsterix("agg-local-avg", 1);
+    public static final FunctionIdentifier MEDIAN = FunctionConstants.newAsterix("agg-median", 1);
     public static final FunctionIdentifier FIRST_ELEMENT = FunctionConstants.newAsterix("agg-first-element", 1);
     public static final FunctionIdentifier LOCAL_FIRST_ELEMENT =
             FunctionConstants.newAsterix("agg-local-first-element", 1);
@@ -481,6 +483,7 @@
     public static final FunctionIdentifier SCALAR_SUM = FunctionConstants.newAsterix("sum", 1);
     public static final FunctionIdentifier SCALAR_MAX = FunctionConstants.newAsterix("max", 1);
     public static final FunctionIdentifier SCALAR_MIN = FunctionConstants.newAsterix("min", 1);
+    public static final FunctionIdentifier SCALAR_MEDIAN = FunctionConstants.newAsterix("median", 1);
     public static final FunctionIdentifier SCALAR_FIRST_ELEMENT = FunctionConstants.newAsterix("first-element", 1);
     public static final FunctionIdentifier SCALAR_LOCAL_FIRST_ELEMENT =
             FunctionConstants.newAsterix("local-first-element", 1);
@@ -605,6 +608,11 @@
     public static final FunctionIdentifier GLOBAL_SQL_MIN = FunctionConstants.newAsterix("agg-global-sql-min", 1);
     public static final FunctionIdentifier GLOBAL_SQL_AVG = FunctionConstants.newAsterix("agg-global-sql-avg", 1);
     public static final FunctionIdentifier LOCAL_SQL_AVG = FunctionConstants.newAsterix("agg-local-sql-avg", 1);
+    public static final FunctionIdentifier SQL_MEDIAN = FunctionConstants.newAsterix("agg-sql-median", 1);
+    public static final FunctionIdentifier LOCAL_SQL_MEDIAN = FunctionConstants.newAsterix("agg-local-sql-median", 1);
+    public static final FunctionIdentifier INTERMEDIATE_SQL_MEDIAN =
+            FunctionConstants.newAsterix("agg-intermediate-sql-median", 1);
+    public static final FunctionIdentifier GLOBAL_SQL_MEDIAN = FunctionConstants.newAsterix("agg-global-sql-median", 1);
     public static final FunctionIdentifier SQL_STDDEV_SAMP = FunctionConstants.newAsterix("agg-sql-stddev_samp", 1);
     public static final FunctionIdentifier INTERMEDIATE_SQL_STDDEV_SAMP =
             FunctionConstants.newAsterix("intermediate-agg-sql-stddev_samp", 1);
@@ -659,6 +667,7 @@
     public static final FunctionIdentifier SCALAR_SQL_SUM = FunctionConstants.newAsterix("sql-sum", 1);
     public static final FunctionIdentifier SCALAR_SQL_MAX = FunctionConstants.newAsterix("sql-max", 1);
     public static final FunctionIdentifier SCALAR_SQL_MIN = FunctionConstants.newAsterix("sql-min", 1);
+    public static final FunctionIdentifier SCALAR_SQL_MEDIAN = FunctionConstants.newAsterix("sql-median", 1);
     public static final FunctionIdentifier SCALAR_SQL_STDDEV_SAMP = FunctionConstants.newAsterix("sql-stddev_samp", 1);
     public static final FunctionIdentifier SCALAR_SQL_STDDEV_POP = FunctionConstants.newAsterix("sql-stddev_pop", 1);
     public static final FunctionIdentifier SCALAR_SQL_VAR_SAMP = FunctionConstants.newAsterix("sql-var_samp", 1);
@@ -1714,6 +1723,13 @@
         addPrivateFunction(GLOBAL_SQL_UNION_MBR, ARectangleTypeComputer.INSTANCE, true);
         addFunction(SCALAR_SQL_UNION_MBR, ARectangleTypeComputer.INSTANCE, true);
 
+        addFunction(SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+        addFunction(SCALAR_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+        addFunction(SCALAR_SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SQL_MEDIAN, LocalMedianTypeComputer.INSTANCE, true);
+        addPrivateFunction(INTERMEDIATE_SQL_MEDIAN, LocalMedianTypeComputer.INSTANCE, true);
+        addPrivateFunction(GLOBAL_SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+
         addPrivateFunction(SERIAL_AVG, NullableDoubleTypeComputer.INSTANCE, true);
         addPrivateFunction(SERIAL_COUNT, AInt64TypeComputer.INSTANCE, true);
         addPrivateFunction(SERIAL_GLOBAL_AVG, NullableDoubleTypeComputer.INSTANCE, true);
@@ -2745,6 +2761,25 @@
         addDistinctAgg(SQL_SUM_DISTINCT, SQL_SUM);
         addScalarAgg(SQL_SUM_DISTINCT, SCALAR_SQL_SUM_DISTINCT);
 
+        // SQL MEDIAN
+        addAgg(SQL_MEDIAN);
+        addAgg(LOCAL_SQL_MEDIAN);
+        addAgg(GLOBAL_SQL_MEDIAN);
+
+        addLocalAgg(SQL_MEDIAN, LOCAL_SQL_MEDIAN);
+
+        addIntermediateAgg(SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN);
+        addIntermediateAgg(LOCAL_SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN);
+        addIntermediateAgg(GLOBAL_SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN);
+
+        addGlobalAgg(SQL_MEDIAN, GLOBAL_SQL_MEDIAN);
+
+        addScalarAgg(MEDIAN, SCALAR_MEDIAN);
+        addScalarAgg(SQL_MEDIAN, SCALAR_SQL_MEDIAN);
+
+        registerAggFunctionProperties(SCALAR_SQL_MEDIAN, AggregateFunctionProperty.NO_FRAME_CLAUSE,
+                AggregateFunctionProperty.NO_ORDER_CLAUSE);
+
         // SPATIAL AGGREGATES
 
         addAgg(ST_UNION_AGG);
@@ -2779,6 +2814,13 @@
     interface BuiltinFunctionProperty {
     }
 
+    public enum AggregateFunctionProperty implements BuiltinFunctionProperty {
+        /** Whether the order clause is prohibited */
+        NO_ORDER_CLAUSE,
+        /** Whether the frame clause is prohibited */
+        NO_FRAME_CLAUSE
+    }
+
     public enum WindowFunctionProperty implements BuiltinFunctionProperty {
         /**
          * Whether the order clause is prohibited
@@ -2959,6 +3001,11 @@
         registeredFunctions.put(functionInfo.getFunctionIdentifier(), functionInfo);
     }
 
+    /** Convenience overload: passes {@code properties} on, typed as {@link AggregateFunctionProperty} values. */
+    private static void registerAggFunctionProperties(FunctionIdentifier fid, AggregateFunctionProperty... properties) {
+        registerFunctionProperties(fid, AggregateFunctionProperty.class, properties);
+    }
+
     private static <T extends Enum<T> & BuiltinFunctionProperty> void registerFunctionProperties(FunctionIdentifier fid,
             Class<T> propertyClass, T[] properties) {
         if (properties == null) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java
new file mode 100644
index 0000000..3411ef4
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.typecomputer.impl;
+
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+
+public class LocalMedianTypeComputer implements IResultTypeComputer {
+
+    public static final LocalMedianTypeComputer INSTANCE = new LocalMedianTypeComputer();
+
+    public static final ARecordType REC_TYPE = new ARecordType(null,
+            new String[] { "count", "handle", "address", "port" },
+            new IAType[] { BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.ASTRING, BuiltinType.AINT32 }, false);
+
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
+            IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
+        return REC_TYPE;
+    }
+}
diff --git a/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java b/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java
index a857981..07f2021 100644
--- a/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java
+++ b/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.om.typecomputer.impl.ClosedRecordConstructorResultType;
 import org.apache.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalAvgTypeComputer;
+import org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalSingleVarStatisticsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.NullableDoubleTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.OpenRecordConstructorResultType;
@@ -190,6 +191,7 @@
         differentBehaviorFunctions.add(RecordRemoveFieldsTypeComputer.class.getSimpleName());
         differentBehaviorFunctions.add(ClosedRecordConstructorResultType.class.getSimpleName());
         differentBehaviorFunctions.add(LocalAvgTypeComputer.class.getSimpleName());
+        differentBehaviorFunctions.add(LocalMedianTypeComputer.class.getSimpleName());
         differentBehaviorFunctions.add(BooleanOnlyTypeComputer.class.getSimpleName());
         //        differentBehaviorFunctions.add("AMissingTypeComputer"); // TODO What type computer is this?
         differentBehaviorFunctions.add(NullableDoubleTypeComputer.class.getSimpleName());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..5c83a5e
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.scalar;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.runtime.aggregates.std.SqlMedianAggregateDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlMedianAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlMedianAggregateDescriptor();
+        }
+
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return FunctionTypeInferers.MEDIAN_MEMORY;
+        }
+    };
+
+    private ScalarSqlMedianAggregateDescriptor() {
+        super(SqlMedianAggregateDescriptor.FACTORY);
+    }
+
+    @Override
+    public void setImmutableStates(Object... states) {
+        super.setImmutableStates(states);
+        aggFuncDesc.setImmutableStates(states);
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SCALAR_SQL_MEDIAN;
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java
new file mode 100644
index 0000000..8ff7ed5
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import java.util.List;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrameTupleAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.partitions.JobFileState;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
+import org.apache.hyracks.dataflow.std.sort.Algorithm;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import org.apache.hyracks.dataflow.std.sort.ISorter;
+
+public abstract class AbstractLocalMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+    protected final ExternalSortRunGenerator runsGenerator;
+    protected final IFrameTupleAppender appender;
+    private final ArrayTupleBuilder tupleBuilder;
+    private final int numFrames;
+    private ExternalSortRunMerger runsMerger;
+
+    public AbstractLocalMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc, int numFrames) throws HyracksDataException {
+        super(args, context, sourceLoc);
+        this.numFrames = numFrames;
+        appender = new FrameTupleAppender(frame);
+        tupleBuilder = new ArrayTupleBuilder(1);
+        runsGenerator = new ExternalSortRunGenerator(context.getTaskContext(), new int[] { 0 },
+                new INormalizedKeyComputerFactory[] { doubleNkComputerFactory },
+                new IBinaryComparatorFactory[] { doubleComparatorFactory }, recordDesc, Algorithm.MERGE_SORT,
+                EnumFreeSlotPolicy.LAST_FIT, numFrames, Integer.MAX_VALUE);
+
+    }
+
+    @Override
+    public void init() throws HyracksDataException {
+        super.init();
+        appender.reset(frame, true);
+        runsGenerator.open();
+        runsGenerator.getSorter().reset();
+    }
+
+    protected void processDataValue(IFrameTupleReference tuple) throws HyracksDataException {
+        eval.evaluate(tuple, inputVal);
+        byte[] data = inputVal.getByteArray();
+        int start = inputVal.getStartOffset();
+        ATypeTag tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[start]);
+        if (ATypeHierarchy.getTypeDomain(tag) == ATypeHierarchy.Domain.NUMERIC) {
+            count++;
+            aDouble.setValue(ATypeHierarchy.getDoubleValue(MEDIAN, 0, data, start));
+            tupleBuilder.reset();
+            tupleBuilder.addField(doubleSerde, aDouble);
+            FrameUtils.appendToWriter(runsGenerator, appender, tupleBuilder.getFieldEndOffsets(),
+                    tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
+        }
+    }
+
+    protected void finishLocalPartial(IPointable result) throws HyracksDataException {
+        if (count == 0) {
+            setPartialResult(result, -1, "", -1);
+            return;
+        }
+        if (appender.getTupleCount() > 0) {
+            appender.write(runsGenerator, true);
+        }
+        // close to sort the in-memory data or write out sorted data to run files
+        runsGenerator.close();
+
+        IHyracksTaskContext taskCtx = ctx.getTaskContext();
+        IHyracksJobletContext jobletCtx = taskCtx.getJobletContext();
+        NetworkAddress netAddress = ((NodeControllerService) jobletCtx.getServiceContext().getControllerService())
+                .getNetworkManager().getPublicNetworkAddress();
+        FileReference fileRef = writeRunFile(taskCtx, jobletCtx);
+        long fileId = jobletCtx.nextUniqueId();
+        taskCtx.setStateObject(new JobFileState(fileRef, jobletCtx.getJobId(), fileId));
+        setPartialResult(result, fileId, netAddress.getAddress(), netAddress.getPort());
+    }
+
+    // Yields one file for this partition: the single run as-is, a flush of in-memory data, or a merge of the runs.
+    private FileReference writeRunFile(IHyracksTaskContext taskCtx, IHyracksJobletContext jobletCtx)
+            throws HyracksDataException {
+        List<GeneratedRunFileReader> generatedRuns = runsGenerator.getRuns();
+        if (generatedRuns.size() == 1) {
+            return generatedRuns.get(0).getFile();
+        }
+        FileReference outFile = jobletCtx.createManagedWorkspaceFile(MEDIAN);
+        if (generatedRuns.isEmpty()) {
+            writeMemoryDataToRunFile(outFile, taskCtx);
+        } else {
+            mergeRunsToRunFile(outFile, taskCtx, generatedRuns);
+        }
+        return outFile;
+    }
+
+    private void mergeRunsToRunFile(FileReference managedFile, IHyracksTaskContext taskCtx,
+            List<GeneratedRunFileReader> runs) throws HyracksDataException {
+        createOrResetRunsMerger(runs);
+        RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+        IFrameWriter wrappingWriter = runsMerger.prepareFinalMergeResultWriter(runFileWriter);
+        try {
+            wrappingWriter.open();
+            runsMerger.process(wrappingWriter);
+        } finally {
+            wrappingWriter.close();
+        }
+    }
+
+    protected RunFileWriter writeMemoryDataToRunFile(FileReference managedFile, IHyracksTaskContext taskCtx)
+            throws HyracksDataException {
+        RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+        try {
+            runFileWriter.open();
+            ISorter sorter = runsGenerator.getSorter();
+            if (sorter.hasRemaining()) {
+                sorter.flush(runFileWriter);
+            }
+        } finally {
+            runFileWriter.close();
+        }
+        return runFileWriter;
+    }
+
+    // Creates the runs merger on first use; on later calls the existing merger is only reset with the new runs.
+    private void createOrResetRunsMerger(List<GeneratedRunFileReader> runs) {
+        if (runsMerger != null) {
+            runsMerger.reset(runs);
+            return;
+        }
+        IBinaryComparator[] comparators = { doubleComparatorFactory.createBinaryComparator() };
+        INormalizedKeyComputer keyComputer = doubleNkComputerFactory.createNormalizedKeyComputer();
+        runsMerger = new ExternalSortRunMerger(ctx.getTaskContext(), runs, new int[] { 0 }, comparators, keyComputer,
+                recordDesc, numFrames, Integer.MAX_VALUE);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
new file mode 100644
index 0000000..207eaa3
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.NormalizedKeyComputerFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.comm.channels.FileNetworkInputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.net.NetworkManager;
+import org.apache.hyracks.control.nc.partitions.JobFileState;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.collectors.InputChannelFrameReader;
+import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public abstract class AbstractMedianAggregateFunction extends AbstractAggregateFunction {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    protected static final String MEDIAN = "median";
+    private static final int COUNT_FIELD_ID = 0;
+    private static final int HANDLE_FIELD_ID = 1;
+    private static final int ADDRESS_FIELD_ID = 2;
+    private static final int PORT_FIELD_ID = 3;
+
+    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AString> stringSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AInt64> longSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AInt32> intSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+    @SuppressWarnings("unchecked")
+    protected final ISerializerDeserializer<ADouble> doubleSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+    protected final IBinaryComparatorFactory doubleComparatorFactory =
+            BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(ATypeTag.DOUBLE, true);
+    protected final INormalizedKeyComputerFactory doubleNkComputerFactory =
+            NormalizedKeyComputerFactoryProvider.INSTANCE.getNormalizedKeyComputerFactory(BuiltinType.ADOUBLE, true);
+    protected final RecordDescriptor recordDesc = new RecordDescriptor(new ISerializerDeserializer[] { doubleSerde },
+            new ITypeTraits[] { TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ADOUBLE) });
+
+    protected final AMutableString aString = new AMutableString("");
+    protected final AMutableInt64 aInt64 = new AMutableInt64(0);
+    protected final AMutableInt32 aInt32 = new AMutableInt32(0);
+    protected final AMutableDouble aDouble = new AMutableDouble(0);
+    protected final IPointable inputVal = new VoidPointable();
+    private final FrameTupleReference ftr = new FrameTupleReference();
+    private final FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
+    protected final List<IFrameReader> readers = new ArrayList<>();
+
+    protected final IScalarEvaluator eval;
+    protected final IEvaluatorContext ctx;
+    protected final IFrame frame;
+    protected long count;
+    private List<IFrame> inFrames;
+    private List<PartialResult> partialResults;
+    private RecordBuilder recBuilder;
+
+    public AbstractMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(sourceLoc);
+        ctx = context;
+        eval = args[0].createScalarEvaluator(context);
+        frame = new VSizeFrame(context.getTaskContext());
+    }
+
+    @Override
+    public void init() throws HyracksDataException {
+        if (partialResults == null) {
+            partialResults = new ArrayList<>();
+        }
+        if (recBuilder == null) {
+            recBuilder = new RecordBuilder();
+            recBuilder.reset(LocalMedianTypeComputer.REC_TYPE);
+        }
+        count = 0;
+        partialResults.clear();
+        recBuilder.init();
+    }
+
+    protected void processPartialResults(IFrameTupleReference tuple) throws HyracksDataException {
+        eval.evaluate(tuple, inputVal);
+        byte[] serBytes = inputVal.getByteArray();
+        int offset = inputVal.getStartOffset();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[offset]);
+        if (typeTag == ATypeTag.OBJECT) {
+            long handleCount = AInt64SerializerDeserializer.getLong(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, COUNT_FIELD_ID, 0, false));
+            if (handleCount == 0) {
+                return;
+            }
+            count += handleCount;
+
+            long fileId = AInt64SerializerDeserializer.getLong(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, HANDLE_FIELD_ID, 0, false));
+
+            String address = UTF8StringUtil.toString(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, ADDRESS_FIELD_ID, 0, false));
+
+            int port = AInt32SerializerDeserializer.getInt(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, PORT_FIELD_ID, 0, false));
+
+            partialResults.add(new PartialResult(fileId, handleCount, address, port));
+        } else {
+            throw new UnsupportedItemTypeException(sourceLoc, MEDIAN, serBytes[offset]);
+        }
+    }
+
+    protected void finishPartialResult(IPointable result) throws HyracksDataException {
+        if (count == 0) { // nothing was counted: report a placeholder handle/address/port
+            setPartialResult(result, -1, "", -1);
+            return;
+        }
+        IHyracksTaskContext taskCtx = ctx.getTaskContext();
+        IHyracksJobletContext jobletCtx = taskCtx.getJobletContext();
+        RunMergingFrameReader merger = createRunsMergingFrameReader();
+        FileReference managedFile = jobletCtx.createManagedWorkspaceFile(MEDIAN);
+        RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+        merger.open();
+        try { // outer guard: the merger is closed even if the run file fails to open or close
+            runFileWriter.open();
+            try { // copy every merged frame of the partial results into the new run file
+                while (merger.nextFrame(frame)) {
+                    runFileWriter.nextFrame(frame.getBuffer());
+                }
+            } finally {
+                runFileWriter.close();
+            }
+        } finally {
+            merger.close();
+        }
+        NetworkAddress netAddress = ((NodeControllerService) jobletCtx.getServiceContext().getControllerService())
+                .getNetworkManager().getPublicNetworkAddress();
+        long fileId = jobletCtx.nextUniqueId();
+        taskCtx.setStateObject(new JobFileState(managedFile, jobletCtx.getJobId(), fileId));
+        setPartialResult(result, fileId, netAddress.getAddress(), netAddress.getPort());
+    }
+
+    protected void finishFinalResult(IPointable result) throws HyracksDataException {
+        if (count == 0) {
+            PointableHelper.setNull(result);
+            return;
+        }
+        try {
+            boolean medianFound = findMedian();
+            resultStorage.reset();
+            if (medianFound) {
+                doubleSerde.serialize(aDouble, resultStorage.getDataOutput());
+                result.set(resultStorage);
+            } else {
+                PointableHelper.setNull(result);
+                LOGGER.warn("median was not found");
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    // Builds a merging reader over all collected partial results and hands it to getMedian for the scan.
+    private boolean findMedian() throws HyracksDataException {
+        return getMedian(createRunsMergingFrameReader());
+    }
+
+    protected RunMergingFrameReader createRunsMergingFrameReader() throws HyracksDataException {
+        IHyracksTaskContext task = ctx.getTaskContext();
+        IHyracksJobletContext joblet = task.getJobletContext();
+        INCServiceContext ncContext = joblet.getServiceContext();
+        NetworkManager network = ((NodeControllerService) ncContext.getControllerService()).getNetworkManager();
+        List<IFrame> mergeFrames = getInFrames(partialResults.size(), task);
+        readers.clear();
+        // open one file channel per partial result; every channel reader becomes one input of the merge
+        for (PartialResult partial : partialResults) {
+            NetworkAddress source = new NetworkAddress(partial.address, partial.port);
+            long jobId = joblet.getJobId().getId();
+            readers.add(createInputChannel(network, task, source, jobId, partial.fileId));
+        }
+        IBinaryComparator[] comparators = { doubleComparatorFactory.createBinaryComparator() };
+        return new RunMergingFrameReader(task, readers, mergeFrames, new int[] { 0 }, comparators,
+                doubleNkComputerFactory.createNormalizedKeyComputer(), recordDesc);
+    }
+
+    private boolean getMedian(RunMergingFrameReader merger) throws HyracksDataException {
+        boolean isOdd = count % 2 != 0;
+        long medianPosition = isOdd ? count / 2 : (count - 1) / 2; // 0-based index of the (lower) middle tuple
+        long currentTupleCount = 0; // tuples contained in the frames already skipped
+        boolean found = false; // set once aDouble holds the median
+        merger.open(); // scan the merged stream frame by frame until the median position is reached
+        try {
+            while (merger.nextFrame(frame)) {
+                fta.reset(frame.getBuffer());
+                int tupleCount = fta.getTupleCount();
+                if (currentTupleCount + tupleCount > medianPosition) { // the middle tuple is in this frame
+                    int firstMedian = (int) (medianPosition - currentTupleCount); // its index within the frame
+                    ftr.reset(fta, firstMedian);
+                    double medianVal =
+                            ADoubleSerializerDeserializer.getDouble(ftr.getFieldData(0), ftr.getFieldStart(0) + 1);
+                    if (!isOdd) { // even count: the median is the mean of the two middle tuples
+                        if (firstMedian + 1 < tupleCount) {
+                            // second median is in the same frame
+                            ftr.reset(fta, firstMedian + 1);
+                        } else {
+                            // second median is in the next frame
+                            merger.nextFrame(frame); // NOTE(review): return value ignored; assumes count is accurate
+                            fta.reset(frame.getBuffer());
+                            ftr.reset(fta, 0);
+                        }
+                        medianVal =
+                                (ADoubleSerializerDeserializer.getDouble(ftr.getFieldData(0), ftr.getFieldStart(0) + 1)
+                                        + medianVal) / 2;
+                    }
+                    aDouble.setValue(medianVal);
+                    found = true;
+                    break;
+                }
+                currentTupleCount += tupleCount;
+            }
+            while (merger.nextFrame(frame)) {
+                // consume the remaining frames to close the network channels gracefully
+            }
+        } finally {
+            merger.close();
+        }
+        return found;
+    }
+
+    protected void setPartialResult(IPointable result, long fileId, String address, int port)
+            throws HyracksDataException {
+        try { // builds a record of (count, file handle, address, port) describing this partial result
+            resultStorage.reset(); // resultStorage is reused as scratch space for every field below
+            aInt64.setValue(count);
+            longSerde.serialize(aInt64, resultStorage.getDataOutput());
+            recBuilder.addField(COUNT_FIELD_ID, resultStorage);
+
+            resultStorage.reset();
+            aInt64.setValue(fileId);
+            longSerde.serialize(aInt64, resultStorage.getDataOutput());
+            recBuilder.addField(HANDLE_FIELD_ID, resultStorage);
+
+            resultStorage.reset();
+            aString.setValue(address);
+            stringSerde.serialize(aString, resultStorage.getDataOutput());
+            recBuilder.addField(ADDRESS_FIELD_ID, resultStorage);
+
+            resultStorage.reset();
+            aInt32.setValue(port);
+            intSerde.serialize(aInt32, resultStorage.getDataOutput());
+            recBuilder.addField(PORT_FIELD_ID, resultStorage);
+
+            resultStorage.reset(); // the same buffer finally receives the assembled record
+            recBuilder.write(resultStorage.getDataOutput(), true);
+            result.set(resultStorage);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    protected List<IFrame> getInFrames(int size, IHyracksTaskContext taskCtx) throws HyracksDataException {
+        if (inFrames == null) {
+            inFrames = new ArrayList<>(size);
+        }
+        int reusable = Math.min(size, inFrames.size()); // frames kept from earlier calls are reset and reused
+        for (int i = 0; i < reusable; i++) {
+            inFrames.get(i).reset();
+        }
+        while (inFrames.size() < size) {
+            inFrames.add(new VSizeFrame(taskCtx));
+        }
+        return inFrames;
+    }
+
+    private IFrameReader createInputChannel(NetworkManager netManager, IHyracksTaskContext taskContext,
+            NetworkAddress networkAddress, long jobId, long fileId) throws HyracksDataException {
+        SocketAddress remote = getSocketAddress(networkAddress); // node that holds the file identified by fileId
+        FileNetworkInputChannel channel = new FileNetworkInputChannel(netManager, remote, jobId, fileId);
+        InputChannelFrameReader channelFrameReader = new InputChannelFrameReader(channel);
+        channel.registerMonitor(channelFrameReader); // the returned reader is the channel's monitor
+        channel.open(taskContext);
+        return channelFrameReader;
+    }
+
+    private static SocketAddress getSocketAddress(NetworkAddress netAddress) throws HyracksDataException {
+        try {
+            return new InetSocketAddress(InetAddress.getByAddress(netAddress.lookupIpAddress()), netAddress.getPort());
+        } catch (UnknownHostException e) { // e.g. getByAddress rejects a raw address of illegal length
+            throw HyracksDataException.create(e); // wrapped: this method only declares HyracksDataException
+        }
+    }
+
+    public static class PartialResult { // plain holder for the four values that describe one partial result
+
+        long fileId; // file id handed to the FileNetworkInputChannel that fetches this partial result
+        String address; // combined with port into the NetworkAddress of the node to read from
+        int port;
+        long count; // presumably the number of values behind this partial result — TODO confirm
+
+        PartialResult(long fileId, long count, String address, int port) { // note: count is the 2nd argument
+            this.fileId = fileId;
+            this.count = count;
+            this.address = address;
+            this.port = port;
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..faf5272
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class GlobalSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = GlobalSqlMedianAggregateDescriptor::new;
+
+    @Override
+    public FunctionIdentifier getIdentifier() { // this descriptor implements GLOBAL_SQL_MEDIAN
+        return BuiltinFunctions.GLOBAL_SQL_MEDIAN;
+    }
+
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() { // anonymous factory that declares its own serialVersionUID
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+                    throws HyracksDataException {
+                return new GlobalSqlMedianAggregateFunction(args, ctx, sourceLoc); // new evaluator on every call
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java
new file mode 100644
index 0000000..c9ee8d8
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class GlobalSqlMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+    public GlobalSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        processPartialResults(tuple); // inherited helper; presumably decodes a partial-result record — TODO confirm
+    }
+
+    @Override
+    public void finish(IPointable result) throws HyracksDataException {
+        finishFinalResult(result); // delegates to the inherited final-result path
+    }
+
+    @Override
+    public void finishPartial(IPointable result) throws HyracksDataException {
+        finishPartialResult(result); // delegates to the inherited partial-result path
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..532cb5f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class IntermediateSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = IntermediateSqlMedianAggregateDescriptor::new;
+
+    @Override
+    public FunctionIdentifier getIdentifier() { // this descriptor implements INTERMEDIATE_SQL_MEDIAN
+        return BuiltinFunctions.INTERMEDIATE_SQL_MEDIAN;
+    }
+
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() { // anonymous factory that declares its own serialVersionUID
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+                    throws HyracksDataException {
+                return new IntermediateSqlMedianAggregateFunction(args, ctx, sourceLoc); // one evaluator per call
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java
new file mode 100644
index 0000000..d78c17f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class IntermediateSqlMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+    public IntermediateSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        processPartialResults(tuple); // inherited helper; presumably decodes a partial-result record — TODO confirm
+    }
+
+    @Override
+    public void finish(IPointable result) throws HyracksDataException {
+        finishPartialResult(result); // same call as finishPartial(): this class never emits a final result
+    }
+
+    @Override
+    public void finishPartial(IPointable result) throws HyracksDataException {
+        finishPartialResult(result);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..50f36ce
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class LocalSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlMedianAggregateDescriptor();
+        }
+
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return FunctionTypeInferers.MEDIAN_MEMORY; // presumably supplies the state read by setImmutableStates
+        }
+    };
+
+    private int numFrames = 0; // forwarded to every LocalSqlMedianAggregateFunction created below
+
+    @Override
+    public void setImmutableStates(Object... states) {
+        this.numFrames = (int) states[0]; // unchecked: fails if states is empty or states[0] is not an Integer
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() { // this descriptor implements LOCAL_SQL_MEDIAN
+        return BuiltinFunctions.LOCAL_SQL_MEDIAN;
+    }
+
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() { // anonymous factory that declares its own serialVersionUID
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+                    throws HyracksDataException {
+                return new LocalSqlMedianAggregateFunction(args, ctx, sourceLoc, numFrames); // one per call
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java
new file mode 100644
index 0000000..5400ca2
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class LocalSqlMedianAggregateFunction extends AbstractLocalMedianAggregateFunction {
+
+    public LocalSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc, int numFrames) throws HyracksDataException {
+        super(args, context, sourceLoc, numFrames);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        processDataValue(tuple); // inherited helper; presumably consumes one raw input value — TODO confirm
+    }
+
+    @Override
+    public void finishPartial(IPointable result) throws HyracksDataException {
+        finishLocalPartial(result);
+    }
+
+    @Override
+    public void finish(IPointable result) throws HyracksDataException {
+        finishLocalPartial(result); // same call as finishPartial(): this class never emits a final result
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..b98e389
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlMedianAggregateDescriptor();
+        }
+
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return FunctionTypeInferers.MEDIAN_MEMORY; // presumably supplies the state read by setImmutableStates
+        }
+    };
+
+    private int numFrames = 0; // forwarded to every SqlMedianAggregateFunction created below
+
+    @Override
+    public void setImmutableStates(Object... states) {
+        this.numFrames = (int) states[0]; // unchecked: fails if states is empty or states[0] is not an Integer
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() { // this descriptor implements SQL_MEDIAN
+        return BuiltinFunctions.SQL_MEDIAN;
+    }
+
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() { // anonymous factory that declares its own serialVersionUID
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+                    throws HyracksDataException {
+                return new SqlMedianAggregateFunction(args, ctx, sourceLoc, numFrames); // one evaluator per call
+            }
+        };
+    }
+
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java
new file mode 100644
index 0000000..1dbbfe6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
+
+public class SqlMedianAggregateFunction extends AbstractLocalMedianAggregateFunction {
+
+    public SqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc, int numFrames) throws HyracksDataException {
+        super(args, context, sourceLoc, numFrames);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        processDataValue(tuple);
+    }
+
+    @Override
+    public void finish(IPointable result) throws HyracksDataException {
+        boolean hasBufferedTuples = appender.getTupleCount() > 0;
+        if (hasBufferedTuples) { // hand the tuples still held by the appender to the runs generator
+            appender.write(runsGenerator, true);
+        }
+        runsGenerator.close(); // closing sorts the in-memory data or writes the sorted data out to run files
+        super.finishFinalResult(result);
+    }
+
+    @Override
+    public void finishPartial(IPointable result) throws HyracksDataException {
+        finishLocalPartial(result);
+    }
+
+    @Override
+    protected RunMergingFrameReader createRunsMergingFrameReader() throws HyracksDataException {
+        IHyracksTaskContext task = ctx.getTaskContext();
+        List<GeneratedRunFileReader> spilledRuns = runsGenerator.getRuns();
+        readers.clear();
+        if (!spilledRuns.isEmpty()) {
+            readers.addAll(spilledRuns);
+        } else {
+            //TODO: no need to write memory to run file, should just read the sorted data out of the sorter
+            FileReference workspaceFile = task.createManagedWorkspaceFile(MEDIAN);
+            RunFileWriter writer = writeMemoryDataToRunFile(workspaceFile, task);
+            GeneratedRunFileReader singleRun = writer.createDeleteOnCloseReader();
+            readers.add(singleRun);
+        }
+
+        List<IFrame> mergeFrames = getInFrames(readers.size(), task);
+        IBinaryComparator[] comparators = { doubleComparatorFactory.createBinaryComparator() };
+        return new RunMergingFrameReader(task, readers, mergeFrames, new int[] { 0 }, comparators,
+                doubleNkComputerFactory.createNormalizedKeyComputer(), recordDesc);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 949afed..7001c25 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -54,6 +54,7 @@
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlKurtosisDistinctAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMaxAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMaxDistinctAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMedianAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMinAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMinDistinctAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlSkewnessAggregateDescriptor;
@@ -156,6 +157,7 @@
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlAvgAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlKurtosisAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.GlobalSqlMedianAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlMinAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlSkewnessAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.GlobalSqlStddevAggregateDescriptor;
@@ -178,6 +180,7 @@
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlAvgAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlKurtosisAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMedianAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMinAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlSkewnessAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.IntermediateSqlStddevAggregateDescriptor;
@@ -202,6 +205,7 @@
 import org.apache.asterix.runtime.aggregates.std.LocalSqlAvgAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.LocalSqlKurtosisAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.LocalSqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.LocalSqlMedianAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.LocalSqlMinAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.LocalSqlSkewnessAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.LocalSqlStddevAggregateDescriptor;
@@ -224,6 +228,7 @@
 import org.apache.asterix.runtime.aggregates.std.SqlCountAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.SqlKurtosisAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.SqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.SqlMedianAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.SqlMinAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.SqlSkewnessAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.std.SqlStddevAggregateDescriptor;
@@ -823,6 +828,10 @@
         fc.add(LocalSqlMinAggregateDescriptor.FACTORY);
         fc.add(IntermediateSqlMinAggregateDescriptor.FACTORY);
         fc.add(GlobalSqlMinAggregateDescriptor.FACTORY);
+        fc.add(SqlMedianAggregateDescriptor.FACTORY);
+        fc.add(LocalSqlMedianAggregateDescriptor.FACTORY);
+        fc.add(IntermediateSqlMedianAggregateDescriptor.FACTORY);
+        fc.add(GlobalSqlMedianAggregateDescriptor.FACTORY);
         fc.add(SqlStddevAggregateDescriptor.FACTORY);
         fc.add(LocalSqlStddevAggregateDescriptor.FACTORY);
         fc.add(IntermediateSqlStddevAggregateDescriptor.FACTORY);
@@ -898,6 +907,7 @@
         fc.add(ScalarSqlMaxDistinctAggregateDescriptor.FACTORY);
         fc.add(ScalarSqlMinAggregateDescriptor.FACTORY);
         fc.add(ScalarSqlMinDistinctAggregateDescriptor.FACTORY);
+        fc.add(ScalarSqlMedianAggregateDescriptor.FACTORY);
         fc.add(ScalarSqlStddevAggregateDescriptor.FACTORY);
         fc.add(ScalarSqlStddevDistinctAggregateDescriptor.FACTORY);
         fc.add(ScalarSqlStddevPopAggregateDescriptor.FACTORY);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
index 5abebd2..9c25f6e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
@@ -116,6 +116,9 @@
         }
     };
 
+    public static final IFunctionTypeInferer MEDIAN_MEMORY =
+            (expr, fd, context, compilerProps) -> fd.setImmutableStates(compilerProps.getSortMemoryFrames());
+
     public static final IFunctionTypeInferer RECORD_MODIFY_INFERER = (expr, fd, context, compilerProps) -> {
         AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expr;
         IAType outType = (IAType) context.getType(expr);
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 92a1285..e660213 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -153,6 +153,7 @@
             -DrunSlowAQLTests=${runSlowAQLTests}
             -Xdebug
             -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=${debug.suspend.flag}
+            --add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED
             ${coverageArgLine}
             ${extraSurefireArgLine}
           </argLine>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 4d324f4..422782e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -44,4 +44,6 @@
     Class<?> loadClass(String className) throws HyracksException;
 
     ClassLoader getClassLoader() throws HyracksException;
+
+    long nextUniqueId();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
new file mode 100644
index 0000000..542fda1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.comm.channels;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import org.apache.hyracks.api.channels.IInputChannel;
+import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link IInputChannel} that requests a job file from a remote address and queues the buffers it receives.
+ * <p>
+ * {@link #open(IHyracksCommonContext)} connects to the remote address and sends an initial message made of
+ * {@link #FILE_CHANNEL_CODE}, the job id and the file id. Full buffers delivered on the read interface are queued
+ * and handed out by {@link #getNextBuffer()}.
+ */
+public class FileNetworkInputChannel implements IInputChannel {
+
+    private static final int NUM_READ_BUFFERS = 1;
+    /** First value of the initial message; marks the request as a file request. */
+    public static final long FILE_CHANNEL_CODE = -1;
+
+    private final IChannelConnectionFactory netManager;
+    private final SocketAddress remoteAddress;
+    private final long jobId;
+    private final long fileId;
+    /** Received buffers awaiting consumption; every access is guarded by this channel's intrinsic lock. */
+    private final Queue<ByteBuffer> fullQueue;
+    private final int nBuffers;
+    private IChannelControlBlock ccb;
+    private IInputChannelMonitor monitor;
+    private Object attachment;
+
+    /**
+     * @param netManager factory used to connect to the remote address
+     * @param remoteAddress address the file is requested from
+     * @param jobId job id sent in the initial message
+     * @param fileId file id sent in the initial message
+     */
+    public FileNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, long jobId,
+            long fileId) {
+        this.netManager = netManager;
+        this.remoteAddress = remoteAddress;
+        this.jobId = jobId;
+        this.fileId = fileId;
+        this.fullQueue = new ArrayDeque<>(NUM_READ_BUFFERS);
+        this.nBuffers = NUM_READ_BUFFERS;
+    }
+
+    @Override
+    public void registerMonitor(IInputChannelMonitor monitor) {
+        this.monitor = monitor;
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    /**
+     * @return the oldest queued buffer, or {@code null} when none is queued
+     */
+    @Override
+    public synchronized ByteBuffer getNextBuffer() {
+        return fullQueue.poll();
+    }
+
+    /** Clears the buffer and hands it to the read interface's empty-buffer acceptor. */
+    @Override
+    public void recycleBuffer(ByteBuffer buffer) {
+        buffer.clear();
+        ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+    }
+
+    /**
+     * Connects to the remote address, wires the read/write acceptors and sends the initial file request.
+     *
+     * @throws HyracksDataException if connecting to the remote address fails
+     */
+    @Override
+    public void open(IHyracksCommonContext ctx) throws HyracksDataException {
+        try {
+            ccb = netManager.connect(remoteAddress);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+        ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+        ccb.getWriteInterface().setEmptyBufferAcceptor(WriteEmptyBufferAcceptor.INSTANCE);
+        ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers,
+                ctx.getInitialFrameSize());
+
+        // initial message layout: FILE_CHANNEL_CODE, job id, file id (three longs)
+        ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkInputChannel.INITIAL_MESSAGE_SIZE);
+        writeBuffer.putLong(FILE_CHANNEL_CODE);
+        writeBuffer.putLong(jobId);
+        writeBuffer.putLong(fileId);
+        writeBuffer.flip();
+
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        // no-op
+    }
+
+    @Override
+    public void fail() {
+        // do nothing (covered by job lifecycle)
+    }
+
+    /** Queues full buffers coming from the read interface and forwards channel events to the monitor. */
+    private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            // same lock as getNextBuffer(): ArrayDeque does not support unsynchronized concurrent access
+            synchronized (FileNetworkInputChannel.this) {
+                fullQueue.add(buffer);
+            }
+            // notify outside the lock so the monitor callback never runs while the channel is locked
+            monitor.notifyDataAvailability(FileNetworkInputChannel.this, 1);
+        }
+
+        @Override
+        public void close() {
+            monitor.notifyEndOfStream(FileNetworkInputChannel.this);
+        }
+
+        @Override
+        public void error(int ecode) {
+            monitor.notifyFailure(FileNetworkInputChannel.this, ecode);
+        }
+    }
+
+    /** Empty-buffer acceptor of the write interface; ignores every buffer it is given. */
+    private static class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+
+        static final WriteEmptyBufferAcceptor INSTANCE = new WriteEmptyBufferAcceptor();
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            // do nothing
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 53bb7cd..18cab49 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -37,7 +37,8 @@
 public class NetworkInputChannel implements IInputChannel {
     private static final Logger LOGGER = LogManager.getLogger();
 
-    static final int INITIAL_MESSAGE_SIZE = 20;
+    private static final int INITIAL_MSG_FILLER = -1;
+    public static final int INITIAL_MESSAGE_SIZE = 24;
 
     private final IChannelConnectionFactory netManager;
 
@@ -106,6 +107,7 @@
         writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
         writeBuffer.putInt(partitionId.getSenderIndex());
         writeBuffer.putInt(partitionId.getReceiverIndex());
+        writeBuffer.putInt(INITIAL_MSG_FILLER);
         writeBuffer.flip();
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("Sending partition request: " + partitionId + " on channel: " + ccb);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index f179c36..a96dfe5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -108,6 +108,7 @@
     private final String jobStartTimeZoneId;
 
     private final long maxWarnings;
+    private final AtomicLong uniqueIds;
 
     public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId,
             INCServiceContext serviceCtx, ActivityClusterGraph acg,
@@ -140,6 +141,7 @@
         this.jobStartTime = jobStartTime;
         this.jobStartTimeZoneId = jobStartTimeZoneId;
         this.maxWarnings = acg.getMaxWarnings();
+        this.uniqueIds = new AtomicLong();
     }
 
     @Override
@@ -161,6 +163,11 @@
     }
 
     @Override
+    public long nextUniqueId() {
+        return uniqueIds.getAndIncrement();
+    }
+
+    @Override
     public long getJobStartTime() {
         return jobStartTime;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 6876618..468a969 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -31,8 +31,12 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.comm.channels.FileNetworkInputChannel;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
+import org.apache.hyracks.comm.channels.NetworkInputChannel;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.partitions.PartitionFileReaderUtil;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
@@ -47,8 +51,6 @@
 
     private static final int MAX_CONNECTION_ATTEMPTS = 5;
 
-    static final int INITIAL_MESSAGE_SIZE = 20;
-
     private final PartitionManager partitionManager;
 
     private final int nBuffers;
@@ -113,7 +115,8 @@
         @Override
         public void channelOpened(ChannelControlBlock channel) {
             channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
-            channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+            channel.getReadInterface().getEmptyBufferAcceptor()
+                    .accept(ByteBuffer.allocate(NetworkInputChannel.INITIAL_MESSAGE_SIZE));
         }
     }
 
@@ -128,12 +131,13 @@
 
         @Override
         public void accept(ByteBuffer buffer) {
-            PartitionId pid = readInitialMessage(buffer);
-            if (LOGGER.isTraceEnabled()) {
-                LOGGER.trace("Received initial partition request: " + pid + " on channel: " + ccb);
-            }
             noc = new NetworkOutputChannel(ccb, nBuffers);
-            partitionManager.registerPartitionRequest(pid, noc);
+            long id = buffer.getLong();
+            if (id == FileNetworkInputChannel.FILE_CHANNEL_CODE) {
+                handleFileRequest(buffer);
+            } else {
+                handlePartitionRequest(buffer, id);
+            }
         }
 
         @Override
@@ -147,16 +151,37 @@
                 noc.abort(ecode);
             }
         }
+
+        private void handlePartitionRequest(ByteBuffer buffer, long jid) {
+            PartitionId pid = readInitialMessage(buffer, jid);
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace("Received initial partition request: " + pid + " on channel: " + ccb);
+            }
+            partitionManager.registerPartitionRequest(pid, noc);
+        }
+
+        private void handleFileRequest(ByteBuffer buffer) {
+            JobId jobId = new JobId(buffer.getLong());
+            long fileId = buffer.getLong();
+            if (partitionManager.registerFileRequest(jobId, noc)) {
+                writeFileToChannel(partitionManager.getNodeControllerService(), noc, jobId, fileId);
+            }
+        }
     }
 
-    private static PartitionId readInitialMessage(ByteBuffer buffer) {
-        JobId jobId = new JobId(buffer.getLong());
+    private static PartitionId readInitialMessage(ByteBuffer buffer, long jid) {
+        JobId jobId = new JobId(jid);
         ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
         int senderIndex = buffer.getInt();
         int receiverIndex = buffer.getInt();
         return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
     }
 
+    private static void writeFileToChannel(NodeControllerService ncs, NetworkOutputChannel noc, JobId jobId,
+            long fileId) {
+        PartitionFileReaderUtil.writeFileToChannel(ncs, noc, jobId, fileId);
+    }
+
     public MuxDemuxPerformanceCounters getPerformanceCounters() {
         return md.getPerformanceCounters();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java
new file mode 100644
index 0000000..e4e6f09
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.partitions;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.dataflow.state.IStateObject;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * An {@link IStateObject} that associates a job file id with the {@link FileReference} of the file.
+ * <p>
+ * The state is identified by a {@link JobFileId}; it reports no memory occupancy and its serialization methods
+ * are no-ops.
+ */
+public class JobFileState implements IStateObject {
+
+    private final FileReference fileRef;
+    private final JobId jobId;
+    private final JobFileId jobFileId;
+
+    /**
+     * @param fileRef reference to the file this state stands for
+     * @param jobId job id reported by {@link #getJobId()}
+     * @param fileId id of the file, wrapped in a {@link JobFileId} to serve as the state id
+     */
+    public JobFileState(FileReference fileRef, JobId jobId, long fileId) {
+        this.fileRef = fileRef;
+        this.jobId = jobId;
+        this.jobFileId = new JobFileId(fileId);
+    }
+
+    @Override
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    /**
+     * @return the {@link JobFileId} identifying this state
+     */
+    @Override
+    public Object getId() {
+        return jobFileId;
+    }
+
+    @Override
+    public long getMemoryOccupancy() {
+        return 0;
+    }
+
+    @Override
+    public void toBytes(DataOutput out) throws IOException {
+        // no-op: nothing is written
+    }
+
+    @Override
+    public void fromBytes(DataInput in) throws IOException {
+        // no-op: nothing is read
+    }
+
+    public FileReference getFileRef() {
+        return fileRef;
+    }
+
+    /** Identifier of a {@link JobFileState}; two ids are equal when they wrap the same file id. */
+    public static class JobFileId {
+
+        private final long fileId;
+
+        public JobFileId(long fileId) {
+            this.fileId = fileId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return o instanceof JobFileId && ((JobFileId) o).fileId == fileId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Long.hashCode(fileId);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
index 3e36946..1471453 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -18,14 +18,11 @@
  */
 package org.apache.hyracks.control.nc.partitions;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.Executor;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.partitions.IPartition;
 
@@ -60,45 +57,7 @@
 
     @Override
     public void writeTo(final IFrameWriter writer) {
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    if (partitionFile == null) {
-                        writer.open();
-                        writer.close();
-                        return;
-                    }
-                    IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
-                            IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-                    try {
-                        writer.open();
-                        try {
-                            long offset = 0;
-                            ByteBuffer buffer = ctx.allocateFrame();
-                            while (true) {
-                                buffer.clear();
-                                long size = ioManager.syncRead(fh, offset, buffer);
-                                if (size < 0) {
-                                    break;
-                                } else if (size < buffer.capacity()) {
-                                    throw new HyracksDataException("Premature end of file");
-                                }
-                                offset += size;
-                                buffer.flip();
-                                writer.nextFrame(buffer);
-                            }
-                        } finally {
-                            writer.close();
-                        }
-                    } finally {
-                        ioManager.close(fh);
-                    }
-                } catch (HyracksDataException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
+        executor.execute(new PartitionFileReader(ctx, partitionFile, ioManager, writer, false));
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
new file mode 100644
index 0000000..609b32a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+
+/**
+ * A {@link Runnable} that reads a partition file frame by frame and pushes every frame to an {@link IFrameWriter}.
+ * <p>
+ * A {@code null} file produces an empty stream (the writer is opened and closed right away). When
+ * {@code deleteFile} is set, the file is deleted after its handle is closed, whether or not the transfer succeeded.
+ */
+public class PartitionFileReader implements Runnable {
+
+    private final IHyracksCommonContext ctx;
+    private final FileReference partitionFile;
+    private final IIOManager ioManager;
+    private final IFrameWriter writer;
+    private final boolean deleteFile;
+
+    /**
+     * @param ctx context used to allocate the read frame
+     * @param partitionFile file to read; {@code null} yields an empty stream
+     * @param ioManager IO manager used to open, read and close the file
+     * @param writer destination of the frames
+     * @param deleteFile whether to delete the file after its handle is closed
+     */
+    public PartitionFileReader(IHyracksCommonContext ctx, FileReference partitionFile, IIOManager ioManager,
+            IFrameWriter writer, boolean deleteFile) {
+        this.ctx = ctx;
+        this.partitionFile = partitionFile;
+        this.ioManager = ioManager;
+        this.writer = writer;
+        this.deleteFile = deleteFile;
+    }
+
+    /**
+     * Streams the file to the writer. A {@link HyracksDataException} is rethrown wrapped in a
+     * {@link RuntimeException} since {@link Runnable#run()} cannot throw checked exceptions.
+     */
+    @Override
+    public void run() {
+        try {
+            if (partitionFile == null) {
+                writer.open();
+                writer.close();
+                return;
+            }
+            IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+            try {
+                writer.open();
+                try {
+                    long offset = 0;
+                    ByteBuffer buffer = ctx.allocateFrame();
+                    while (true) {
+                        buffer.clear();
+                        long size = ioManager.syncRead(fh, offset, buffer);
+                        if (size < 0) {
+                            break;
+                        } else if (size < buffer.capacity()) {
+                            // a short read means the file ended in the middle of a frame
+                            throw new HyracksDataException("Premature end of file");
+                        }
+                        offset += size;
+                        buffer.flip();
+                        writer.nextFrame(buffer);
+                    }
+                } catch (Exception e) {
+                    // signal the failure; close() alone would look like a normal end of stream
+                    writer.fail();
+                    throw e;
+                } finally {
+                    writer.close();
+                }
+            } finally {
+                try {
+                    ioManager.close(fh);
+                } finally {
+                    if (deleteFile) {
+                        FileUtils.deleteQuietly(partitionFile.getFile());
+                    }
+                }
+            }
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
new file mode 100644
index 0000000..2a105d9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.partitions;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hyracks.api.dataflow.state.IStateObject;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.control.nc.Joblet;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
+
+/**
+ * Helper for serving a job file over a {@link NetworkOutputChannel}.
+ */
+public class PartitionFileReaderUtil {
+
+    private PartitionFileReaderUtil() {
+    }
+
+    /**
+     * Schedules a {@link PartitionFileReader} that sends the requested job file to {@code noc} and deletes the file
+     * afterwards. The channel is aborted with {@link AbstractChannelWriteInterface#REMOTE_ERROR_CODE} when no joblet
+     * is found for the job, no {@link JobFileState} is registered under the file id, or the file does not exist.
+     *
+     * @param ncs node controller providing the joblet map, the executor and the IO manager
+     * @param noc channel the file is written to
+     * @param jobId id of the job owning the file
+     * @param fileId id of the requested file
+     */
+    public static void writeFileToChannel(NodeControllerService ncs, NetworkOutputChannel noc, JobId jobId,
+            long fileId) {
+        Joblet joblet = ncs.getJobletMap().get(jobId);
+        FileReference jobFile = joblet == null ? null : lookupExistingFile(joblet, fileId);
+        if (jobFile == null) {
+            noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+            return;
+        }
+        ExecutorService executor = ncs.getExecutor();
+        noc.setFrameSize(joblet.getInitialFrameSize());
+        executor.execute(new PartitionFileReader(joblet, jobFile, ncs.getIoManager(), noc, true));
+    }
+
+    /**
+     * @return the file registered in the joblet's environment under {@code fileId} if it exists on disk,
+     *         {@code null} otherwise
+     */
+    private static FileReference lookupExistingFile(Joblet joblet, long fileId) {
+        IStateObject state = joblet.getEnvironment().getStateObject(new JobFileState.JobFileId(fileId));
+        if (!(state instanceof JobFileState)) {
+            return null;
+        }
+        FileReference candidate = ((JobFileState) state).getFileRef();
+        return candidate.getFile().exists() ? candidate : null;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index c082b71..927dd7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -58,6 +58,8 @@
 
     private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<>();
 
+    private final Map<JobId, List<NetworkOutputChannel>> fileRequests = new HashMap<>();
+
     private final Cache<JobId, JobId> failedJobsCache;
 
     public PartitionManager(NodeControllerService ncs) {
@@ -122,10 +124,24 @@
         }
     }
 
+    public synchronized boolean registerFileRequest(JobId jid, NetworkOutputChannel noc) {
+        if (failedJobsCache.getIfPresent(jid) != null) {
+            // job already recorded in the failed-jobs cache: abort the channel instead of queueing it
+            noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+            return false;
+        }
+        fileRequests.computeIfAbsent(jid, ignored -> new ArrayList<>()).add(noc);
+        return true;
+    }
+
     public IWorkspaceFileFactory getFileFactory() {
         return fileFactory;
     }
 
+    public NodeControllerService getNodeControllerService() {
+        return ncs; // the node controller service reference held by this partition manager
+    }
+
     public void close() {
         deallocatableRegistry.close();
     }
@@ -175,6 +191,10 @@
     }
 
     private List<NetworkOutputChannel> removePendingRequests(JobId jobId, JobStatus status) {
+        List<NetworkOutputChannel> jobFileRequests = null;
+        if (!fileRequests.isEmpty()) {
+            jobFileRequests = fileRequests.remove(jobId);
+        }
         if (status != JobStatus.FAILURE) {
             return Collections.emptyList();
         }
@@ -189,6 +209,9 @@
                 requestsIterator.remove();
             }
         }
+        if (jobFileRequests != null && !jobFileRequests.isEmpty()) {
+            pendingRequests.addAll(jobFileRequests);
+        }
         return pendingRequests;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
index 97dc344..96d3ecb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
@@ -31,6 +31,7 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -85,6 +86,12 @@
      */
     private static OperatingSystemMXBean osMXBean;
 
+    public static final String[] DEFAULT_ADD_OPENS =
+            { "--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED",
+                    "--add-opens=java.management/sun.management=ALL-UNNAMED",
+                    "--add-opens=java.base/java.lang=ALL-UNNAMED", "--add-opens=java.base/java.nio=ALL-UNNAMED",
+                    "--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED" };
+
     private static List<String> buildCommand() throws IOException {
         List<String> cList = new ArrayList<>();
 
@@ -134,12 +141,20 @@
         // Sets up memory parameter if it is not specified.
         if (!jvmargs.contains("-Xmx")) {
             long ramSize = ((com.sun.management.OperatingSystemMXBean) osMXBean).getTotalPhysicalMemorySize();
-            int proportionalRamSize = (int) Math.ceil(0.6 * ramSize / (1024 * 1024));
+            int proportionalRamSize = (int) Math.ceil(0.5 * ramSize / (1024 * 1024));
             //if under 32bit JVM, use less than 1GB heap by default. otherwise use proportional ramsize.
             int heapSize = "32".equals(System.getProperty("sun.arch.data.model"))
                     ? (proportionalRamSize <= 1024 ? proportionalRamSize : 1024) : proportionalRamSize;
             jvmargs = jvmargs + " -Xmx" + heapSize + "m";
         }
+
+        // Squelch JDK9+ module access warnings/errors: append each default flag as its own token
+        if (!jvmargs.contains("-add-opens")) {
+            StringBuilder jvmArgsBuilder = new StringBuilder(jvmargs);
+            Arrays.stream(DEFAULT_ADD_OPENS).forEach(s -> jvmArgsBuilder.append(' ').append(s));
+            jvmargs = jvmArgsBuilder.toString();
+        }
+
         env.put("JAVA_OPTS", jvmargs.trim());
         LOGGER.info("Setting JAVA_OPTS to " + jvmargs);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/test/java/org/apache/hyracks/control/nc/service/NCServiceTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/test/java/org/apache/hyracks/control/nc/service/NCServiceTest.java
index 00d7ca4..bc04320 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/test/java/org/apache/hyracks/control/nc/service/NCServiceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/test/java/org/apache/hyracks/control/nc/service/NCServiceTest.java
@@ -42,7 +42,7 @@
         String sizeStr = prefix.substring(0, prefix.length() - 1);
         int size = Integer.parseInt(sizeStr);
         long ramSize = ((com.sun.management.OperatingSystemMXBean) osMXBean).getTotalPhysicalMemorySize();
-        int base = 1024 * 1024 * 5;
-        Assert.assertTrue(size == ramSize * 3 / base + ((ramSize * 3) % base == 0 ? 0 : 1));
+        int base = 1024 * 1024;
+        Assert.assertTrue(size == (ramSize / 2) / base + (((ramSize / 2) % base) == 0 ? 0 : 1));
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index 50cacb2..7051e1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -115,6 +115,10 @@
         return size;
     }
 
+    public FileReference getFile() {
+        return file; // hands back this reader's file reference as-is (no copy is made)
+    }
+
     public void setDeleteAfterClose(boolean deleteAfterClose) {
         this.deleteAfterClose = deleteAfterClose;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java
index b7433ae..163cbaf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java
@@ -24,5 +24,5 @@
 
     void setRate(double ratePerSecond);
 
-    void request(int permits) throws HyracksDataException;
+    void request(int numOfPermits) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java
index ac0a1a9..a0660f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java
@@ -33,7 +33,7 @@
     }
 
     @Override
-    public void request(int tokens) throws HyracksDataException {
+    public void request(int numOfPermits) throws HyracksDataException {
         // no op
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java
index 4d0ca92..3dcbfb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java
@@ -26,7 +26,6 @@
 
 /**
  * A wrapper of the RateLimiter implementation from {@link RateLimiter}
- *
  */
 public class SleepRateLimiter implements IRateLimiter {
     /**
@@ -55,8 +54,8 @@
     }
 
     @Override
-    public void request(int permits) throws HyracksDataException {
-        rateLimiterImpl.acquire(permits);
+    public void request(int numOfPermits) throws HyracksDataException {
+        rateLimiterImpl.acquire(numOfPermits);
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index 46e3eec..bf4cbd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -20,6 +20,7 @@
 
 import java.nio.ByteBuffer;
 import java.time.ZoneId;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -42,6 +43,7 @@
     private final WorkspaceFileFactory fileFactory;
     private final long jobStartTime;
     private final String jobStartTimeZoneId;
+    private final AtomicLong ids;
 
     TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
         this.serviceContext = serviceContext;
@@ -50,6 +52,7 @@
         this.frameManger = new FrameManager(frameSize);
         this.jobStartTime = System.currentTimeMillis();
         this.jobStartTimeZoneId = ZoneId.systemDefault().getId();
+        this.ids = new AtomicLong();
     }
 
     @Override
@@ -151,4 +154,8 @@
         return this.getClass().getClassLoader();
     }
 
+    @Override
+    public long nextUniqueId() {
+        return ids.getAndIncrement(); // returns the current counter value, then advances it by one
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeFailTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeFailTest.java
index bfa9ba6..a720ec8 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeFailTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeFailTest.java
@@ -20,7 +20,6 @@
 package org.apache.hyracks.storage.am.lsm.btree;
 
 import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
 
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -139,9 +138,6 @@
             }
             Field field = MergeOperation.class.getDeclaredField("cursor");
             field.setAccessible(true);
-            Field modifiersField = Field.class.getDeclaredField("modifiers");
-            modifiersField.setAccessible(true);
-            modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
             LSMBTreeRangeSearchCursor originalCursor = (LSMBTreeRangeSearchCursor) field.get(operation);
             field.set(operation, new TestCursor(originalCursor.getOpCtx()));
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
index cdf1500..100139a 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
@@ -70,8 +70,8 @@
         }
 
         @Override
-        public void request(int permits) throws HyracksDataException {
-            limiter.request(permits);
+        public void request(int numOfPermits) throws HyracksDataException {
+            limiter.request(numOfPermits);
             pageCounter++;
         }
     };
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index 94d3447..8107827 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -183,20 +183,20 @@
         }
     }
 
-    public void allowModify(int permits) {
-        modifySemaphore.release(permits);
+    public void allowModify(int numOfPermits) {
+        modifySemaphore.release(numOfPermits);
     }
 
-    public void allowSearch(int permits) {
-        searchSemaphore.release(permits);
+    public void allowSearch(int numOfPermits) {
+        searchSemaphore.release(numOfPermits);
     }
 
-    public void allowFlush(int permits) {
-        flushSemaphore.release(permits);
+    public void allowFlush(int numOfPermits) {
+        flushSemaphore.release(numOfPermits);
     }
 
-    public void allowMerge(int permits) {
-        mergeSemaphore.release(permits);
+    public void allowMerge(int numOfPermits) {
+        mergeSemaphore.release(numOfPermits);
     }
 
     @Override
diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml
index c38d7df..e82c5e1 100644
--- a/hyracks-fullstack/pom.xml
+++ b/hyracks-fullstack/pom.xml
@@ -49,7 +49,7 @@
     <root.dir>${basedir}</root.dir>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <file.encoding>UTF-8</file.encoding>
-    <maven.compiler.release>11</maven.compiler.release>
+    <maven.compiler.release>17</maven.compiler.release>
     <javac.xlint.value>all</javac.xlint.value>
     <jvm.extraargs />
     <sonar.jacoco.reportPath>${env.PWD}/target/jacoco-merged.exec</sonar.jacoco.reportPath>
@@ -550,6 +550,7 @@
           <reuseForks>false</reuseForks>
           <argLine>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8 -Xdebug
             -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n ${coverageArgLine}
+            --add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED
           </argLine>
           <systemPropertyVariables>
             <log4j.configurationFile>${testLog4jConfigFile}</log4j.configurationFile>