Implements concurrent query management support.

The following changes are included:
-- factor out JobManager, NodeManager, and ResourceManager from ClusterControllerService;
-- let each application plugin its own IJobCapacityController implementation;
-- let each job specify its required cluster capacity;
-- add a required cluster capacity estimation visitor for optimized query plans;
-- add admission control and queuing for queries, but always executes DDLs and DMLs immediately;
-- add tests for JobManger, NodeManager, ClusterCapacity, ClusterCapacityVisitor, and IJobCapacityController;
-- enlarge the -Xmx setting for ManagixSqlppExecutionTest.

Change-Id: I8fb6fda57efa139114dd234e08cc7de7129468c8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1424
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 7f132dd..2bb6cc2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -93,5 +93,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
index 8b83d83..5f4877d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
@@ -22,6 +22,7 @@
 
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
 /**
  * Application Context at the Cluster Controller for an application.
@@ -38,7 +39,7 @@
      * @param state
      *            The distributed state
      */
-    public void setDistributedState(Serializable state);
+    void setDistributedState(Serializable state);
 
     /**
      * A listener that listens to Job Lifecycle events at the Cluster
@@ -46,21 +47,21 @@
      *
      * @param jobLifecycleListener
      */
-    public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+    void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
 
     /**
      * A listener that listens to Cluster Lifecycle events at the Cluster
      * Controller.
      *
-     * @param jobLifecycleListener
+     * @param clusterLifecycleListener
      */
-    public void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener);
+    void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener);
 
     /**
      * Get the Cluster Controller Context.
      *
      * @return The Cluster Controller Context.
      */
-    public ICCContext getCCContext();
+    ICCContext getCCContext();
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
index 9f7f222..c11cc7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
@@ -18,10 +18,14 @@
  */
 package org.apache.hyracks.api.application;
 
