[NO ISSUE][OTH] Add API To Ensure Request Requirements

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Refactor IRequestParameters as ICommonRequestParameters
  and IRequestParameters to break cyclic dependencies.
- Add new API to ensure request can be scheduled for execution.

Change-Id: Ifb0513e0baf2b473006d4aa23040c86751fbb4fc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3181
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 014bf3c..fe6aeea 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,9 +18,7 @@
  */
 package org.apache.asterix.translator;
 
-import java.util.Map;
-
-import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.api.ICommonRequestParameters;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -35,11 +33,10 @@
     protected Thread executor;
     protected String clientContextId;
 
-    public ClientRequest(IRequestReference requestReference, String clientContextId, String statement,
-            Map<String, String> optionalParameters) {
-        super(requestReference);
-        this.clientContextId = clientContextId;
-        this.statement = statement;
+    public ClientRequest(ICommonRequestParameters requestParameters) {
+        super(requestParameters.getRequestReference());
+        this.clientContextId = requestParameters.getClientContextId();
+        this.statement = requestParameters.getStatement();
         this.executor = Thread.currentThread();
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 86ba301..6e41cd2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -20,12 +20,13 @@
 
 import java.util.Map;
 
+import org.apache.asterix.common.api.ICommonRequestParameters;
 import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.api.result.IResultSet;
 
-public interface IRequestParameters {
+public interface IRequestParameters extends ICommonRequestParameters {
 
     /**
      * @return A Resultset client object that is used to read the results.
@@ -50,36 +51,7 @@
     IStatementExecutor.ResultMetadata getOutMetadata();
 
     /**
-     * @return the client context id for the query
-     */
-    String getClientContextId();
-
-    /**
-     * @return Optional request parameters. Otherwise null.
-     */
-    Map<String, String> getOptionalParameters();
-
-    /**
      * @return Statement parameters
      */
     Map<String, IAObject> getStatementParameters();
-
-    /**
-     * @return true if the request accepts multiple statements. Otherwise, false.
-     */
-    boolean isMultiStatement();
-
-    /**
-     * Gets the statement the client provided with the request
-     *
-     * @return the request statement
-     */
-    String getStatement();
-
-    /**
-     * The request reference of this {@link IRequestParameters}
-     *
-     * @return the request reference
-     */
-    IRequestReference getRequestReference();
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index c174c02..52aab20 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -18,15 +18,15 @@
  */
 package org.apache.asterix.translator;
 
-import java.util.Map;
 import java.util.UUID;
 
 import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.ICommonRequestParameters;
 import org.apache.asterix.common.api.IReceptionist;
 import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.api.ISchedulableClientRequest;
 import org.apache.asterix.common.api.RequestReference;
 import org.apache.http.HttpHeaders;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.util.NetworkUtil;
 
@@ -48,8 +48,12 @@
     }
 
     @Override
-    public IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement,
-            Map<String, String> optionalParameters) throws HyracksDataException {
-        return new ClientRequest(requestRef, clientContextId, statement, optionalParameters);
+    public IClientRequest requestReceived(ICommonRequestParameters requestParameters) {
+        return new ClientRequest(requestParameters);
+    }
+
+    @Override
+    public void ensureSchedulable(ISchedulableClientRequest schedulableRequest) {
+        // currently we don't have any restrictions
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java
new file mode 100644
index 0000000..ca04463
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.ICommonRequestParameters;
+import org.apache.asterix.common.api.ISchedulableClientRequest;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class SchedulableClientRequest implements ISchedulableClientRequest {
+
+    private final IClientRequest clientRequest;
+    private final JobSpecification jobSpec;
+    private final IMetadataProvider metadataProvider;
+    private final ICommonRequestParameters requestParameters;
+
+    private SchedulableClientRequest(IClientRequest clientRequest, ICommonRequestParameters requestParameters,
+            IMetadataProvider metadataProvider, JobSpecification jobSpec) {
+        this.clientRequest = clientRequest;
+        this.requestParameters = requestParameters;
+        this.metadataProvider = metadataProvider;
+        this.jobSpec = jobSpec;
+    }
+
+    public static SchedulableClientRequest of(IClientRequest clientRequest, ICommonRequestParameters requestParameters,
+            IMetadataProvider metadataProvider, JobSpecification jobSpec) {
+        return new SchedulableClientRequest(clientRequest, requestParameters, metadataProvider, jobSpec);
+    }
+
+    @Override
+    public IClientRequest getClientRequest() {
+        return clientRequest;
+    }
+
+    @Override
+    public ICommonRequestParameters getRequestParameters() {
+        return requestParameters;
+    }
+
+    @Override
+    public JobSpecification getJobSpecification() {
+        return jobSpec;
+    }
+
+    @Override
+    public IMetadataProvider getMetadataProvider() {
+        return metadataProvider;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f89b2db..c49fcdd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -163,6 +163,7 @@
 import org.apache.asterix.translator.ExecutionPlansHtmlPrintUtil;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SchedulableClientRequest;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.translator.TypeTranslator;
@@ -2499,7 +2500,7 @@
             case ASYNC:
                 MutableBoolean printed = new MutableBoolean(false);
                 executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery,
-                        requestParameters, cancellable, resultSetId, printed));
+                        requestParameters, cancellable, resultSetId, printed, metadataProvider));
                 synchronized (printed) {
                     while (!printed.booleanValue()) {
                         printed.wait();
@@ -2514,7 +2515,7 @@
                     sessionOutput.release();
                     ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
-                }, requestParameters, cancellable, appCtx);
+                }, requestParameters, cancellable, appCtx, metadataProvider);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
@@ -2524,7 +2525,7 @@
                         outMetadata.getResultSets()
                                 .add(Triple.of(id, resultSetId, metadataProvider.findOutputRecordType()));
                     }
