[NO ISSUE][OTH] Add API To Ensure Request Requirements
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Refactor IRequestParameters as ICommonRequestParameters
and IRequestParameters to break cyclic dependencies.
- Add new API to ensure request can be scheduled for execution.
Change-Id: Ifb0513e0baf2b473006d4aa23040c86751fbb4fc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3181
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 014bf3c..fe6aeea 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,9 +18,7 @@
*/
package org.apache.asterix.translator;
-import java.util.Map;
-
-import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.api.ICommonRequestParameters;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -35,11 +33,10 @@
protected Thread executor;
protected String clientContextId;
- public ClientRequest(IRequestReference requestReference, String clientContextId, String statement,
- Map<String, String> optionalParameters) {
- super(requestReference);
- this.clientContextId = clientContextId;
- this.statement = statement;
+ public ClientRequest(ICommonRequestParameters requestParameters) {
+ super(requestParameters.getRequestReference());
+ this.clientContextId = requestParameters.getClientContextId();
+ this.statement = requestParameters.getStatement();
this.executor = Thread.currentThread();
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 86ba301..6e41cd2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -20,12 +20,13 @@
import java.util.Map;
+import org.apache.asterix.common.api.ICommonRequestParameters;
import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.api.result.IResultSet;
-public interface IRequestParameters {
+public interface IRequestParameters extends ICommonRequestParameters {
/**
* @return A Resultset client object that is used to read the results.
@@ -50,36 +51,7 @@
IStatementExecutor.ResultMetadata getOutMetadata();
/**
- * @return the client context id for the query
- */
- String getClientContextId();
-
- /**
- * @return Optional request parameters. Otherwise null.
- */
- Map<String, String> getOptionalParameters();
-
- /**
* @return Statement parameters
*/
Map<String, IAObject> getStatementParameters();
-
- /**
- * @return true if the request accepts multiple statements. Otherwise, false.
- */
- boolean isMultiStatement();
-
- /**
- * Gets the statement the client provided with the request
- *
- * @return the request statement
- */
- String getStatement();
-
- /**
- * The request reference of this {@link IRequestParameters}
- *
- * @return the request reference
- */
- IRequestReference getRequestReference();
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index c174c02..52aab20 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -18,15 +18,15 @@
*/
package org.apache.asterix.translator;
-import java.util.Map;
import java.util.UUID;
import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.ICommonRequestParameters;
import org.apache.asterix.common.api.IReceptionist;
import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.api.ISchedulableClientRequest;
import org.apache.asterix.common.api.RequestReference;
import org.apache.http.HttpHeaders;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.util.NetworkUtil;
@@ -48,8 +48,12 @@
}
@Override
- public IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement,
- Map<String, String> optionalParameters) throws HyracksDataException {
- return new ClientRequest(requestRef, clientContextId, statement, optionalParameters);
+ public IClientRequest requestReceived(ICommonRequestParameters requestParameters) {
+ return new ClientRequest(requestParameters);
+ }
+
+ @Override
+ public void ensureSchedulable(ISchedulableClientRequest schedulableRequest) {
+ // currently we don't have any restrictions
}
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java
new file mode 100644
index 0000000..ca04463
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.ICommonRequestParameters;
+import org.apache.asterix.common.api.ISchedulableClientRequest;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class SchedulableClientRequest implements ISchedulableClientRequest {
+
+ private final IClientRequest clientRequest;
+ private final JobSpecification jobSpec;
+ private final IMetadataProvider metadataProvider;
+ private final ICommonRequestParameters requestParameters;
+
+ private SchedulableClientRequest(IClientRequest clientRequest, ICommonRequestParameters requestParameters,
+ IMetadataProvider metadataProvider, JobSpecification jobSpec) {
+ this.clientRequest = clientRequest;
+ this.requestParameters = requestParameters;
+ this.metadataProvider = metadataProvider;
+ this.jobSpec = jobSpec;
+ }
+
+ public static SchedulableClientRequest of(IClientRequest clientRequest, ICommonRequestParameters requestParameters,
+ IMetadataProvider metadataProvider, JobSpecification jobSpec) {
+ return new SchedulableClientRequest(clientRequest, requestParameters, metadataProvider, jobSpec);
+ }
+
+ @Override
+ public IClientRequest getClientRequest() {
+ return clientRequest;
+ }
+
+ @Override
+ public ICommonRequestParameters getRequestParameters() {
+ return requestParameters;
+ }
+
+ @Override
+ public JobSpecification getJobSpecification() {
+ return jobSpec;
+ }
+
+ @Override
+ public IMetadataProvider getMetadataProvider() {
+ return metadataProvider;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f89b2db..c49fcdd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -163,6 +163,7 @@
import org.apache.asterix.translator.ExecutionPlansHtmlPrintUtil;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SchedulableClientRequest;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
import org.apache.asterix.translator.TypeTranslator;
@@ -2499,7 +2500,7 @@
case ASYNC:
MutableBoolean printed = new MutableBoolean(false);
executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery,
- requestParameters, cancellable, resultSetId, printed));
+ requestParameters, cancellable, resultSetId, printed, metadataProvider));
synchronized (printed) {
while (!printed.booleanValue()) {
printed.wait();
@@ -2514,7 +2515,7 @@
sessionOutput.release();
ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
metadataProvider.findOutputRecordType());
- }, requestParameters, cancellable, appCtx);
+ }, requestParameters, cancellable, appCtx, metadataProvider);
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
@@ -2524,7 +2525,7 @@
outMetadata.getResultSets()
.add(Triple.of(id, resultSetId, metadataProvider.findOutputRecordType()));
}
- }, requestParameters, cancellable, appCtx);
+ }, requestParameters, cancellable, appCtx, metadataProvider);
break;
default:
break;
@@ -2552,7 +2553,7 @@
private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable,
- ResultSetId resultSetId, MutableBoolean printed) {
+ ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider) {
Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
try {
createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> {
@@ -2563,7 +2564,7 @@
printed.setTrue();
printed.notify();
}
- }, requestParameters, cancellable, appCtx);
+ }, requestParameters, cancellable, appCtx, metadataProvider);
} catch (Exception e) {
if (Objects.equals(JobId.INVALID, jobId.getValue())) {
// compilation failed
@@ -2594,7 +2595,8 @@
private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
- IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx) throws Exception {
+ IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx,
+ MetadataProvider metadataProvider) throws Exception {
final IRequestTracker requestTracker = appCtx.getRequestTracker();
final ClientRequest clientRequest =
(ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -2607,6 +2609,9 @@
if (cancellable) {
clientRequest.markCancellable();
}
+ final SchedulableClientRequest schedulableRequest =
+ SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec);
+ appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
clientRequest.setJobId(jobId);
if (jId != null) {
@@ -2946,9 +2951,7 @@
}
protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
- final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(
- requestParameters.getRequestReference(), requestParameters.getClientContextId(),
- requestParameters.getStatement(), requestParameters.getOptionalParameters());
+ final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
appCtx.getRequestTracker().track(clientRequest);
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 1a526b2..2eef241 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -30,6 +30,7 @@
import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
import org.apache.asterix.api.http.server.ServletConstants;
+import org.apache.asterix.app.translator.RequestParameters;
import org.apache.asterix.common.api.RequestReference;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.runtime.utils.RequestTracker;
@@ -66,7 +67,9 @@
verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
- ClientRequest request = new ClientRequest(requestReference, "1", "select 1;", new HashMap<>());
+ RequestParameters requestParameters =
+ new RequestParameters(requestReference, "select 1", null, null, null, null, "1", null, null, true);
+ ClientRequest request = new ClientRequest(requestParameters);
request.setJobId(new JobId(1));
request.markCancellable();
tracker.track(request);
@@ -81,7 +84,9 @@
// Tests the case that the job cancellation hit some exception from Hyracks.
final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
- ClientRequest request2 = new ClientRequest(requestReference2, "2", "select 1;", new HashMap<>());
+ requestParameters =
+ new RequestParameters(requestReference2, "select 1", null, null, null, null, "2", null, null, true);
+ ClientRequest request2 = new ClientRequest(requestParameters);
request2.setJobId(new JobId(2));
request2.markCancellable();
tracker.track(request2);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java
new file mode 100644
index 0000000..8daff1d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.Map;
+
+public interface ICommonRequestParameters {
+
+ /**
+ * The request reference of this {@link ICommonRequestParameters}
+ *
+ * @return the request reference
+ */
+ IRequestReference getRequestReference();
+
+ /**
+ * @return the client context id for the request
+ */
+ String getClientContextId();
+
+ /**
+ * @return Optional request parameters. Otherwise null.
+ */
+ Map<String, String> getOptionalParameters();
+
+ /**
+ * @return true if the request accepts multiple statements. Otherwise, false.
+ */
+ boolean isMultiStatement();
+
+ /**
+ * Gets the statement the client provided with the request
+ *
+ * @return the request statement
+ */
+ String getStatement();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 51df306..95ed22e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.common.api;
-import java.util.Map;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.http.api.IServletRequest;
@@ -36,13 +34,17 @@
/**
* Generates a {@link IClientRequest} based on the requests parameters
*
- * @param requestRef
- * @param clientContextId
- * @param statement
- * @param getOptionalParameters
- * @return A client request
+ * @param requestParameters
+ * @return the client request
* @throws HyracksDataException
*/
- IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement,
- Map<String, String> getOptionalParameters) throws HyracksDataException;
+ IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
+
+ /**
+ * Ensures a client's request can be executed before its job is started
+ *
+ * @param schedulableRequest
+ * @throws HyracksDataException
+ */
+ void ensureSchedulable(ISchedulableClientRequest schedulableRequest) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java
new file mode 100644
index 0000000..7723550
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public interface ISchedulableClientRequest {
+
+ /**
+ * Gets the client request
+ *
+ * @return the client request
+ */
+ IClientRequest getClientRequest();
+
+ /**
+ * Gets the request common parameters
+ *
+ * @return the request common parameters
+ */
+ ICommonRequestParameters getRequestParameters();
+
+ /**
+ * Gets the request's job specification
+ *
+ * @return
+ */
+ JobSpecification getJobSpecification();
+
+ /**
+ * Gets the metadata provider used to execute this request
+ *
+ * @return the metadata provider
+ */
+ IMetadataProvider getMetadataProvider();
+}
\ No newline at end of file