-public interface ICCApplicationEntryPoint {
-    public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
-    public void stop() throws Exception;
+public interface ICCApplicationEntryPoint {
+    void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception;
+
+    void stop() throws Exception;
 
     void startupCompleted() throws Exception;
+
+    IJobCapacityController getJobCapacityController();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
index a9bef18..191a4af 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.api.application;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
@@ -47,6 +48,6 @@
      * @param deadNodeIds
      *            A set of Node Controller Ids that have left the cluster. The set is not cumulative.
      */
-    public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException;
+    public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException;
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
index ea850c5..dea6e4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
@@ -18,10 +18,14 @@
  */
 package org.apache.hyracks.api.application;
 
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+
 public interface INCApplicationEntryPoint {
-    public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception;
+    void start(INCApplicationContext ncAppCtx, String[] args) throws Exception;
 
-    public void notifyStartupComplete() throws Exception;
+    void notifyStartupComplete() throws Exception;
 
-    public void stop() throws Exception;
+    void stop() throws Exception;
+
+    NodeCapacity getCapacity();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
index c90644f..57f389f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
@@ -35,16 +35,16 @@
 
     private final NetworkAddress messagingNetworkAddress;
 
-    private final int numCores;
+    private final int numAvailableCores;
 
     public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress,
-            NetworkAddress datasetNetworkAddress, NetworkAddress messagingNetworkAddress, int numCores) {
+            NetworkAddress datasetNetworkAddress, NetworkAddress messagingNetworkAddress, int numAvailableCores) {
         this.nodeId = nodeId;
         this.status = status;
         this.netAddress = netAddress;
         this.datasetNetworkAddress = datasetNetworkAddress;
         this.messagingNetworkAddress = messagingNetworkAddress;
-        this.numCores = numCores;
+        this.numAvailableCores = numAvailableCores;
     }
 
     public String getNodeId() {
@@ -67,7 +67,7 @@
         return messagingNetworkAddress;
     }
 
-    public int getNumCores() {
-        return numCores;
+    public int getNumAvailableCores() {
+        return numAvailableCores;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 00c2cc4..9a908ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -75,10 +75,8 @@
         acg.setFrameSize(spec.getFrameSize());
         acg.setMaxReattempts(spec.getMaxReattempts());
         acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
-        acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
         acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
         acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
-        acg.setReportTaskDetails(spec.isReportTaskDetails());
         final Set<Constraint> constraints = new HashSet<Constraint>();
         final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
             @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 5a67188..1ca8bb3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -27,19 +27,28 @@
 /**
  * A registry of runtime/compile error codes
  * Error code:
- * 0 --- 999: runtime errors
- * 1000 ---- 1999: compilation errors
+ * 0 --- 9999: runtime errors
+ * 10000 ---- 19999: compilation errors
  */
 public class ErrorCode {
     private static final String RESOURCE_PATH = "errormsg" + File.separator + "en.properties";
     public static final String HYRACKS = "HYR";
 
+    // Runtime error codes.
     public static final int INVALID_OPERATOR_OPERATION = 1;
     public static final int ERROR_PROCESSING_TUPLE = 2;
     public static final int FAILURE_ON_NODE = 3;
-    public static final int RUNTIME_FILE_WITH_ABSOULTE_PATH_NOT_WITHIN_ANY_IO_DEVICE = 4;
-    public static final int RUNTIME_FULLTEXT_PHRASE_FOUND = 5;
-    public static final int COMPILATION_RULECOLLECTION_NOT_INSTANCE_OF_LIST = 1001;
+    public static final int FILE_WITH_ABSOULTE_PATH_NOT_WITHIN_ANY_IO_DEVICE = 4;
+    public static final int FULLTEXT_PHRASE_FOUND = 5;
+    public static final int JOB_QUEUE_FULL = 6;
+    public static final int INVALID_NETWORK_ADDRESS = 7;
+    public static final int INVALID_INPUT_PARAMETER = 8;
+    public static final int JOB_REQUIREMENTS_EXCEED_CAPACITY = 9;
+    public static final int NO_SUCH_NODE = 10;
+    public static final int CLASS_LOADING_ISSUE = 11;
+
+    // Compilation error codes.
+    public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001;
 
     // Loads the map that maps error codes to error message templates.
     private static Map<Integer, String> errorMessageMap = null;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 56be93e..0fd6923 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.api.exceptions;
 
 import java.io.Serializable;
-import java.util.logging.Logger;
 
 import org.apache.hyracks.api.util.ErrorMessageUtil;
 
@@ -27,14 +27,6 @@
  * The main execution time exception type for runtime errors in a hyracks environment
  */
 public class HyracksDataException extends HyracksException {
-    private static final long serialVersionUID = 1L;
-
-    public static final int UNKNOWN = 0;
-    private final String component;
-    private final int errorCode;
-    private final Serializable[] params;
-    private final String nodeId;
-    private transient volatile String msgCache;
 
     public static HyracksDataException create(int code, Serializable... params) {
         return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
@@ -46,76 +38,68 @@
 
     public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
             Serializable... params) {
-        super(message, cause);
-        this.component = component;
-        this.errorCode = errorCode;
-        this.nodeId = nodeId;
-        this.params = params;
+        super(component, errorCode, message, cause, nodeId, params);
     }
 
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
     public HyracksDataException(String message) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
+        super(message);
     }
 
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
     public HyracksDataException(Throwable cause) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
+        super(cause);
     }
 
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
     public HyracksDataException(Throwable cause, String nodeId) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
+        super(cause, nodeId);
     }
 
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
     public HyracksDataException(String message, Throwable cause, String nodeId) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
+        super(message, cause, nodeId);
     }
 
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
     public HyracksDataException(String message, Throwable cause) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, (String) null);
+        super(message, cause);
     }
 
     public HyracksDataException(String component, int errorCode, Serializable... params) {
-        this(component, errorCode, null, null, null, params);
+        super(component, errorCode, null, null, null, params);
     }
 
     public HyracksDataException(Throwable cause, int errorCode, Serializable... params) {
-        this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
+        super(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
     }
 
     public HyracksDataException(String component, int errorCode, String message, Serializable... params) {
-        this(component, errorCode, message, null, null, params);
+        super(component, errorCode, message, null, null, params);
     }
 
     public HyracksDataException(String component, int errorCode, Throwable cause, Serializable... params) {
-        this(component, errorCode, cause.getMessage(), cause, null, params);
+        super(component, errorCode, cause.getMessage(), cause, null, params);
     }
 
     public HyracksDataException(String component, int errorCode, String message, Throwable cause,
             Serializable... params) {
-        this(component, errorCode, message, cause, null, params);
-    }
-
-    public String getComponent() {
-        return component;
-    }
-
-    public int getErrorCode() {
-        return errorCode;
-    }
-
-    public Object[] getParams() {
-        return params;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public String getMessage() {
-        if (msgCache == null) {
-            msgCache = ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params);
-        }
-        return msgCache;
+        super(component, errorCode, message, cause, null, params);
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index e939d26..5d13212 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -19,22 +19,118 @@
 package org.apache.hyracks.api.exceptions;
 
 import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hyracks.api.util.ErrorMessageUtil;
 
 public class HyracksException extends IOException {
     private static final long serialVersionUID = 1L;
 
-    public HyracksException() {
+    public static final int UNKNOWN = 0;
+    private final String component;
+    private final int errorCode;
+    private final Serializable[] params;
+    private final String nodeId;
+    private transient volatile String msgCache;
+
+    public static HyracksException create(int code, Serializable... params) {
+        return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
     }
 
-    public HyracksException(String message) {
-        super(message);
+    public static HyracksException create(int code, Throwable cause, Serializable... params) {
+        return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), cause, params);
     }
 
-    public HyracksException(Throwable cause) {
-        super(cause);
-    }
-
-    public HyracksException(String message, Throwable cause) {
+    public HyracksException(String component, int errorCode, String message, Throwable cause, String nodeId,
+            Serializable... params) {
         super(message, cause);
+        this.component = component;
+        this.errorCode = errorCode;
+        this.nodeId = nodeId;
+        this.params = params;
+    }
+
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
+    public HyracksException(String message) {
+        this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
+    }
+
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
+    public HyracksException(Throwable cause) {
+        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
+    }
+
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
+    public HyracksException(Throwable cause, String nodeId) {
+        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
+    }
+
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
+    public HyracksException(String message, Throwable cause, String nodeId) {
+        this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
+    }
+
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
+    public HyracksException(String message, Throwable cause) {
+        this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, (String) null);
+    }
+
+    public HyracksException(String component, int errorCode, Serializable... params) {
+        this(component, errorCode, null, null, null, params);
+    }
+
+    public HyracksException(Throwable cause, int errorCode, Serializable... params) {
+        this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
+    }
+
+    public HyracksException(String component, int errorCode, String message, Serializable... params) {
+        this(component, errorCode, message, null, null, params);
+    }
+
+    public HyracksException(String component, int errorCode, Throwable cause, Serializable... params) {
+        this(component, errorCode, cause.getMessage(), cause, null, params);
+    }
+
+    public HyracksException(String component, int errorCode, String message, Throwable cause, Serializable... params) {
+        this(component, errorCode, message, cause, null, params);
+    }
+
+    public String getComponent() {
+        return component;
+    }
+
+    public int getErrorCode() {
+        return errorCode;
+    }
+
+    public Object[] getParams() {
+        return params;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String getMessage() {
+        if (msgCache == null) {
+            msgCache = ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params);
+        }
+        return msgCache;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 84a961e..5787c72 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -28,10 +28,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
@@ -44,6 +40,12 @@
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
     private static final long serialVersionUID = 1L;
@@ -76,11 +78,9 @@
 
     private IJobletEventListenerFactory jobletEventListenerFactory;
 
-    private IGlobalJobDataFactory globalJobDataFactory;
-
     private boolean useConnectorPolicyForScheduling;
 
-    private boolean reportTaskDetails;
+    private IClusterCapacity requiredClusterCapacity;
 
     private transient int operatorIdCounter;
 
@@ -106,7 +106,7 @@
         connectorIdCounter = 0;
         maxReattempts = 2;
         useConnectorPolicyForScheduling = false;
-        reportTaskDetails = true;
+        requiredClusterCapacity = new ClusterCapacity();
         setFrameSize(frameSize);
     }
 
@@ -281,14 +281,6 @@
         this.jobletEventListenerFactory = jobletEventListenerFactory;
     }
 
-    public IGlobalJobDataFactory getGlobalJobDataFactory() {
-        return globalJobDataFactory;
-    }
-
-    public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
-        this.globalJobDataFactory = globalJobDataFactory;
-    }
-
     public boolean isUseConnectorPolicyForScheduling() {
         return useConnectorPolicyForScheduling;
     }
@@ -297,12 +289,12 @@
         this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
     }
 
