[ASTERIXDB-3369][FUN] Implement SQL median()
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Implement SQL median() function. Each local partition will generate a sorted
run file. Those run files can be used to determine the median.
- add ability to generate unique ids at the joblet level in IHyracksJobletContext.
- median() will use the unique ids for file ids when generating local run files.
- references to the generated local run files are kept in JobFileState at Joblet level.
- use NetworkManager to receive file reading requests.
- make PartitionManager keep track of file requests.
- currently, median() will use the same sort memory configured for the cluster.
- pass the sort memory to the median() via the type inferer.
- account for the memory required by median() when calculating the
required capacity of a query.
- median() used as a WINDOW function does not support ORDER BY and FRAME.
- refactor MaterializedPartition to PartitionFileReader.
Change-Id: Id8d03f42d54b5ed4cf316c31f0b4cce9dd7c1dc0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18210
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index 1d9c57b..037aca3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -1219,10 +1219,14 @@
boolean isWin = BuiltinFunctions.isWindowFunction(fi);
boolean isWinAgg = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG);
- boolean prohibitOrderClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
- BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE);
- boolean prohibitFrameClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
- BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE);
+ boolean prohibitOrderClause = (isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+ BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE))
+ || (!isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+ BuiltinFunctions.AggregateFunctionProperty.NO_ORDER_CLAUSE));
+ boolean prohibitFrameClause = (isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+ BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE))
+ || (!isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+ BuiltinFunctions.AggregateFunctionProperty.NO_FRAME_CLAUSE));
boolean allowRespectIgnoreNulls = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
BuiltinFunctions.WindowFunctionProperty.ALLOW_RESPECT_IGNORE_NULLS);
boolean allowFromFirstLast = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 99678da..cb86e35 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -332,7 +332,7 @@
final AlgebricksAbsolutePartitionConstraint jobLocations =
getJobLocations(spec, nodeJobTracker, computationLocations);
final IClusterCapacity jobRequiredCapacity =
- ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
+ ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf, compilerProperties);
spec.setRequiredClusterCapacity(jobRequiredCapacity);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index a16ac84..9e77922 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -18,25 +18,42 @@
*/
package org.apache.asterix.app.resource;
+import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class OperatorResourcesComputer {
+ private static final Logger LOGGER = LogManager.getLogger();
public static final int MIN_OPERATOR_CORES = 1;
private static final long MAX_BUFFER_PER_CONNECTION = 1L;
private final int numComputationPartitions;
private final long frameSize;
+ private final ExpressionMemoryComputer exprMemoryComputer;
+ private final CompilerProperties compilerProperties;
- public OperatorResourcesComputer(int numComputationPartitions, long frameSize) {
+ public OperatorResourcesComputer(int numComputationPartitions, long frameSize,
+ CompilerProperties compilerProperties) {
this.numComputationPartitions = numComputationPartitions;
this.frameSize = frameSize;
+ this.exprMemoryComputer = new ExpressionMemoryComputer();
+ this.compilerProperties = compilerProperties;
}
public int getOperatorRequiredCores(ILogicalOperator operator) {
@@ -52,10 +69,22 @@
return getExchangeRequiredMemory((ExchangeOperator) operator);
} else {
IPhysicalOperator physOp = ((AbstractLogicalOperator) operator).getPhysicalOperator();
- return getOperatorRequiredMemory(operator.getExecutionMode(), physOp.getLocalMemoryRequirements());
+ return getOperatorRequiredMemory(operator.getExecutionMode(), physOp.getLocalMemoryRequirements())
+ + getOperatorExpressionsRequiredMemory(operator);
}
}
+ private long getOperatorExpressionsRequiredMemory(ILogicalOperator operator) {
+ exprMemoryComputer.reset(operator);
+ try {
+ operator.acceptExpressionTransform(exprMemoryComputer);
+ } catch (Throwable e) {
+ // ignore
+ LOGGER.warn("encountered error while computing operator expressions required memory", e);
+ }
+ return exprMemoryComputer.requiredMemory;
+ }
+
private long getOperatorRequiredMemory(AbstractLogicalOperator.ExecutionMode opExecMode, long memorySize) {
if (opExecMode == AbstractLogicalOperator.ExecutionMode.PARTITIONED
|| opExecMode == AbstractLogicalOperator.ExecutionMode.LOCAL) {
@@ -78,4 +107,42 @@
}
return 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions * frameSize;
}
+
+ class ExpressionMemoryComputer implements ILogicalExpressionReferenceTransform {
+
+ private long requiredMemory;
+ private ILogicalOperator operator;
+
+ public ExpressionMemoryComputer() {
+ }
+
+ @Override
+ public boolean transform(Mutable<ILogicalExpression> expression) throws AlgebricksException {
+ ILogicalExpression expr = expression.getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+ AbstractFunctionCallExpression funExpr = (AbstractFunctionCallExpression) expr;
+ if (funExpr.getKind() == AbstractFunctionCallExpression.FunctionKind.AGGREGATE) {
+ if (isMedian(funExpr.getFunctionIdentifier())) {
+ requiredMemory +=
+ (compilerProperties.getSortMemorySize() * numCompute(operator.getExecutionMode()));
+ }
+ }
+ }
+ return false;
+ }
+
+ private int numCompute(AbstractLogicalOperator.ExecutionMode executionMode) {
+ return (executionMode == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+ || executionMode == AbstractLogicalOperator.ExecutionMode.LOCAL) ? numComputationPartitions : 1;
+ }
+
+ private boolean isMedian(FunctionIdentifier funId) {
+ return BuiltinFunctions.LOCAL_SQL_MEDIAN.equals(funId) || BuiltinFunctions.SQL_MEDIAN.equals(funId);
+ }
+
+ public void reset(ILogicalOperator op) {
+ requiredMemory = 0;
+ operator = op;
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index ba11956..353bebf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -24,6 +24,7 @@
import org.apache.asterix.app.resource.OperatorResourcesComputer;
import org.apache.asterix.app.resource.PlanStage;
import org.apache.asterix.app.resource.PlanStagesGenerator;
+import org.apache.asterix.common.config.CompilerProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -38,25 +39,28 @@
}
/**
- * Calculates the required cluster capacity from a given query plan, the computation locations,
- * the operator memory budgets, and frame size.
+ * Calculates the required cluster capacity from a given query plan, the computation locations, the operator memory
+ * budgets, and frame size.
*
* @param plan,
- * a given query plan.
+ * a given query plan.
* @param computationLocations,
- * the partitions for computation.
+ * the partitions for computation.
* @param physicalOptimizationConfig,
- * a PhysicalOptimizationConfig.
+ * a PhysicalOptimizationConfig.
+ * @param compilerProperties
* @return the required cluster capacity for executing the query.
* @throws AlgebricksException
- * if the query plan is malformed.
+ * if the query plan is malformed.
*/
public static IClusterCapacity getRequiredCapacity(ILogicalPlan plan,
AlgebricksAbsolutePartitionConstraint computationLocations,
- PhysicalOptimizationConfig physicalOptimizationConfig) throws AlgebricksException {
+ PhysicalOptimizationConfig physicalOptimizationConfig, CompilerProperties compilerProperties)
+ throws AlgebricksException {
final int frameSize = physicalOptimizationConfig.getFrameSize();
final List<PlanStage> planStages = getStages(plan);
- return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, frameSize);
+ return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, frameSize,
+ compilerProperties);
}
public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException {
@@ -68,8 +72,9 @@
}
public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations,
- int frameSize) {
- final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, frameSize);
+ int frameSize, CompilerProperties compilerProperties) {
+ final OperatorResourcesComputer computer =
+ new OperatorResourcesComputer(computationLocations, frameSize, compilerProperties);
final IClusterCapacity clusterCapacity = new ClusterCapacity();
final long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max()
.orElseThrow(IllegalStateException::new);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index b0de85e..77d0e60 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -340,7 +340,7 @@
}
}
final IClusterCapacity clusterCapacity =
- ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM, FRAME_SIZE);
+ ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM, FRAME_SIZE, null);
Assert.assertEquals(clusterCapacity.getAggregatedMemoryByteSize(), expectedMemory);
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
index 218ef97..f8dbaad 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
@@ -60,7 +60,7 @@
public void failedJobPartitionRequestTest() throws Exception {
final NodeControllerService nc1 = integrationUtil.ncs[0];
final NodeControllerService nc2 = integrationUtil.ncs[1];
- final JobId failedJob = new JobId(-1);
+ final JobId failedJob = new JobId(10);
nc2.getPartitionManager().jobCompleted(failedJob, JobStatus.FAILURE);
final NetworkAddress localNetworkAddress = nc2.getNetworkManager().getPublicNetworkAddress();
final InetSocketAddress nc2Address =
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 325c238..5ed4621 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -286,4 +286,9 @@
return AlgebricksConfig.QUERY_PLAN_SHAPE_DEFAULT;
return queryPlanShapeMode;
}
+
+ public int getSortMemoryFrames() {
+ int numFrames = (int) getSortMemorySize() / getFrameSize();
+ return Math.max(numFrames, OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_SORT);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index ae70475..9307469 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -34,7 +34,7 @@
public class OptimizationConfUtil {
- private static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
+ public static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
private static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
private static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 4db68c3..bb05ba6 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -95,6 +95,7 @@
import org.apache.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
import org.apache.asterix.om.typecomputer.impl.Int64ArrayToStringTypeComputer;
import org.apache.asterix.om.typecomputer.impl.LocalAvgTypeComputer;
+import org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer;
import org.apache.asterix.om.typecomputer.impl.LocalSingleVarStatisticsTypeComputer;
import org.apache.asterix.om.typecomputer.impl.MinMaxAggTypeComputer;
import org.apache.asterix.om.typecomputer.impl.MissingIfTypeComputer;
@@ -551,6 +552,8 @@
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-intermediate-avg", 1);
public static final FunctionIdentifier LOCAL_AVG =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-avg", 1);
+ public static final FunctionIdentifier MEDIAN =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-median", 1);
public static final FunctionIdentifier FIRST_ELEMENT =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-first-element", 1);
public static final FunctionIdentifier LOCAL_FIRST_ELEMENT =
@@ -628,6 +631,8 @@
public static final FunctionIdentifier SCALAR_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sum", 1);
public static final FunctionIdentifier SCALAR_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "max", 1);
public static final FunctionIdentifier SCALAR_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "min", 1);
+ public static final FunctionIdentifier SCALAR_MEDIAN =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "median", 1);
public static final FunctionIdentifier SCALAR_FIRST_ELEMENT =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "first-element", 1);
public static final FunctionIdentifier SCALAR_LOCAL_FIRST_ELEMENT =
@@ -802,6 +807,14 @@
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-global-sql-avg", 1);
public static final FunctionIdentifier LOCAL_SQL_AVG =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sql-avg", 1);
+ public static final FunctionIdentifier SQL_MEDIAN =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sql-median", 1);
+ public static final FunctionIdentifier LOCAL_SQL_MEDIAN =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sql-median", 1);
+ public static final FunctionIdentifier INTERMEDIATE_SQL_MEDIAN =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-intermediate-sql-median", 1);
+ public static final FunctionIdentifier GLOBAL_SQL_MEDIAN =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-global-sql-median", 1);
public static final FunctionIdentifier SQL_STDDEV_SAMP =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sql-stddev_samp", 1);
public static final FunctionIdentifier INTERMEDIATE_SQL_STDDEV_SAMP =
@@ -869,6 +882,8 @@
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-max", 1);
public static final FunctionIdentifier SCALAR_SQL_MIN =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-min", 1);
+ public static final FunctionIdentifier SCALAR_SQL_MEDIAN =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-median", 1);
public static final FunctionIdentifier SCALAR_SQL_STDDEV_SAMP =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-stddev_samp", 1);
public static final FunctionIdentifier SCALAR_SQL_STDDEV_POP =
@@ -2143,6 +2158,13 @@
addPrivateFunction(GLOBAL_SQL_UNION_MBR, ARectangleTypeComputer.INSTANCE, true);
addFunction(SCALAR_SQL_UNION_MBR, ARectangleTypeComputer.INSTANCE, true);
+ addFunction(SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+ addFunction(SCALAR_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+ addFunction(SCALAR_SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+ addPrivateFunction(LOCAL_SQL_MEDIAN, LocalMedianTypeComputer.INSTANCE, true);
+ addPrivateFunction(INTERMEDIATE_SQL_MEDIAN, LocalMedianTypeComputer.INSTANCE, true);
+ addPrivateFunction(GLOBAL_SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true);
+
addPrivateFunction(SERIAL_AVG, NullableDoubleTypeComputer.INSTANCE, true);
addPrivateFunction(SERIAL_COUNT, AInt64TypeComputer.INSTANCE, true);
addPrivateFunction(SERIAL_GLOBAL_AVG, NullableDoubleTypeComputer.INSTANCE, true);
@@ -3169,6 +3191,25 @@
addDistinctAgg(SQL_SUM_DISTINCT, SQL_SUM);
addScalarAgg(SQL_SUM_DISTINCT, SCALAR_SQL_SUM_DISTINCT);
+ // SQL MEDIAN
+ addAgg(SQL_MEDIAN);
+ addAgg(LOCAL_SQL_MEDIAN);
+ addAgg(GLOBAL_SQL_MEDIAN);
+
+ addLocalAgg(SQL_MEDIAN, LOCAL_SQL_MEDIAN);
+
+ addIntermediateAgg(SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN);
+ addIntermediateAgg(LOCAL_SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN);
+ addIntermediateAgg(GLOBAL_SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN);
+
+ addGlobalAgg(SQL_MEDIAN, GLOBAL_SQL_MEDIAN);
+
+ addScalarAgg(MEDIAN, SCALAR_MEDIAN);
+ addScalarAgg(SQL_MEDIAN, SCALAR_SQL_MEDIAN);
+
+ registerAggFunctionProperties(SCALAR_SQL_MEDIAN, AggregateFunctionProperty.NO_FRAME_CLAUSE,
+ AggregateFunctionProperty.NO_ORDER_CLAUSE);
+
// SPATIAL AGGREGATES
addAgg(ST_UNION_AGG);
@@ -3203,6 +3244,13 @@
interface BuiltinFunctionProperty {
}
+ public enum AggregateFunctionProperty implements BuiltinFunctionProperty {
+ /** Whether the order clause is prohibited */
+ NO_ORDER_CLAUSE,
+ /** Whether the frame clause is prohibited */
+ NO_FRAME_CLAUSE
+ }
+
public enum WindowFunctionProperty implements BuiltinFunctionProperty {
/** Whether the order clause is prohibited */
NO_ORDER_CLAUSE,
@@ -3365,6 +3413,11 @@
registeredFunctions.put(functionInfo.getFunctionIdentifier(), functionInfo);
}
+ private static <T extends Enum<T> & BuiltinFunctionProperty> void registerAggFunctionProperties(
+ FunctionIdentifier fid, AggregateFunctionProperty... properties) {
+ registerFunctionProperties(fid, AggregateFunctionProperty.class, properties);
+ }
+
private static <T extends Enum<T> & BuiltinFunctionProperty> void registerFunctionProperties(FunctionIdentifier fid,
Class<T> propertyClass, T[] properties) {
if (properties == null) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java
new file mode 100644
index 0000000..3411ef4
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.typecomputer.impl;
+
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+
+public class LocalMedianTypeComputer implements IResultTypeComputer {
+
+ public static final LocalMedianTypeComputer INSTANCE = new LocalMedianTypeComputer();
+
+ public static final ARecordType REC_TYPE = new ARecordType(null,
+ new String[] { "count", "handle", "address", "port" },
+ new IAType[] { BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.ASTRING, BuiltinType.AINT32 }, false);
+
+ @Override
+ public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
+ IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
+ return REC_TYPE;
+ }
+}
diff --git a/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java b/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java
index a857981..07f2021 100644
--- a/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java
+++ b/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java
@@ -36,6 +36,7 @@
import org.apache.asterix.om.typecomputer.impl.ClosedRecordConstructorResultType;
import org.apache.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
import org.apache.asterix.om.typecomputer.impl.LocalAvgTypeComputer;
+import org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer;
import org.apache.asterix.om.typecomputer.impl.LocalSingleVarStatisticsTypeComputer;
import org.apache.asterix.om.typecomputer.impl.NullableDoubleTypeComputer;
import org.apache.asterix.om.typecomputer.impl.OpenRecordConstructorResultType;
@@ -190,6 +191,7 @@
differentBehaviorFunctions.add(RecordRemoveFieldsTypeComputer.class.getSimpleName());
differentBehaviorFunctions.add(ClosedRecordConstructorResultType.class.getSimpleName());
differentBehaviorFunctions.add(LocalAvgTypeComputer.class.getSimpleName());
+ differentBehaviorFunctions.add(LocalMedianTypeComputer.class.getSimpleName());
differentBehaviorFunctions.add(BooleanOnlyTypeComputer.class.getSimpleName());
// differentBehaviorFunctions.add("AMissingTypeComputer"); // TODO What type computer is this?
differentBehaviorFunctions.add(NullableDoubleTypeComputer.class.getSimpleName());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..5c83a5e
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.scalar;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.runtime.aggregates.std.SqlMedianAggregateDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlMedianAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+
+ @Override
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new ScalarSqlMedianAggregateDescriptor();
+ }
+
+ @Override
+ public IFunctionTypeInferer createFunctionTypeInferer() {
+ return FunctionTypeInferers.MEDIAN_MEMORY;
+ }
+ };
+
+ private ScalarSqlMedianAggregateDescriptor() {
+ super(SqlMedianAggregateDescriptor.FACTORY);
+ }
+
+ @Override
+ public void setImmutableStates(Object... states) {
+ super.setImmutableStates(states);
+ aggFuncDesc.setImmutableStates(states);
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.SCALAR_SQL_MEDIAN;
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java
new file mode 100644
index 0000000..8ff7ed5
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import java.util.List;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrameTupleAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.partitions.JobFileState;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
+import org.apache.hyracks.dataflow.std.sort.Algorithm;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import org.apache.hyracks.dataflow.std.sort.ISorter;
+
+public abstract class AbstractLocalMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+ protected final ExternalSortRunGenerator runsGenerator;
+ protected final IFrameTupleAppender appender;
+ private final ArrayTupleBuilder tupleBuilder;
+ private final int numFrames;
+ private ExternalSortRunMerger runsMerger;
+
+ public AbstractLocalMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+ SourceLocation sourceLoc, int numFrames) throws HyracksDataException {
+ super(args, context, sourceLoc);
+ this.numFrames = numFrames;
+ appender = new FrameTupleAppender(frame);
+ tupleBuilder = new ArrayTupleBuilder(1);
+ runsGenerator = new ExternalSortRunGenerator(context.getTaskContext(), new int[] { 0 },
+ new INormalizedKeyComputerFactory[] { doubleNkComputerFactory },
+ new IBinaryComparatorFactory[] { doubleComparatorFactory }, recordDesc, Algorithm.MERGE_SORT,
+ EnumFreeSlotPolicy.LAST_FIT, numFrames, Integer.MAX_VALUE);
+
+ }
+
+ @Override
+ public void init() throws HyracksDataException {
+ super.init();
+ appender.reset(frame, true);
+ runsGenerator.open();
+ runsGenerator.getSorter().reset();
+ }
+
+ protected void processDataValue(IFrameTupleReference tuple) throws HyracksDataException {
+ eval.evaluate(tuple, inputVal);
+ byte[] data = inputVal.getByteArray();
+ int start = inputVal.getStartOffset();
+ ATypeTag tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[start]);
+ if (ATypeHierarchy.getTypeDomain(tag) == ATypeHierarchy.Domain.NUMERIC) {
+ count++;
+ aDouble.setValue(ATypeHierarchy.getDoubleValue(MEDIAN, 0, data, start));
+ tupleBuilder.reset();
+ tupleBuilder.addField(doubleSerde, aDouble);
+ FrameUtils.appendToWriter(runsGenerator, appender, tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
+ }
+ }
+
+ protected void finishLocalPartial(IPointable result) throws HyracksDataException {
+ if (count == 0) {
+ setPartialResult(result, -1, "", -1);
+ return;
+ }
+ if (appender.getTupleCount() > 0) {
+ appender.write(runsGenerator, true);
+ }
+ // close to sort the in-memory data or write out sorted data to run files
+ runsGenerator.close();
+
+ IHyracksTaskContext taskCtx = ctx.getTaskContext();
+ IHyracksJobletContext jobletCtx = taskCtx.getJobletContext();
+ NetworkAddress netAddress = ((NodeControllerService) jobletCtx.getServiceContext().getControllerService())
+ .getNetworkManager().getPublicNetworkAddress();
+ FileReference fileRef = writeRunFile(taskCtx, jobletCtx);
+ long fileId = jobletCtx.nextUniqueId();
+ taskCtx.setStateObject(new JobFileState(fileRef, jobletCtx.getJobId(), fileId));
+ setPartialResult(result, fileId, netAddress.getAddress(), netAddress.getPort());
+ }
+
+ private FileReference writeRunFile(IHyracksTaskContext taskCtx, IHyracksJobletContext jobletCtx)
+ throws HyracksDataException {
+ List<GeneratedRunFileReader> runs = runsGenerator.getRuns();
+ FileReference managedFile;
+ if (runs.isEmpty()) {
+ managedFile = jobletCtx.createManagedWorkspaceFile(MEDIAN);
+ writeMemoryDataToRunFile(managedFile, taskCtx);
+ } else if (runs.size() == 1) {
+ managedFile = runs.get(0).getFile();
+ } else {
+ managedFile = jobletCtx.createManagedWorkspaceFile(MEDIAN);
+ mergeRunsToRunFile(managedFile, taskCtx, runs);
+ }
+ return managedFile;
+ }
+
+ private void mergeRunsToRunFile(FileReference managedFile, IHyracksTaskContext taskCtx,
+ List<GeneratedRunFileReader> runs) throws HyracksDataException {
+ createOrResetRunsMerger(runs);
+ RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+ IFrameWriter wrappingWriter = runsMerger.prepareFinalMergeResultWriter(runFileWriter);
+ try {
+ wrappingWriter.open();
+ runsMerger.process(wrappingWriter);
+ } finally {
+ wrappingWriter.close();
+ }
+ }
+
+ protected RunFileWriter writeMemoryDataToRunFile(FileReference managedFile, IHyracksTaskContext taskCtx)
+ throws HyracksDataException {
+ RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+ try {
+ runFileWriter.open();
+ ISorter sorter = runsGenerator.getSorter();
+ if (sorter.hasRemaining()) {
+ sorter.flush(runFileWriter);
+ }
+ } finally {
+ runFileWriter.close();
+ }
+ return runFileWriter;
+ }
+
+ private void createOrResetRunsMerger(List<GeneratedRunFileReader> runs) {
+ if (runsMerger == null) {
+ IBinaryComparator[] comparators =
+ new IBinaryComparator[] { doubleComparatorFactory.createBinaryComparator() };
+ INormalizedKeyComputer nmkComputer = doubleNkComputerFactory.createNormalizedKeyComputer();
+ runsMerger = new ExternalSortRunMerger(ctx.getTaskContext(), runs, new int[] { 0 }, comparators,
+ nmkComputer, recordDesc, numFrames, Integer.MAX_VALUE);
+ } else {
+ runsMerger.reset(runs);
+ }
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
new file mode 100644
index 0000000..a695c23
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.NormalizedKeyComputerFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.comm.channels.FileNetworkInputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.net.NetworkManager;
+import org.apache.hyracks.control.nc.partitions.JobFileState;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.collectors.InputChannelFrameReader;
+import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+public abstract class AbstractMedianAggregateFunction extends AbstractAggregateFunction {
+
+ protected static final String MEDIAN = "median";
+ private static final int COUNT_FIELD_ID = 0;
+ private static final int HANDLE_FIELD_ID = 1;
+ private static final int ADDRESS_FIELD_ID = 2;
+ private static final int PORT_FIELD_ID = 3;
+
+ private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<AString> stringSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<AInt64> longSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<AInt32> intSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+ @SuppressWarnings("unchecked")
+ protected final ISerializerDeserializer<ADouble> doubleSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+ protected final IBinaryComparatorFactory doubleComparatorFactory =
+ BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(ATypeTag.DOUBLE, true);
+ protected final INormalizedKeyComputerFactory doubleNkComputerFactory =
+ NormalizedKeyComputerFactoryProvider.INSTANCE.getNormalizedKeyComputerFactory(BuiltinType.ADOUBLE, true);
+ protected final RecordDescriptor recordDesc = new RecordDescriptor(new ISerializerDeserializer[] { doubleSerde },
+ new ITypeTraits[] { TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ADOUBLE) });
+
+ protected final AMutableString aString = new AMutableString("");
+ protected final AMutableInt64 aInt64 = new AMutableInt64(0);
+ protected final AMutableInt32 aInt32 = new AMutableInt32(0);
+ protected final AMutableDouble aDouble = new AMutableDouble(0);
+ protected final IPointable inputVal = new VoidPointable();
+ private final FrameTupleReference ftr = new FrameTupleReference();
+ private final FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
+ protected final List<IFrameReader> readers = new ArrayList<>();
+
+ protected final IScalarEvaluator eval;
+ protected final IEvaluatorContext ctx;
+ protected final IFrame frame;
+ protected long count;
+ private List<IFrame> inFrames;
+ private List<PartialResult> partialResults;
+ private RecordBuilder recBuilder;
+
+ public AbstractMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+ SourceLocation sourceLoc) throws HyracksDataException {
+ super(sourceLoc);
+ ctx = context;
+ eval = args[0].createScalarEvaluator(context);
+ frame = new VSizeFrame(context.getTaskContext());
+ }
+
+ @Override
+ public void init() throws HyracksDataException {
+ if (partialResults == null) {
+ partialResults = new ArrayList<>();
+ }
+ if (recBuilder == null) {
+ recBuilder = new RecordBuilder();
+ recBuilder.reset(LocalMedianTypeComputer.REC_TYPE);
+ }
+ count = 0;
+ partialResults.clear();
+ recBuilder.init();
+ }
+
+ protected void processPartialResults(IFrameTupleReference tuple) throws HyracksDataException {
+ eval.evaluate(tuple, inputVal);
+ byte[] serBytes = inputVal.getByteArray();
+ int offset = inputVal.getStartOffset();
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[offset]);
+ if (typeTag == ATypeTag.OBJECT) {
+ long handleCount = AInt64SerializerDeserializer.getLong(serBytes,
+ ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, COUNT_FIELD_ID, 0, false));
+ if (handleCount == 0) {
+ return;
+ }
+ count += handleCount;
+
+ long fileId = AInt64SerializerDeserializer.getLong(serBytes,
+ ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, HANDLE_FIELD_ID, 0, false));
+
+ String address = UTF8StringUtil.toString(serBytes,
+ ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, ADDRESS_FIELD_ID, 0, false));
+
+ int port = AInt32SerializerDeserializer.getInt(serBytes,
+ ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, PORT_FIELD_ID, 0, false));
+
+ partialResults.add(new PartialResult(fileId, handleCount, address, port));
+ } else {
+ throw new UnsupportedItemTypeException(sourceLoc, MEDIAN, serBytes[offset]);
+ }
+ }
+
+ protected void finishPartialResult(IPointable result) throws HyracksDataException {
+ if (count == 0) {
+ setPartialResult(result, -1, "", -1);
+ return;
+ }
+
+ IHyracksTaskContext taskCtx = ctx.getTaskContext();
+ IHyracksJobletContext jobletCtx = taskCtx.getJobletContext();
+ RunMergingFrameReader merger = createRunsMergingFrameReader();
+ FileReference managedFile = jobletCtx.createManagedWorkspaceFile(MEDIAN);
+ RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+ merger.open();
+ runFileWriter.open();
+ try {
+ while (merger.nextFrame(frame)) {
+ runFileWriter.nextFrame(frame.getBuffer());
+ }
+ } finally {
+ runFileWriter.close();
+ merger.close();
+ }
+
+ NetworkAddress netAddress = ((NodeControllerService) jobletCtx.getServiceContext().getControllerService())
+ .getNetworkManager().getPublicNetworkAddress();
+
+ long fileId = jobletCtx.nextUniqueId();
+ taskCtx.setStateObject(new JobFileState(managedFile, jobletCtx.getJobId(), fileId));
+ setPartialResult(result, fileId, netAddress.getAddress(), netAddress.getPort());
+ }
+
+ protected void finishFinalResult(IPointable result) throws HyracksDataException {
+ if (count == 0) {
+ PointableHelper.setNull(result);
+ return;
+ }
+ try {
+ double medianVal = findMedian();
+ resultStorage.reset();
+ aDouble.setValue(medianVal);
+ doubleSerde.serialize(aDouble, resultStorage.getDataOutput());
+ result.set(resultStorage);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private double findMedian() throws HyracksDataException {
+ RunMergingFrameReader merger = createRunsMergingFrameReader();
+ return getMedian(merger);
+ }
+
+ protected RunMergingFrameReader createRunsMergingFrameReader() throws HyracksDataException {
+ IHyracksTaskContext taskCtx = ctx.getTaskContext();
+ IHyracksJobletContext jobletCtx = taskCtx.getJobletContext();
+ INCServiceContext serviceCtx = jobletCtx.getServiceContext();
+ NetworkManager netManager = ((NodeControllerService) serviceCtx.getControllerService()).getNetworkManager();
+ List<IFrame> inFrames = getInFrames(partialResults.size(), taskCtx);
+ readers.clear();
+ for (PartialResult partialResult : partialResults) {
+ IFrameReader inputChannelReader = createInputChannel(netManager, taskCtx,
+ new NetworkAddress(partialResult.address, partialResult.port), jobletCtx.getJobId().getId(),
+ partialResult.fileId);
+ readers.add(inputChannelReader);
+ }
+ return new RunMergingFrameReader(taskCtx, readers, inFrames, new int[] { 0 },
+ new IBinaryComparator[] { doubleComparatorFactory.createBinaryComparator() },
+ doubleNkComputerFactory.createNormalizedKeyComputer(), recordDesc);
+ }
+
+ private double getMedian(RunMergingFrameReader merger) throws HyracksDataException {
+ boolean isOdd = count % 2 != 0;
+ long medianPosition = isOdd ? count / 2 : (count - 1) / 2;
+ long currentTupleCount = 0;
+ double medianVal = -1;
+ merger.open();
+ try {
+ while (merger.nextFrame(frame)) {
+ fta.reset(frame.getBuffer());
+ int tupleCount = fta.getTupleCount();
+ if (currentTupleCount + tupleCount > medianPosition) {
+ int firstMedian = (int) (medianPosition - currentTupleCount);
+ ftr.reset(fta, firstMedian);
+ medianVal = ADoubleSerializerDeserializer.getDouble(ftr.getFieldData(0), ftr.getFieldStart(0) + 1);
+ if (!isOdd) {
+ if (firstMedian + 1 < tupleCount) {
+ // second median is in the same frame
+ ftr.reset(fta, firstMedian + 1);
+ } else {
+ // second median is in the next frame
+ merger.nextFrame(frame);
+ fta.reset(frame.getBuffer());
+ ftr.reset(fta, 0);
+ }
+ medianVal =
+ (ADoubleSerializerDeserializer.getDouble(ftr.getFieldData(0), ftr.getFieldStart(0) + 1)
+ + medianVal) / 2;
+ }
+ break;
+ }
+ currentTupleCount += tupleCount;
+ }
+ } finally {
+ merger.close();
+ }
+ return medianVal;
+ }
+
+ protected void setPartialResult(IPointable result, long fileId, String address, int port)
+ throws HyracksDataException {
+ try {
+ resultStorage.reset();
+ aInt64.setValue(count);
+ longSerde.serialize(aInt64, resultStorage.getDataOutput());
+ recBuilder.addField(COUNT_FIELD_ID, resultStorage);
+
+ resultStorage.reset();
+ aInt64.setValue(fileId);
+ longSerde.serialize(aInt64, resultStorage.getDataOutput());
+ recBuilder.addField(HANDLE_FIELD_ID, resultStorage);
+
+ resultStorage.reset();
+ aString.setValue(address);
+ stringSerde.serialize(aString, resultStorage.getDataOutput());
+ recBuilder.addField(ADDRESS_FIELD_ID, resultStorage);
+
+ resultStorage.reset();
+ aInt32.setValue(port);
+ intSerde.serialize(aInt32, resultStorage.getDataOutput());
+ recBuilder.addField(PORT_FIELD_ID, resultStorage);
+
+ resultStorage.reset();
+ recBuilder.write(resultStorage.getDataOutput(), true);
+ result.set(resultStorage);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ protected List<IFrame> getInFrames(int size, IHyracksTaskContext taskCtx) throws HyracksDataException {
+ if (inFrames == null) {
+ inFrames = new ArrayList<>(size);
+ }
+ int k = 0;
+ for (int inFramesSize = inFrames.size(); k < size && k < inFramesSize; k++) {
+ inFrames.get(k).reset();
+ }
+ for (; k < size; k++) {
+ inFrames.add(new VSizeFrame(taskCtx));
+ }
+ return inFrames;
+ }
+
+ private IFrameReader createInputChannel(NetworkManager netManager, IHyracksTaskContext taskContext,
+ NetworkAddress networkAddress, long jobId, long fileId) throws HyracksDataException {
+ FileNetworkInputChannel FileNetworkInputChannel =
+ new FileNetworkInputChannel(netManager, getSocketAddress(networkAddress), jobId, fileId);
+ InputChannelFrameReader channelFrameReader = new InputChannelFrameReader(FileNetworkInputChannel);
+ FileNetworkInputChannel.registerMonitor(channelFrameReader);
+ FileNetworkInputChannel.open(taskContext);
+ return channelFrameReader;
+ }
+
+ private static SocketAddress getSocketAddress(NetworkAddress netAddress) throws HyracksDataException {
+ try {
+ return new InetSocketAddress(InetAddress.getByAddress(netAddress.lookupIpAddress()), netAddress.getPort());
+ } catch (UnknownHostException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public static class PartialResult {
+
+ long fileId;
+ String address;
+ int port;
+ long count;
+
+ PartialResult(long fileId, long count, String address, int port) {
+ this.fileId = fileId;
+ this.count = count;
+ this.address = address;
+ this.port = port;
+ }
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..faf5272
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class GlobalSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = GlobalSqlMedianAggregateDescriptor::new;
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.GLOBAL_SQL_MEDIAN;
+ }
+
+ @Override
+ public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+ return new IAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+ throws HyracksDataException {
+ return new GlobalSqlMedianAggregateFunction(args, ctx, sourceLoc);
+ }
+ };
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java
new file mode 100644
index 0000000..c9ee8d8
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class GlobalSqlMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+ public GlobalSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+ SourceLocation sourceLoc) throws HyracksDataException {
+ super(args, context, sourceLoc);
+
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws HyracksDataException {
+ processPartialResults(tuple);
+ }
+
+ @Override
+ public void finish(IPointable result) throws HyracksDataException {
+ finishFinalResult(result);
+ }
+
+ @Override
+ public void finishPartial(IPointable result) throws HyracksDataException {
+ finishPartialResult(result);
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..532cb5f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class IntermediateSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = IntermediateSqlMedianAggregateDescriptor::new;
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.INTERMEDIATE_SQL_MEDIAN;
+ }
+
+ @Override
+ public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+ return new IAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+ throws HyracksDataException {
+ return new IntermediateSqlMedianAggregateFunction(args, ctx, sourceLoc);
+ }
+ };
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java
new file mode 100644
index 0000000..d78c17f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class IntermediateSqlMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+ public IntermediateSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+ SourceLocation sourceLoc) throws HyracksDataException {
+ super(args, context, sourceLoc);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws HyracksDataException {
+ processPartialResults(tuple);
+ }
+
+ @Override
+ public void finish(IPointable result) throws HyracksDataException {
+ finishPartialResult(result);
+ }
+
+ @Override
+ public void finishPartial(IPointable result) throws HyracksDataException {
+ finishPartialResult(result);
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..50f36ce
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class LocalSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+
+ @Override
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalSqlMedianAggregateDescriptor();
+ }
+
+ @Override
+ public IFunctionTypeInferer createFunctionTypeInferer() {
+ return FunctionTypeInferers.MEDIAN_MEMORY;
+ }
+ };
+
+ private int numFrames = 0;
+
+ @Override
+ public void setImmutableStates(Object... states) {
+ this.numFrames = (int) states[0];
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.LOCAL_SQL_MEDIAN;
+ }
+
+ @Override
+ public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+ return new IAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+ throws HyracksDataException {
+ return new LocalSqlMedianAggregateFunction(args, ctx, sourceLoc, numFrames);
+ }
+ };
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java
new file mode 100644
index 0000000..5400ca2
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class LocalSqlMedianAggregateFunction extends AbstractLocalMedianAggregateFunction {
+
+ public LocalSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+ SourceLocation sourceLoc, int numFrames) throws HyracksDataException {
+ super(args, context, sourceLoc, numFrames);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws HyracksDataException {
+ processDataValue(tuple);
+ }
+
+ @Override
+ public void finishPartial(IPointable result) throws HyracksDataException {
+ finishLocalPartial(result);
+ }
+
+ @Override
+ public void finish(IPointable result) throws HyracksDataException {
+ finishLocalPartial(result);
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000..b98e389
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+
+ @Override
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SqlMedianAggregateDescriptor();
+ }
+
+ @Override
+ public IFunctionTypeInferer createFunctionTypeInferer() {
+ return FunctionTypeInferers.MEDIAN_MEMORY;
+ }
+ };
+
+ private int numFrames = 0;
+
+ @Override
+ public void setImmutableStates(Object... states) {
+ this.numFrames = (int) states[0];
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.SQL_MEDIAN;
+ }
+
+ @Override
+ public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+ return new IAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+ throws HyracksDataException {
+ return new SqlMedianAggregateFunction(args, ctx, sourceLoc, numFrames);
+ }
+ };
+ }
+
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java
new file mode 100644
index 0000000..1dbbfe6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
+
+public class SqlMedianAggregateFunction extends AbstractLocalMedianAggregateFunction {
+
+ public SqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+ SourceLocation sourceLoc, int numFrames) throws HyracksDataException {
+ super(args, context, sourceLoc, numFrames);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws HyracksDataException {
+ processDataValue(tuple);
+ }
+
+ @Override
+ public void finish(IPointable result) throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ appender.write(runsGenerator, true);
+ }
+ // close to sort the in-memory data or write out sorted data to run files
+ runsGenerator.close();
+ super.finishFinalResult(result);
+ }
+
+ @Override
+ public void finishPartial(IPointable result) throws HyracksDataException {
+ finishLocalPartial(result);
+ }
+
+ @Override
+ protected RunMergingFrameReader createRunsMergingFrameReader() throws HyracksDataException {
+ IHyracksTaskContext taskCtx = ctx.getTaskContext();
+ List<GeneratedRunFileReader> runs = runsGenerator.getRuns();
+ readers.clear();
+ if (runs.isEmpty()) {
+ //TODO: no need to write memory to run file, should just read the sorted data out of the sorter
+ FileReference managedFile = taskCtx.createManagedWorkspaceFile(MEDIAN);
+ RunFileWriter runFileWriter = writeMemoryDataToRunFile(managedFile, taskCtx);
+ GeneratedRunFileReader deleteOnCloseReader = runFileWriter.createDeleteOnCloseReader();
+ readers.add(deleteOnCloseReader);
+ } else {
+ readers.addAll(runs);
+ }
+
+ List<IFrame> inFrames = getInFrames(readers.size(), taskCtx);
+ return new RunMergingFrameReader(taskCtx, readers, inFrames, new int[] { 0 },
+ new IBinaryComparator[] { doubleComparatorFactory.createBinaryComparator() },
+ doubleNkComputerFactory.createNormalizedKeyComputer(), recordDesc);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 4b418b6..2b8d4e2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -54,6 +54,7 @@
import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlKurtosisDistinctAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMaxAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMaxDistinctAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMedianAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMinAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMinDistinctAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlSkewnessAggregateDescriptor;
@@ -156,6 +157,7 @@
import org.apache.asterix.runtime.aggregates.std.GlobalSqlAvgAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.GlobalSqlKurtosisAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.GlobalSqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.GlobalSqlMedianAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.GlobalSqlMinAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.GlobalSqlSkewnessAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.GlobalSqlStddevAggregateDescriptor;
@@ -178,6 +180,7 @@
import org.apache.asterix.runtime.aggregates.std.IntermediateSqlAvgAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.IntermediateSqlKurtosisAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMedianAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMinAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.IntermediateSqlSkewnessAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.IntermediateSqlStddevAggregateDescriptor;
@@ -202,6 +205,7 @@
import org.apache.asterix.runtime.aggregates.std.LocalSqlAvgAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.LocalSqlKurtosisAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.LocalSqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.LocalSqlMedianAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.LocalSqlMinAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.LocalSqlSkewnessAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.LocalSqlStddevAggregateDescriptor;
@@ -224,6 +228,7 @@
import org.apache.asterix.runtime.aggregates.std.SqlCountAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.SqlKurtosisAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.SqlMaxAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.std.SqlMedianAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.SqlMinAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.SqlSkewnessAggregateDescriptor;
import org.apache.asterix.runtime.aggregates.std.SqlStddevAggregateDescriptor;
@@ -816,6 +821,10 @@
fc.add(LocalSqlMinAggregateDescriptor.FACTORY);
fc.add(IntermediateSqlMinAggregateDescriptor.FACTORY);
fc.add(GlobalSqlMinAggregateDescriptor.FACTORY);
+ fc.add(SqlMedianAggregateDescriptor.FACTORY);
+ fc.add(LocalSqlMedianAggregateDescriptor.FACTORY);
+ fc.add(IntermediateSqlMedianAggregateDescriptor.FACTORY);
+ fc.add(GlobalSqlMedianAggregateDescriptor.FACTORY);
fc.add(SqlStddevAggregateDescriptor.FACTORY);
fc.add(LocalSqlStddevAggregateDescriptor.FACTORY);
fc.add(IntermediateSqlStddevAggregateDescriptor.FACTORY);
@@ -891,6 +900,7 @@
fc.add(ScalarSqlMaxDistinctAggregateDescriptor.FACTORY);
fc.add(ScalarSqlMinAggregateDescriptor.FACTORY);
fc.add(ScalarSqlMinDistinctAggregateDescriptor.FACTORY);
+ fc.add(ScalarSqlMedianAggregateDescriptor.FACTORY);
fc.add(ScalarSqlStddevAggregateDescriptor.FACTORY);
fc.add(ScalarSqlStddevDistinctAggregateDescriptor.FACTORY);
fc.add(ScalarSqlStddevPopAggregateDescriptor.FACTORY);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
index bc763bd..7d484b6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
@@ -116,6 +116,9 @@
}
};
+ public static final IFunctionTypeInferer MEDIAN_MEMORY =
+ (expr, fd, context, compilerProps) -> fd.setImmutableStates(compilerProps.getSortMemoryFrames());
+
public static final class CastTypeInferer implements IFunctionTypeInferer {
@Override
public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context,
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 4d324f4..422782e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -44,4 +44,6 @@
Class<?> loadClass(String className) throws HyracksException;
ClassLoader getClassLoader() throws HyracksException;
+
+ long nextUniqueId();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
new file mode 100644
index 0000000..542fda1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.comm.channels;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import org.apache.hyracks.api.channels.IInputChannel;
+import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FileNetworkInputChannel implements IInputChannel {
+
+ private static final int NUM_READ_BUFFERS = 1;
+ public static final long FILE_CHANNEL_CODE = -1;
+
+ private final IChannelConnectionFactory netManager;
+ private final SocketAddress remoteAddress;
+ private final long jobId;
+ private final long fileId;
+ private final Queue<ByteBuffer> fullQueue;
+ private final int nBuffers;
+ private IChannelControlBlock ccb;
+ private IInputChannelMonitor monitor;
+ private Object attachment;
+
+ public FileNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, long jobId,
+ long fileId) {
+ this.netManager = netManager;
+ this.remoteAddress = remoteAddress;
+ this.jobId = jobId;
+ this.fileId = fileId;
+ this.fullQueue = new ArrayDeque<>(NUM_READ_BUFFERS);
+ this.nBuffers = NUM_READ_BUFFERS;
+ }
+
+ @Override
+ public void registerMonitor(IInputChannelMonitor monitor) {
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ @Override
+ public synchronized ByteBuffer getNextBuffer() {
+ return fullQueue.poll();
+ }
+
+ @Override
+ public void recycleBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+ }
+
+ @Override
+ public void open(IHyracksCommonContext ctx) throws HyracksDataException {
+ try {
+ ccb = netManager.connect(remoteAddress);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+ ccb.getWriteInterface().setEmptyBufferAcceptor(WriteEmptyBufferAcceptor.INSTANCE);
+ ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers,
+ ctx.getInitialFrameSize());
+
+ ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkInputChannel.INITIAL_MESSAGE_SIZE);
+ writeBuffer.putLong(FILE_CHANNEL_CODE);
+ writeBuffer.putLong(jobId);
+ writeBuffer.putLong(fileId);
+ writeBuffer.flip();
+
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void fail() {
+ // do nothing (covered by job lifecycle)
+ }
+
+ private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ fullQueue.add(buffer);
+ monitor.notifyDataAvailability(FileNetworkInputChannel.this, 1);
+ }
+
+ @Override
+ public void close() {
+ monitor.notifyEndOfStream(FileNetworkInputChannel.this);
+ }
+
+ @Override
+ public void error(int ecode) {
+ monitor.notifyFailure(FileNetworkInputChannel.this, ecode);
+ }
+ }
+
+ private static class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+
+ static final WriteEmptyBufferAcceptor INSTANCE = new WriteEmptyBufferAcceptor();
+
+ @Override
+ public void accept(ByteBuffer buffer) {
+ // do nothing
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 53bb7cd..5ae81fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -37,7 +37,7 @@
public class NetworkInputChannel implements IInputChannel {
private static final Logger LOGGER = LogManager.getLogger();
- static final int INITIAL_MESSAGE_SIZE = 20;
+ public static final int INITIAL_MESSAGE_SIZE = 24;
private final IChannelConnectionFactory netManager;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index f179c36..a96dfe5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -108,6 +108,7 @@
private final String jobStartTimeZoneId;
private final long maxWarnings;
+ private final AtomicLong uniqueIds;
public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId,
INCServiceContext serviceCtx, ActivityClusterGraph acg,
@@ -140,6 +141,7 @@
this.jobStartTime = jobStartTime;
this.jobStartTimeZoneId = jobStartTimeZoneId;
this.maxWarnings = acg.getMaxWarnings();
+ this.uniqueIds = new AtomicLong();
}
@Override
@@ -161,6 +163,11 @@
}
@Override
+ public long nextUniqueId() {
+ return uniqueIds.getAndIncrement();
+ }
+
+ @Override
public long getJobStartTime() {
return jobStartTime;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 6876618..468a969 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -31,8 +31,12 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.comm.channels.FileNetworkInputChannel;
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
+import org.apache.hyracks.comm.channels.NetworkInputChannel;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.partitions.PartitionFileReaderUtil;
import org.apache.hyracks.control.nc.partitions.PartitionManager;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
@@ -47,8 +51,6 @@
private static final int MAX_CONNECTION_ATTEMPTS = 5;
- static final int INITIAL_MESSAGE_SIZE = 20;
-
private final PartitionManager partitionManager;
private final int nBuffers;
@@ -113,7 +115,8 @@
@Override
public void channelOpened(ChannelControlBlock channel) {
channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
- channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+ channel.getReadInterface().getEmptyBufferAcceptor()
+ .accept(ByteBuffer.allocate(NetworkInputChannel.INITIAL_MESSAGE_SIZE));
}
}
@@ -128,12 +131,13 @@
@Override
public void accept(ByteBuffer buffer) {
- PartitionId pid = readInitialMessage(buffer);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Received initial partition request: " + pid + " on channel: " + ccb);
- }
noc = new NetworkOutputChannel(ccb, nBuffers);
- partitionManager.registerPartitionRequest(pid, noc);
+ long id = buffer.getLong();
+ if (id == FileNetworkInputChannel.FILE_CHANNEL_CODE) {
+ handleFileRequest(buffer);
+ } else {
+ handlePartitionRequest(buffer, id);
+ }
}
@Override
@@ -147,16 +151,37 @@
noc.abort(ecode);
}
}
+
+ private void handlePartitionRequest(ByteBuffer buffer, long jid) {
+ PartitionId pid = readInitialMessage(buffer, jid);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Received initial partition request: " + pid + " on channel: " + ccb);
+ }
+ partitionManager.registerPartitionRequest(pid, noc);
+ }
+
+ private void handleFileRequest(ByteBuffer buffer) {
+ JobId jobId = new JobId(buffer.getLong());
+ long fileId = buffer.getLong();
+ if (partitionManager.registerFileRequest(jobId, noc)) {
+ writeFileToChannel(partitionManager.getNodeControllerService(), noc, jobId, fileId);
+ }
+ }
}
- private static PartitionId readInitialMessage(ByteBuffer buffer) {
- JobId jobId = new JobId(buffer.getLong());
+ private static PartitionId readInitialMessage(ByteBuffer buffer, long jid) {
+ JobId jobId = new JobId(jid);
ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
int senderIndex = buffer.getInt();
int receiverIndex = buffer.getInt();
return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
}
+ private static void writeFileToChannel(NodeControllerService ncs, NetworkOutputChannel noc, JobId jobId,
+ long fileId) {
+ PartitionFileReaderUtil.writeFileToChannel(ncs, noc, jobId, fileId);
+ }
+
public MuxDemuxPerformanceCounters getPerformanceCounters() {
return md.getPerformanceCounters();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java
new file mode 100644
index 0000000..e4e6f09
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.partitions;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.dataflow.state.IStateObject;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+
+public class JobFileState implements IStateObject {
+
+ private final FileReference fileRef;
+ private final JobId jobId;
+ private final JobFileId jobFileId;
+
+ public JobFileState(FileReference fileRef, JobId jobId, long fileId) {
+ this.fileRef = fileRef;
+ this.jobFileId = new JobFileId(fileId);
+ this.jobId = jobId;
+ }
+
+ @Override
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public Object getId() {
+ return jobFileId;
+ }
+
+ @Override
+ public long getMemoryOccupancy() {
+ return 0;
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+
+ }
+
+ public FileReference getFileRef() {
+ return fileRef;
+ }
+
+ public static class JobFileId {
+
+ private final long fileId;
+
+ public JobFileId(long fileId) {
+ this.fileId = fileId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof JobFileId)) {
+ return false;
+ }
+ JobFileId otherFileId = (JobFileId) o;
+ return otherFileId.fileId == fileId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.hashCode(fileId);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
index 3e36946..218e557 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -18,14 +18,11 @@
*/
package org.apache.hyracks.control.nc.partitions;
-import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.partitions.IPartition;
@@ -60,45 +57,7 @@
@Override
public void writeTo(final IFrameWriter writer) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- if (partitionFile == null) {
- writer.open();
- writer.close();
- return;
- }
- IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- try {
- writer.open();
- try {
- long offset = 0;
- ByteBuffer buffer = ctx.allocateFrame();
- while (true) {
- buffer.clear();
- long size = ioManager.syncRead(fh, offset, buffer);
- if (size < 0) {
- break;
- } else if (size < buffer.capacity()) {
- throw new HyracksDataException("Premature end of file");
- }
- offset += size;
- buffer.flip();
- writer.nextFrame(buffer);
- }
- } finally {
- writer.close();
- }
- } finally {
- ioManager.close(fh);
- }
- } catch (HyracksDataException e) {
- throw new RuntimeException(e);
- }
- }
- });
+ executor.execute(new PartitionFileReader(ctx, partitionFile, ioManager, writer));
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
new file mode 100644
index 0000000..a0d8dcd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+
+public class PartitionFileReader implements Runnable {
+
+ private final IHyracksCommonContext ctx;
+ private final FileReference partitionFile;
+ private final IIOManager ioManager;
+ private final IFrameWriter writer;
+
+ public PartitionFileReader(IHyracksCommonContext ctx, FileReference partitionFile, IIOManager ioManager,
+ IFrameWriter writer) {
+ this.ctx = ctx;
+ this.partitionFile = partitionFile;
+ this.ioManager = ioManager;
+ this.writer = writer;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (partitionFile == null) {
+ writer.open();
+ writer.close();
+ return;
+ }
+ IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ try {
+ writer.open();
+ try {
+ long offset = 0;
+ ByteBuffer buffer = ctx.allocateFrame();
+ while (true) {
+ buffer.clear();
+ long size = ioManager.syncRead(fh, offset, buffer);
+ if (size < 0) {
+ break;
+ } else if (size < buffer.capacity()) {
+ throw new HyracksDataException("Premature end of file");
+ }
+ offset += size;
+ buffer.flip();
+ writer.nextFrame(buffer);
+ }
+ } finally {
+ writer.close();
+ }
+ } finally {
+ ioManager.close(fh);
+ }
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
new file mode 100644
index 0000000..2675c27
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.partitions;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hyracks.api.dataflow.state.IStateObject;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.control.nc.Joblet;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
+
+public class PartitionFileReaderUtil {
+
+ private PartitionFileReaderUtil() {
+ }
+
+ public static void writeFileToChannel(NodeControllerService ncs, NetworkOutputChannel noc, JobId jobId,
+ long fileId) {
+ Joblet joblet = ncs.getJobletMap().get(jobId);
+ if (joblet == null) {
+ noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+ return;
+ }
+ IStateObject stateObject = joblet.getEnvironment().getStateObject(new JobFileState.JobFileId(fileId));
+ if (!(stateObject instanceof JobFileState)) {
+ noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+ return;
+ }
+ JobFileState fileState = (JobFileState) stateObject;
+ FileReference fileRef = fileState.getFileRef();
+ if (!fileRef.getFile().exists()) {
+ noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+ return;
+ }
+ ExecutorService executor = ncs.getExecutor();
+ noc.setFrameSize(joblet.getInitialFrameSize());
+ executor.execute(new PartitionFileReader(joblet, fileRef, ncs.getIoManager(), noc));
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index c082b71..927dd7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -58,6 +58,8 @@
private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<>();
+ private final Map<JobId, List<NetworkOutputChannel>> fileRequests = new HashMap<>();
+
private final Cache<JobId, JobId> failedJobsCache;
public PartitionManager(NodeControllerService ncs) {
@@ -122,10 +124,24 @@
}
}
+ public synchronized boolean registerFileRequest(JobId jid, NetworkOutputChannel noc) {
+ if (failedJobsCache.getIfPresent(jid) != null) {
+ noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+ return false;
+ }
+ List<NetworkOutputChannel> netOutChannels = fileRequests.computeIfAbsent(jid, k -> new ArrayList<>());
+ netOutChannels.add(noc);
+ return true;
+ }
+
public IWorkspaceFileFactory getFileFactory() {
return fileFactory;
}
+ public NodeControllerService getNodeControllerService() {
+ return ncs;
+ }
+
public void close() {
deallocatableRegistry.close();
}
@@ -175,6 +191,10 @@
}
private List<NetworkOutputChannel> removePendingRequests(JobId jobId, JobStatus status) {
+ List<NetworkOutputChannel> jobFileRequests = null;
+ if (!fileRequests.isEmpty()) {
+ jobFileRequests = fileRequests.remove(jobId);
+ }
if (status != JobStatus.FAILURE) {
return Collections.emptyList();
}
@@ -189,6 +209,9 @@
requestsIterator.remove();
}
}
+ if (jobFileRequests != null && !jobFileRequests.isEmpty()) {
+ pendingRequests.addAll(jobFileRequests);
+ }
return pendingRequests;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index 50cacb2..7051e1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -115,6 +115,10 @@
return size;
}
+ public FileReference getFile() {
+ return file;
+ }
+
public void setDeleteAfterClose(boolean deleteAfterClose) {
this.deleteAfterClose = deleteAfterClose;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index 46e3eec..bf4cbd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import java.time.ZoneId;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -42,6 +43,7 @@
private final WorkspaceFileFactory fileFactory;
private final long jobStartTime;
private final String jobStartTimeZoneId;
+ private final AtomicLong ids;
TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
this.serviceContext = serviceContext;
@@ -50,6 +52,7 @@
this.frameManger = new FrameManager(frameSize);
this.jobStartTime = System.currentTimeMillis();
this.jobStartTimeZoneId = ZoneId.systemDefault().getId();
+ this.ids = new AtomicLong();
}
@Override
@@ -151,4 +154,8 @@
return this.getClass().getClassLoader();
}
+ @Override
+ public long nextUniqueId() {
+ return ids.getAndIncrement();
+ }
}