-                }, requestParameters, cancellable, appCtx);
+                }, requestParameters, cancellable, appCtx, metadataProvider);
                 break;
             default:
                 break;
@@ -2552,7 +2553,7 @@
 
     private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
             ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable,
-            ResultSetId resultSetId, MutableBoolean printed) {
+            ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider) {
         Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
         try {
             createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> {
@@ -2563,7 +2564,7 @@
                     printed.setTrue();
                     printed.notify();
                 }
-            }, requestParameters, cancellable, appCtx);
+            }, requestParameters, cancellable, appCtx, metadataProvider);
         } catch (Exception e) {
             if (Objects.equals(JobId.INVALID, jobId.getValue())) {
                 // compilation failed
@@ -2594,7 +2595,8 @@
 
     private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
             IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
-            IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx) throws Exception {
+            IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx,
+            MetadataProvider metadataProvider) throws Exception {
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest =
                 (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -2607,6 +2609,9 @@
             if (cancellable) {
                 clientRequest.markCancellable();
             }
+            final SchedulableClientRequest schedulableRequest =
+                    SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec);
+            appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             clientRequest.setJobId(jobId);
             if (jId != null) {
@@ -2946,9 +2951,7 @@
     }
 
     protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
-        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(
-                requestParameters.getRequestReference(), requestParameters.getClientContextId(),
-                requestParameters.getStatement(), requestParameters.getOptionalParameters());
+        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
         appCtx.getRequestTracker().track(clientRequest);
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 1a526b2..2eef241 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -30,6 +30,7 @@
 
 import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
 import org.apache.asterix.api.http.server.ServletConstants;
+import org.apache.asterix.app.translator.RequestParameters;
 import org.apache.asterix.common.api.RequestReference;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.runtime.utils.RequestTracker;
@@ -66,7 +67,9 @@
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
 
         final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
-        ClientRequest request = new ClientRequest(requestReference, "1", "select 1;", new HashMap<>());
+        RequestParameters requestParameters =
+                new RequestParameters(requestReference, "select 1", null, null, null, null, "1", null, null, true);
+        ClientRequest request = new ClientRequest(requestParameters);
         request.setJobId(new JobId(1));
         request.markCancellable();
         tracker.track(request);
@@ -81,7 +84,9 @@
 
         // Tests the case that the job cancellation hit some exception from Hyracks.
         final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
-        ClientRequest request2 = new ClientRequest(requestReference2, "2", "select 1;", new HashMap<>());
+        requestParameters =
+                new RequestParameters(requestReference2, "select 1", null, null, null, null, "2", null, null, true);
+        ClientRequest request2 = new ClientRequest(requestParameters);
         request2.setJobId(new JobId(2));
         request2.markCancellable();
         tracker.track(request2);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java
new file mode 100644
index 0000000..8daff1d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.Map;
+
+public interface ICommonRequestParameters {
+
+    /**
+     * The request reference of this {@link ICommonRequestParameters}
+     *
+     * @return the request reference
+     */
+    IRequestReference getRequestReference();
+
+    /**
+     * @return the client context id for the request
+     */
+    String getClientContextId();
+
+    /**
+     * @return Optional request parameters. Otherwise null.
+     */
+    Map<String, String> getOptionalParameters();
+
+    /**
+     * @return true if the request accepts multiple statements. Otherwise, false.
+     */
+    boolean isMultiStatement();
+
+    /**
+     * Gets the statement the client provided with the request
+     *
+     * @return the request statement
+     */
+    String getStatement();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 51df306..95ed22e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.common.api;
 
-import java.util.Map;
-
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
 
@@ -36,13 +34,17 @@
     /**
      * Generates a {@link IClientRequest} based on the requests parameters
      *
-     * @param requestRef
-     * @param clientContextId
-     * @param statement
-     * @param getOptionalParameters
-     * @return A client request
+     * @param requestParameters
+     * @return the client request
      * @throws HyracksDataException
      */
-    IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement,
-            Map<String, String> getOptionalParameters) throws HyracksDataException;
+    IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
+
+    /**
+     * Ensures a client's request can be executed before its job is started
+     *
+     * @param schedulableRequest
+     * @throws HyracksDataException
+     */
+    void ensureSchedulable(ISchedulableClientRequest schedulableRequest) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java
new file mode 100644
index 0000000..7723550
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public interface ISchedulableClientRequest {
+
+    /**
+     * Gets the client request
+     *
+     * @return the client request
+     */
+    IClientRequest getClientRequest();
+
+    /**
+     * Gets the request common parameters
+     *
+     * @return the request common parameters
+     */
+    ICommonRequestParameters getRequestParameters();
+
+    /**
+     * Gets the request's job specification
+     *
+     * @return
+     */
+    JobSpecification getJobSpecification();
+
+    /**
+     * Gets the metadata provider used to execute this request
+     *
+     * @return the metadata provider
+     */
+    IMetadataProvider getMetadataProvider();
+}
\ No newline at end of file