Implements concurrent query management support.
The following changes are included:
-- factor out JobManager, NodeManager, and ResourceManager from ClusterControllerService;
-- let each application plug in its own IJobCapacityController implementation;
-- let each job specify its required cluster capacity;
-- add a required cluster capacity estimation visitor for optimized query plans;
-- add admission control and queuing for queries, but always execute DDLs and DMLs immediately;
-- add tests for JobManager, NodeManager, ClusterCapacity, ClusterCapacityVisitor, and IJobCapacityController;
-- enlarge the -Xmx setting for ManagixSqlppExecutionTest.
Change-Id: I8fb6fda57efa139114dd234e08cc7de7129468c8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1424
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 7f132dd..2bb6cc2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -93,5 +93,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
index 8b83d83..5f4877d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
/**
* Application Context at the Cluster Controller for an application.
@@ -38,7 +39,7 @@
* @param state
* The distributed state
*/
- public void setDistributedState(Serializable state);
+ void setDistributedState(Serializable state);
/**
* A listener that listens to Job Lifecycle events at the Cluster
@@ -46,21 +47,21 @@
*
* @param jobLifecycleListener
*/
- public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+ void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
/**
* A listener that listens to Cluster Lifecycle events at the Cluster
* Controller.
*
- * @param jobLifecycleListener
+ * @param clusterLifecycleListener
*/
- public void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener);
+ void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener);
/**
* Get the Cluster Controller Context.
*
* @return The Cluster Controller Context.
*/
- public ICCContext getCCContext();
+ ICCContext getCCContext();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
index 9f7f222..c11cc7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
@@ -18,10 +18,14 @@
*/
package org.apache.hyracks.api.application;
-public interface ICCApplicationEntryPoint {
- public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
- public void stop() throws Exception;
+public interface ICCApplicationEntryPoint {
+ void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception;
+
+ void stop() throws Exception;
void startupCompleted() throws Exception;
+
+ IJobCapacityController getJobCapacityController();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
index a9bef18..191a4af 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.api.application;
+import java.util.Collection;
import java.util.Map;
import java.util.Set;
@@ -47,6 +48,6 @@
* @param deadNodeIds
* A set of Node Controller Ids that have left the cluster. The set is not cumulative.
*/
- public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException;
+ public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
index ea850c5..dea6e4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
@@ -18,10 +18,14 @@
*/
package org.apache.hyracks.api.application;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+
public interface INCApplicationEntryPoint {
- public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception;
+ void start(INCApplicationContext ncAppCtx, String[] args) throws Exception;
- public void notifyStartupComplete() throws Exception;
+ void notifyStartupComplete() throws Exception;
- public void stop() throws Exception;
+ void stop() throws Exception;
+
+ NodeCapacity getCapacity();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
index c90644f..57f389f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
@@ -35,16 +35,16 @@
private final NetworkAddress messagingNetworkAddress;
- private final int numCores;
+ private final int numAvailableCores;
public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress,
- NetworkAddress datasetNetworkAddress, NetworkAddress messagingNetworkAddress, int numCores) {
+ NetworkAddress datasetNetworkAddress, NetworkAddress messagingNetworkAddress, int numAvailableCores) {
this.nodeId = nodeId;
this.status = status;
this.netAddress = netAddress;
this.datasetNetworkAddress = datasetNetworkAddress;
this.messagingNetworkAddress = messagingNetworkAddress;
- this.numCores = numCores;
+ this.numAvailableCores = numAvailableCores;
}
public String getNodeId() {
@@ -67,7 +67,7 @@
return messagingNetworkAddress;
}
- public int getNumCores() {
- return numCores;
+ public int getNumAvailableCores() {
+ return numAvailableCores;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 00c2cc4..9a908ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -75,10 +75,8 @@
acg.setFrameSize(spec.getFrameSize());
acg.setMaxReattempts(spec.getMaxReattempts());
acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
- acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
- acg.setReportTaskDetails(spec.isReportTaskDetails());
final Set<Constraint> constraints = new HashSet<Constraint>();
final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 5a67188..1ca8bb3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -27,19 +27,28 @@
/**
* A registry of runtime/compile error codes
* Error code:
- * 0 --- 999: runtime errors
- * 1000 ---- 1999: compilation errors
+ * 0 --- 9999: runtime errors
+ * 10000 ---- 19999: compilation errors
*/
public class ErrorCode {
private static final String RESOURCE_PATH = "errormsg" + File.separator + "en.properties";
public static final String HYRACKS = "HYR";
+ // Runtime error codes.
public static final int INVALID_OPERATOR_OPERATION = 1;
public static final int ERROR_PROCESSING_TUPLE = 2;
public static final int FAILURE_ON_NODE = 3;
- public static final int RUNTIME_FILE_WITH_ABSOULTE_PATH_NOT_WITHIN_ANY_IO_DEVICE = 4;
- public static final int RUNTIME_FULLTEXT_PHRASE_FOUND = 5;
- public static final int COMPILATION_RULECOLLECTION_NOT_INSTANCE_OF_LIST = 1001;
+ public static final int FILE_WITH_ABSOULTE_PATH_NOT_WITHIN_ANY_IO_DEVICE = 4;
+ public static final int FULLTEXT_PHRASE_FOUND = 5;
+ public static final int JOB_QUEUE_FULL = 6;
+ public static final int INVALID_NETWORK_ADDRESS = 7;
+ public static final int INVALID_INPUT_PARAMETER = 8;
+ public static final int JOB_REQUIREMENTS_EXCEED_CAPACITY = 9;
+ public static final int NO_SUCH_NODE = 10;
+ public static final int CLASS_LOADING_ISSUE = 11;
+
+ // Compilation error codes.
+ public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001;
// Loads the map that maps error codes to error message templates.
private static Map<Integer, String> errorMessageMap = null;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 56be93e..0fd6923 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.hyracks.api.exceptions;
import java.io.Serializable;
-import java.util.logging.Logger;
import org.apache.hyracks.api.util.ErrorMessageUtil;
@@ -27,14 +27,6 @@
* The main execution time exception type for runtime errors in a hyracks environment
*/
public class HyracksDataException extends HyracksException {
- private static final long serialVersionUID = 1L;
-
- public static final int UNKNOWN = 0;
- private final String component;
- private final int errorCode;
- private final Serializable[] params;
- private final String nodeId;
- private transient volatile String msgCache;
public static HyracksDataException create(int code, Serializable... params) {
return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
@@ -46,76 +38,68 @@
public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
Serializable... params) {
- super(message, cause);
- this.component = component;
- this.errorCode = errorCode;
- this.nodeId = nodeId;
- this.params = params;
+ super(component, errorCode, message, cause, nodeId, params);
}
+ /**
+ * @deprecated Error code is needed.
+ */
+ @Deprecated
public HyracksDataException(String message) {
- this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
+ super(message);
}
+ /**
+ * @deprecated Error code is needed.
+ */
+ @Deprecated
public HyracksDataException(Throwable cause) {
- this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
+ super(cause);
}
+ /**
+ * @deprecated Error code is needed.
+ */
+ @Deprecated
public HyracksDataException(Throwable cause, String nodeId) {
- this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
+ super(cause, nodeId);
}
+ /**
+ * @deprecated Error code is needed.
+ */
+ @Deprecated
public HyracksDataException(String message, Throwable cause, String nodeId) {
- this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
+ super(message, cause, nodeId);
}
+ /**
+ * @deprecated Error code is needed.
+ */
+ @Deprecated
public HyracksDataException(String message, Throwable cause) {
- this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, (String) null);
+ super(message, cause);
}
public HyracksDataException(String component, int errorCode, Serializable... params) {
- this(component, errorCode, null, null, null, params);
+ super(component, errorCode, null, null, null, params);
}
public HyracksDataException(Throwable cause, int errorCode, Serializable... params) {
- this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
+ super(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
}
public HyracksDataException(String component, int errorCode, String message, Serializable... params) {
- this(component, errorCode, message, null, null, params);
+ super(component, errorCode, message, null, null, params);
}
public HyracksDataException(String component, int errorCode, Throwable cause, Serializable... params) {
- this(component, errorCode, cause.getMessage(), cause, null, params);
+ super(component, errorCode, cause.getMessage(), cause, null, params);
}
public HyracksDataException(String component, int errorCode, String message, Throwable cause,
Serializable... params) {
- this(component, errorCode, message, cause, null, params);
- }
-
- public String getComponent() {
- return component;
- }
-
- public int getErrorCode() {
- return errorCode;
- }
-
- public Object[] getParams() {
- return params;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public String getMessage() {
- if (msgCache == null) {
- msgCache = ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params);
- }
- return msgCache;
+ super(component, errorCode, message, cause, null, params);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index e939d26..5d13212 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -19,22 +19,118 @@
package org.apache.hyracks.api.exceptions;
import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hyracks.api.util.ErrorMessageUtil;
public class HyracksException extends IOException {
private static final long serialVersionUID = 1L;
- public HyracksException() {
+ public static final int UNKNOWN = 0;
+ private final String component;
+ private final int errorCode;
+ private final Serializable[] params;
+ private final String nodeId;
+ private transient volatile String msgCache;
+
+ public static HyracksException create(int code, Serializable... params) {
+ return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
}
- public HyracksException(String message) {
- super(message);
+ public static HyracksException create(int code, Throwable cause, Serializable... params) {
+ return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), cause, params);
}
- public HyracksException(Throwable cause) {
- super(cause);
- }
-
- public HyracksException(String message, Throwable cause) {
+ public HyracksException(String component, int errorCode, String message, Throwable cause, String nodeId,
+ Serializable... params) {
super(message, cause);
+ this.component = component;
+ this.errorCode = errorCode;
+ this.nodeId = nodeId;
+ this.params = params;
+ }
+
+ /**
+ * @deprecated Error code is needed.
+ */
+ @Deprecated
+ public HyracksException(String message) {
+ this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
+ }
+
+ /**
+ * @deprecated Error code is needed.
+ */
+ @Deprecated
+ public HyracksException(Throwable cause) {
+ this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
+ }
+
+ /**
+ * @deprecated Error code is needed.
+ */
+ @Deprecated
+ public HyracksException(Throwable cause, String nodeId) {
+ this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
+ }
+
+ /**
+ * @deprecated Error code is needed.
+ */
+ @Deprecated
+ public HyracksException(String message, Throwable cause, String nodeId) {
+ this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
+ }
+
+ /**
+ * @deprecated Error code is needed.
+ */
+ @Deprecated
+ public HyracksException(String message, Throwable cause) {
+ this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, (String) null);
+ }
+
+ public HyracksException(String component, int errorCode, Serializable... params) {
+ this(component, errorCode, null, null, null, params);
+ }
+
+ public HyracksException(Throwable cause, int errorCode, Serializable... params) {
+ this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
+ }
+
+ public HyracksException(String component, int errorCode, String message, Serializable... params) {
+ this(component, errorCode, message, null, null, params);
+ }
+
+ public HyracksException(String component, int errorCode, Throwable cause, Serializable... params) {
+ this(component, errorCode, cause.getMessage(), cause, null, params);
+ }
+
+ public HyracksException(String component, int errorCode, String message, Throwable cause, Serializable... params) {
+ this(component, errorCode, message, cause, null, params);
+ }
+
+ public String getComponent() {
+ return component;
+ }
+
+ public int getErrorCode() {
+ return errorCode;
+ }
+
+ public Object[] getParams() {
+ return params;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String getMessage() {
+ if (msgCache == null) {
+ msgCache = ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params);
+ }
+ return msgCache;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 84a961e..5787c72 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -28,10 +28,6 @@
import java.util.Map;
import java.util.Set;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
@@ -44,6 +40,12 @@
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
private static final long serialVersionUID = 1L;
@@ -76,11 +78,9 @@
private IJobletEventListenerFactory jobletEventListenerFactory;
- private IGlobalJobDataFactory globalJobDataFactory;
-
private boolean useConnectorPolicyForScheduling;
- private boolean reportTaskDetails;
+ private IClusterCapacity requiredClusterCapacity;
private transient int operatorIdCounter;
@@ -106,7 +106,7 @@
connectorIdCounter = 0;
maxReattempts = 2;
useConnectorPolicyForScheduling = false;
- reportTaskDetails = true;
+ requiredClusterCapacity = new ClusterCapacity();
setFrameSize(frameSize);
}
@@ -281,14 +281,6 @@
this.jobletEventListenerFactory = jobletEventListenerFactory;
}
- public IGlobalJobDataFactory getGlobalJobDataFactory() {
- return globalJobDataFactory;
- }
-
- public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
- this.globalJobDataFactory = globalJobDataFactory;
- }
-
public boolean isUseConnectorPolicyForScheduling() {
return useConnectorPolicyForScheduling;
}
@@ -297,12 +289,12 @@
this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
}
- public boolean isReportTaskDetails() {
- return reportTaskDetails;
+ public void setRequiredClusterCapacity(IClusterCapacity capacity) {
+ this.requiredClusterCapacity = capacity;
}
- public void setReportTaskDetails(boolean reportTaskDetails) {
- this.reportTaskDetails = reportTaskDetails;
+ public IClusterCapacity getRequiredClusterCapacity() {
+ return requiredClusterCapacity;
}
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
index 4351e39..50db00d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.hyracks.api.job;
public enum JobStatus {
- INITIALIZED,
+ PENDING,
RUNNING,
TERMINATED,
FAILURE,
+ FAILURE_BEFORE_EXECUTION
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
new file mode 100644
index 0000000..ded4b63
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+public class ClusterCapacity implements IClusterCapacity {
+
+ private long aggregatedMemoryByteSize = 0;
+ private int aggregatedCores = 0;
+ private final Map<String, Long> nodeMemoryMap = new HashMap<>();
+ private final Map<String, Integer> nodeCoreMap = new HashMap<>();
+
+ @Override
+ public long getAggregatedMemoryByteSize() {
+ return aggregatedMemoryByteSize;
+ }
+
+ @Override
+ public int getAggregatedCores() {
+ return aggregatedCores;
+ }
+
+ @Override
+ public long getMemoryByteSize(String nodeId) throws HyracksException {
+ if (!nodeMemoryMap.containsKey(nodeId)) {
+ throw HyracksException.create(ErrorCode.NO_SUCH_NODE, nodeId);
+ }
+ return nodeMemoryMap.get(nodeId);
+ }
+
+ @Override
+ public int getCores(String nodeId) throws HyracksException {
+ if (!nodeCoreMap.containsKey(nodeId)) { // check the core map itself: setCores() fills it independently of memory
+ throw HyracksException.create(ErrorCode.NO_SUCH_NODE, nodeId);
+ }
+ return nodeCoreMap.get(nodeId);
+ }
+
+ @Override
+ public void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize) {
+ this.aggregatedMemoryByteSize = aggregatedMemoryByteSize;
+ }
+
+ @Override
+ public void setAggregatedCores(int aggregatedCores) {
+ this.aggregatedCores = aggregatedCores;
+ }
+
+ @Override
+ public void setMemoryByteSize(String nodeId, long memoryByteSize) {
+ nodeMemoryMap.put(nodeId, memoryByteSize);
+ }
+
+ @Override
+ public void setCores(String nodeId, int cores) {
+ nodeCoreMap.put(nodeId, cores);
+ }
+
+ @Override
+ public void update(String nodeId, NodeCapacity nodeCapacity) throws HyracksException {
+ // Removes the existing node resource and the aggregated resource statistics.
+ if (nodeMemoryMap.containsKey(nodeId)) {
+ aggregatedMemoryByteSize -= nodeMemoryMap.remove(nodeId);
+ }
+ if (nodeCoreMap.containsKey(nodeId)) {
+ aggregatedCores -= nodeCoreMap.remove(nodeId);
+ }
+
+ long memorySize = nodeCapacity.getMemoryByteSize();
+ int cores = nodeCapacity.getCores();
+ // Updates the node capacity map when both memory size and cores are positive.
+ if (memorySize > 0 && cores > 0) {
+ aggregatedMemoryByteSize += memorySize;
+ aggregatedCores += cores;
+ nodeMemoryMap.put(nodeId, memorySize);
+ nodeCoreMap.put(nodeId, cores);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return ObjectUtils.hashCodeMulti(aggregatedMemoryByteSize, aggregatedCores, nodeMemoryMap,
+ nodeCoreMap);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof ClusterCapacity)) {
+ return false;
+ }
+ ClusterCapacity capacity = (ClusterCapacity) o;
+ return aggregatedMemoryByteSize == capacity.aggregatedMemoryByteSize
+ && aggregatedCores == capacity.aggregatedCores
+ && ObjectUtils.equals(nodeMemoryMap, capacity.nodeMemoryMap)
+ && ObjectUtils.equals(nodeCoreMap, capacity.nodeCoreMap);
+ }
+
+ @Override
+ public String toString() {
+ return "capacity (memory: " + aggregatedMemoryByteSize + " bytes, CPU cores: " + aggregatedCores + ")";
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
new file mode 100644
index 0000000..9e38a20
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class DefaultJobCapacityController implements IJobCapacityController {
+
+ public static final DefaultJobCapacityController INSTANCE = new DefaultJobCapacityController();
+
+ private DefaultJobCapacityController() {
+ }
+
+ @Override
+ public JobSubmissionStatus allocate(JobSpecification job) {
+ return JobSubmissionStatus.EXECUTE;
+ }
+
+ @Override
+ public void release(JobSpecification job) {
+ // No operation here.
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
new file mode 100644
index 0000000..ac3261d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+/**
+ * This interface abstracts the mutable capacity for a cluster.
+ */
+public interface IClusterCapacity extends IReadOnlyClusterCapacity {
+
+ /**
+ * Sets the aggregated memory size for a cluster.
+ *
+ * @param aggregatedMemoryByteSize,
+ * the aggregated memory size.
+ */
+ void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize);
+
+ /**
+ * Sets the aggregated number of CPU cores for a cluster.
+ *
+ * @param aggregatedCores,
+ * the total number of cores.
+ */
+ void setAggregatedCores(int aggregatedCores);
+
+ /**
+ * Sets the memory byte size (for computation) of a specific node.
+ *
+ * @param nodeId,
+ * the node id.
+ * @param memoryByteSize,
+ * the available memory byte size for computation of the node.
+ */
+ void setMemoryByteSize(String nodeId, long memoryByteSize);
+
+ /**
+ * Sets the number of CPU cores for a specific node.
+ *
+ * @param nodeId,
+ * the node id.
+ * @param cores,
+ * the number of CPU cores for the node.
+ */
+ void setCores(String nodeId, int cores);
+
+ /**
+ * Updates the cluster capacity information with the capacity of one particular node.
+ *
+ * @param nodeId,
+ * the id of the node for updating.
+ * @param capacity,
+ * the capacity of one particular node.
+ * @throws HyracksException
+ * when the parameters are invalid.
+ */
+ void update(String nodeId, NodeCapacity capacity) throws HyracksException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
new file mode 100644
index 0000000..5fa4bd9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * This interface determines the behavior of a job when it is submitted to the job manager.
+ * The job could be one of the following three cases:
+ * -- rejected immediately because its capacity requirement exceeds the cluster's capacity;
+ * -- entered into a pending job queue for deferred execution, due to the current capacity limitation because of
+ * concurrent running jobs;
+ * -- executed immediately because there is sufficient capacity.
+ */
+public interface IJobCapacityController {
+
+ enum JobSubmissionStatus {
+ EXECUTE, // the job can be executed immediately
+ QUEUE // the job cannot be executed immediately and is deferred
+ }
+
+ /**
+ * Allocates required cluster capacity for a job.
+ *
+ * @param job
+ * the job specification.
+ * @return EXECUTE, if the job can be executed immediately;
+ * QUEUE, if the job cannot be executed immediately and has to wait for capacity.
+ * @throws HyracksException
+ * if the job's capacity requirement exceeds the maximum capacity of the cluster.
+ */
+ JobSubmissionStatus allocate(JobSpecification job) throws HyracksException;
+
+ /**
+ * Releases cluster capacity for a job when it completes.
+ *
+ * @param job
+ * the job specification.
+ */
+ void release(JobSpecification job);
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
new file mode 100644
index 0000000..59b6bfd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+/**
+ * This interface provides read-only methods for the capacity of a cluster.
+ */
+public interface IReadOnlyClusterCapacity extends Serializable {
+
+ /**
+ * @return the aggregated memory byte size for the cluster.
+ */
+ long getAggregatedMemoryByteSize();
+
+ /**
+ * @return the aggregated number of CPU cores for the cluster.
+ */
+ int getAggregatedCores();
+
+ /**
+ * Retrieves the memory byte size for computation on a specific node.
+ * (Note that usually a portion of memory is used for storage.)
+ *
+ * @param nodeId
+ * the node id.
+ * @return the memory byte size for computation on the node.
+ * @throws HyracksException
+ * when the input node does not exist.
+ */
+ long getMemoryByteSize(String nodeId) throws HyracksException;
+
+ /**
+ * Retrieves the number of CPU cores for computation on a specific node.
+ *
+ * @param nodeId
+ * the node id.
+ * @return the number of CPU cores for computation on the node.
+ * @throws HyracksException
+ * when the input node does not exist.
+ */
+ int getCores(String nodeId) throws HyracksException;
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java
new file mode 100644
index 0000000..7902e7d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.io.Serializable;
+
+/**
+ * Specifies the capacity for computation on a particular node, i.e., an NCDriver process.
+ */
+public class NodeCapacity implements Serializable {
+
+ // All memory for computations -- this is not changed during the lifetime of a running instance.
+ private final long memoryByteSize;
+
+ // All CPU cores -- currently we assume that it doesn't change during the lifetime of a running instance.
+ // Otherwise, for each heartbeat, we would have to update the global capacity of the cluster.
+ private final int cores;
+
+ /**
+ * NOTE: neither parameter should be negative; this constructor does not validate its arguments.
+ * However, both of them can be zero, which means the node is to be removed from the cluster.
+ *
+ * @param memorySize
+ * the memory size of the node, in bytes.
+ * @param cores
+ * the number of cores of the node.
+ */
+ public NodeCapacity(long memorySize, int cores) {
+ this.memoryByteSize = memorySize;
+ this.cores = cores;
+ }
+
+ public long getMemoryByteSize() {
+ return memoryByteSize;
+ }
+
+ public int getCores() {
+ return cores;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 52367ee..d17c9aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -17,9 +17,18 @@
# under the License.
#
-1 = Unsupported operation %1$s in %2$s operator
-2 = Error in processing tuple %1$s in a frame
-4 = The file with absolute path %1$s is not within any of the current IO devices
-5 = Phrase search in Full-text is not supported. An expression should include only one word
+# 0 --- 9999: runtime errors
+# 10000 --- 19999: compilation errors
-1001 = The given rule collection %1$s is not an instance of the List class.
\ No newline at end of file
+1=Unsupported operation %1$s in %2$s operator
+2=Error in processing tuple %1$s in a frame
+4=The file with absolute path %1$s is not within any of the current IO devices
+5=Phrase search in Full-text is not supported. An expression should include only one word
+6=Job queue is full with %1$s jobs
+7=Network address cannot be resolved -- %1$s
+8=Invalid internal input parameter
+9=Job requirement %1$s exceeds capacity %2$s
+10=Node %1$s does not exist
+11=Class loading issue: %1$s
+
+10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java
new file mode 100644
index 0000000..277e8e2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClusterCapacityTest {
+
+ @Test
+ public void test() throws HyracksException {
+ ClusterCapacity capacity = new ClusterCapacity();
+ String nodeId = "node1";
+
+ // Adds one node and checks that the aggregates reflect its capacity.
+ capacity.update(nodeId, new NodeCapacity(1024L, 8));
+ Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 1024L);
+ Assert.assertTrue(capacity.getAggregatedCores() == 8);
+
+ // Updates the node with a negative memory size and a negative number of cores.
+ capacity.update(nodeId, new NodeCapacity(-1L, -2));
+
+ // Verifies that the node is removed: the aggregates drop to zero and lookups fail with NO_SUCH_NODE.
+ Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 0L);
+ Assert.assertTrue(capacity.getAggregatedCores() == 0);
+
+ boolean nodeNotExist = false;
+ try {
+ capacity.getMemoryByteSize(nodeId);
+ } catch (HyracksException e) {
+ nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+ }
+ Assert.assertTrue(nodeNotExist);
+ nodeNotExist = false;
+ try {
+ capacity.getCores(nodeId);
+ } catch (HyracksException e) {
+ nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+ }
+ Assert.assertTrue(nodeNotExist);
+
+ // Adds the node again.
+ capacity.update(nodeId, new NodeCapacity(1024L, 8));
+ // Updates the node with a positive memory size but zero cores.
+ capacity.update(nodeId, new NodeCapacity(4L, 0));
+
+ // Verifies that the node no longer exists: the aggregates are zero and lookups fail with NO_SUCH_NODE.
+ Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 0L);
+ Assert.assertTrue(capacity.getAggregatedCores() == 0);
+ nodeNotExist = false;
+ try {
+ capacity.getMemoryByteSize(nodeId);
+ } catch (HyracksException e) {
+ nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+ }
+ Assert.assertTrue(nodeNotExist);
+ nodeNotExist = false;
+ try {
+ capacity.getCores(nodeId);
+ } catch (HyracksException e) {
+ nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+ }
+ Assert.assertTrue(nodeNotExist);
+ }
+
+}