-    public boolean isReportTaskDetails() {
-        return reportTaskDetails;
+    public void setRequiredClusterCapacity(IClusterCapacity capacity) {
+        this.requiredClusterCapacity = capacity;
     }
 
-    public void setReportTaskDetails(boolean reportTaskDetails) {
-        this.reportTaskDetails = reportTaskDetails;
+    public IClusterCapacity getRequiredClusterCapacity() {
+        return requiredClusterCapacity;
     }
 
     private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
index 4351e39..50db00d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
@@ -16,11 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.api.job;
 
 public enum JobStatus {
-    INITIALIZED,
+    PENDING,
     RUNNING,
     TERMINATED,
     FAILURE,
+    FAILURE_BEFORE_EXECUTION
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
new file mode 100644
index 0000000..ded4b63
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+public class ClusterCapacity implements IClusterCapacity {
+
+    private long aggregatedMemoryByteSize = 0;
+    private int aggregatedCores = 0;
+    private final Map<String, Long> nodeMemoryMap = new HashMap<>();
+    private final Map<String, Integer> nodeCoreMap = new HashMap<>();
+
+    @Override
+    public long getAggregatedMemoryByteSize() {
+        return aggregatedMemoryByteSize;
+    }
+
+    @Override
+    public int getAggregatedCores() {
+        return aggregatedCores;
+    }
+
+    @Override
+    public long getMemoryByteSize(String nodeId) throws HyracksException {
+        if (!nodeMemoryMap.containsKey(nodeId)) {
+            throw HyracksException.create(ErrorCode.NO_SUCH_NODE, nodeId);
+        }
+        return nodeMemoryMap.get(nodeId);
+    }
+
+    @Override
+    public int getCores(String nodeId) throws HyracksException {
+        if (!nodeMemoryMap.containsKey(nodeId)) {
+            throw HyracksException.create(ErrorCode.NO_SUCH_NODE, nodeId);
+        }
+        return nodeCoreMap.get(nodeId);
+    }
+
+    @Override
+    public void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize) {
+        this.aggregatedMemoryByteSize = aggregatedMemoryByteSize;
+    }
+
+    @Override
+    public void setAggregatedCores(int aggregatedCores) {
+        this.aggregatedCores = aggregatedCores;
+    }
+
+    @Override
+    public void setMemoryByteSize(String nodeId, long memoryByteSize) {
+        nodeMemoryMap.put(nodeId, memoryByteSize);
+    }
+
+    @Override
+    public void setCores(String nodeId, int cores) {
+        nodeCoreMap.put(nodeId, cores);
+    }
+
+    @Override
+    public void update(String nodeId, NodeCapacity nodeCapacity) throws HyracksException {
+        // Removes the existing node resource and the aggregated resource statistics.
+        if (nodeMemoryMap.containsKey(nodeId)) {
+            aggregatedMemoryByteSize -= nodeMemoryMap.remove(nodeId);
+        }
+        if (nodeCoreMap.containsKey(nodeId)) {
+            aggregatedCores -= nodeCoreMap.remove(nodeId);
+        }
+
+        long memorySize = nodeCapacity.getMemoryByteSize();
+        int cores = nodeCapacity.getCores();
+        // Updates the node capacity map when both memory size and cores are positive.
+        if (memorySize > 0 && cores > 0) {
+            aggregatedMemoryByteSize += memorySize;
+            aggregatedCores += cores;
+            nodeMemoryMap.put(nodeId, memorySize);
+            nodeCoreMap.put(nodeId, cores);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return ObjectUtils.hashCodeMulti(aggregatedMemoryByteSize, aggregatedCores, nodeMemoryMap,
+                nodeCoreMap);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ClusterCapacity)) {
+            return false;
+        }
+        ClusterCapacity capacity = (ClusterCapacity) o;
+        return aggregatedMemoryByteSize == capacity.aggregatedMemoryByteSize
+                && aggregatedCores == capacity.aggregatedCores
+                && ObjectUtils.equals(nodeMemoryMap, capacity.nodeMemoryMap)
+                && ObjectUtils.equals(nodeCoreMap, capacity.nodeCoreMap);
+    }
+
+    @Override
+    public String toString() {
+        return "capacity (memory: " + aggregatedMemoryByteSize + " bytes, CPU cores: " + aggregatedCores + ")";
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
new file mode 100644
index 0000000..9e38a20
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class DefaultJobCapacityController implements IJobCapacityController {
+
+    public static final DefaultJobCapacityController INSTANCE = new DefaultJobCapacityController();
+
+    private DefaultJobCapacityController() {
+    }
+
+    @Override
+    public JobSubmissionStatus allocate(JobSpecification job) {
+        return JobSubmissionStatus.EXECUTE;
+    }
+
+    @Override
+    public void release(JobSpecification job) {
+        // No operation here.
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
new file mode 100644
index 0000000..ac3261d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+/**
+ * This interface abstracts the mutable capacity for a cluster.
+ */
+public interface IClusterCapacity extends IReadOnlyClusterCapacity {
+
+    /**
+     * Sets the aggregated memory size for a cluster.
+     *
+     * @param aggregatedMemoryByteSize,
+     *            the aggregated memory size.
+     */
+    void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize);
+
+    /**
+     * Sets the aggregated number of CPU cores for a cluster.
+     *
+     * @param aggregatedCores,
+     *            the total number of cores.
+     */
+    void setAggregatedCores(int aggregatedCores);
+
+    /**
+     * Sets the memory byte size (for computation) of a specific node.
+     *
+     * @param nodeId,
+     *            the node id.
+     * @param memoryByteSize,
+     *            the available memory byte size for computation of the node.
+     */
+    void setMemoryByteSize(String nodeId, long memoryByteSize);
+
+    /**
+     * Sets the number of CPU cores for a specific node.
+     *
+     * @param nodeId,
+     *            the node id.
+     * @param cores,
+     *            the number of CPU cores for the node.
+     */
+    void setCores(String nodeId, int cores);
+
+    /**
+     * Updates the cluster capacity information with the capacity of one particular node.
+     *
+     * @param nodeId,
+     *            the id of the node for updating.
+     * @param capacity,
+     *            the capacity of one particular node.
+     * @throws HyracksException
+     *             when the parameters are invalid.
+     */
+    void update(String nodeId, NodeCapacity capacity) throws HyracksException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
new file mode 100644
index 0000000..5fa4bd9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * This interface determines the behavior of a job when it is submitted to the job manager.
+ * The job could be one of the following three cases:
+ * -- rejected immediately because its capacity requirement exceeds the cluster's capacity.
+ * -- entered into a pending job queue for deferred execution, due to the current capacity limitation because of
+ * concurrent running jobs;
+ * -- executed immediately because there is sufficient capacity.
+ */
+public interface IJobCapacityController {
+
+    enum JobSubmissionStatus {
+        EXECUTE,
+        QUEUE
+    }
+
+    /**
+     * Allocates required cluster capacity for a job.
+     *
+     * @param job,
+     *            the job specification.
+     * @return EXECUTE, if the job can be executed immediately;
+     *         QUEUE, if the job cannot be executed
+     * @throws HyracksException
+     *             if the job's capacity requirement exceeds the maximum capacity of the cluster.
+     */
+    JobSubmissionStatus allocate(JobSpecification job) throws HyracksException;
+
+    /**
+     * Releases cluster capacity for a job when it completes.
+     *
+     * @param job,
+     *            the job specification.
+     */
+    void release(JobSpecification job);
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
new file mode 100644
index 0000000..59b6bfd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+/**
+ * This interface provides read-only methods for the capacity of a cluster.
+ */
+public interface IReadOnlyClusterCapacity extends Serializable {
+
+    /**
+     * @return the aggregated memory byte size for the cluster.
+     */
+    long getAggregatedMemoryByteSize();
+
+    /**
+     * @return the aggregated number of cores
+     */
+    int getAggregatedCores();
+
+    /**
+     * Retrieves the memory byte size for computation on a specific node.
+     * (Note that usually a portion of memory is used for storage.)
+     *
+     * @param nodeId,
+     *            the node id.
+     * @return the memory byte size for computation on the node.
+     * @throws HyracksException
+     *             when the input node does not exist.
+     */
+    long getMemoryByteSize(String nodeId) throws HyracksException;
+
+    /**
+     * Retrieves the number of CPU cores for computation on a specific node.
+     *
+     * @param nodeId,
+     *            the node id.
+     * @return the number of CPU cores for computation on the node.
+     * @throws HyracksException,
+     *             when the input node does not exist.
+     */
+    int getCores(String nodeId) throws HyracksException;
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java
new file mode 100644
index 0000000..7902e7d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.io.Serializable;
+
+/**
+ * Specifies the capacity for computation on a particular node, i.e., a NCDriver process.
+ */
+public class NodeCapacity implements Serializable {
+
+    // All memory for computations -- this is not changed during the lifetime of a running instance.
+    private final long memoryByteSize;
+
+    // All CPU cores -- currently we assume that it doesn't change during the lifetime of a running instance.
+    // Otherwise, for each heartbeat, we will have to update global cluster capacity of a cluster.
+    private final int cores;
+
+    /**
+     * NOTE: neither parameters can be negative.
+     * However, both of them can be zero, which means the node is to be removed from the cluster.
+     *
+     * @param memorySize,
+     *            the memory size of the node.
+     * @param cores,
+     *            the number of cores of the node.
+     */
+    public NodeCapacity(long memorySize, int cores) {
+        this.memoryByteSize = memorySize;
+        this.cores = cores;
+    }
+
+    public long getMemoryByteSize() {
+        return memoryByteSize;
+    }
+
+    public int getCores() {
+        return cores;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 52367ee..d17c9aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -17,9 +17,18 @@
 # under the License.
 #
 
-1 = Unsupported operation %1$s in %2$s operator
-2 = Error in processing tuple %1$s in a frame
-4 = The file with absolute path %1$s is not within any of the current IO devices
-5 = Phrase search in Full-text is not supported. An expression should include only one word
+# 0 --- 9999: runtime errors
+# 10000 ---- 19999: compilation errors
 
-1001 = The given rule collection %1$s is not an instance of the List class.
\ No newline at end of file
+1=Unsupported operation %1$s in %2$s operator
+2=Error in processing tuple %1$s in a frame
+4=The file with absolute path %1$s is not within any of the current IO devices
+5=Phrase search in Full-text is not supported. An expression should include only one word
+6=Job queue is full with %1$s jobs
+7=Network address cannot be resolved -- %1$s
+8=Invalid internal input parameter
+9=Job requirement %1$s exceeds capacity %2$s
+10=Node %1$s does not exist
+11=Class loading issue: %1$s
+
+10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java
new file mode 100644
index 0000000..277e8e2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClusterCapacityTest {
+
+    @Test
+    public void test() throws HyracksException {
+        ClusterCapacity capacity = new ClusterCapacity();
+        String nodeId = "node1";
+
+        // Adds one node.
+        capacity.update(nodeId, new NodeCapacity(1024L, 8));
+        Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 1024L);
+        Assert.assertTrue(capacity.getAggregatedCores() == 8);
+
+        // Updates the node.
+        capacity.update(nodeId, new NodeCapacity(-1L, -2));
+
+        // Verifies that node is removed
+        Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 0L);
+        Assert.assertTrue(capacity.getAggregatedCores() == 0);
+
+        boolean nodeNotExist = false;
+        try {
+            capacity.getMemoryByteSize(nodeId);
+        } catch (HyracksException e) {
+            nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+        }
+        Assert.assertTrue(nodeNotExist);
+        nodeNotExist = false;
+        try {
+            capacity.getCores(nodeId);
+        } catch (HyracksException e) {
+            nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+        }
+        Assert.assertTrue(nodeNotExist);
+
+        // Adds the node again.
+        capacity.update(nodeId, new NodeCapacity(1024L, 8));
+        // Updates the node.
+        capacity.update(nodeId, new NodeCapacity(4L, 0));
+
+        // Verifies that node does not exist
+        Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 0L);
+        Assert.assertTrue(capacity.getAggregatedCores() == 0);
+        nodeNotExist = false;
+        try {
+            capacity.getMemoryByteSize(nodeId);
+        } catch (HyracksException e) {
+            nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+        }
+        Assert.assertTrue(nodeNotExist);
+        nodeNotExist = false;
+        try {
+            capacity.getCores(nodeId);
+        } catch (HyracksException e) {
+            nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+        }
+        Assert.assertTrue(nodeNotExist);
+    }
+
+}