[NO ISSUE] Add internal function jobs() to retrieve job information
- user model changes: added function jobs()
- storage format changes: no
- interface changes: no
Change-Id: I80d786899d6dd5970c8faf24041bc60da881a365
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3115
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java
new file mode 100644
index 0000000..b92dc55
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class JobSummariesDatasource extends FunctionDataSource {
+
+ private static final DataSourceId JOB_SUMMARIES_DATASOURCE_ID = new DataSourceId(
+ JobSummariesRewriter.JOBSUMMARIES.getNamespace(), JobSummariesRewriter.JOBSUMMARIES.getName());
+
+ public JobSummariesDatasource(INodeDomain domain) throws AlgebricksException {
+ super(JOB_SUMMARIES_DATASOURCE_ID, domain);
+ }
+
+ @Override
+ protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+ AlgebricksAbsolutePartitionConstraint locations) {
+ return new JobSummariesFunction(AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations()));
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java
new file mode 100644
index 0000000..82062e2
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.message.GetJobSummariesRequest;
+import org.apache.asterix.app.message.GetJobSummariesResponse;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class JobSummariesFunction extends AbstractDatasourceFunction {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private static final long serialVersionUID = 1L;
+
+ public JobSummariesFunction(AlgebricksAbsolutePartitionConstraint locations) {
+ super(locations);
+ }
+
+ @Override
+ public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+ INCMessageBroker messageBroker = (INCMessageBroker) serviceCtx.getMessageBroker();
+ MessageFuture messageFuture = messageBroker.registerMessageFuture();
+ final long futureId = messageFuture.getFutureId();
+ GetJobSummariesRequest request = new GetJobSummariesRequest(serviceCtx.getNodeId(), futureId);
+ try {
+ messageBroker.sendMessageToPrimaryCC(request);
+ GetJobSummariesResponse response =
+ (GetJobSummariesResponse) messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ return new JobSummariesReader(response.getSummaries());
+ } catch (Exception e) {
+ LOGGER.warn("Could no retrieve jobs info", e);
+ throw HyracksDataException.create(e);
+ } finally {
+ messageBroker.deregisterMessageFuture(futureId);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java
new file mode 100644
index 0000000..d5a3272
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+
+public class JobSummariesReader extends FunctionReader {
+
+ private final String[] summaries;
+ private int pos = 0;
+
+ public JobSummariesReader(String[] summaries) {
+ this.summaries = summaries;
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return pos < summaries.length;
+ }
+
+ @Override
+ public IRawRecord<char[]> next() throws IOException, InterruptedException {
+ CharArrayRecord record = new CharArrayRecord();
+ record.set(summaries[pos++]);
+ record.endRecord();
+ return record;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java
new file mode 100644
index 0000000..ef753b4
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class JobSummariesRewriter extends FunctionRewriter {
+
+ public static final FunctionIdentifier JOBSUMMARIES =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "jobs", 0);
+ public static final JobSummariesRewriter INSTANCE = new JobSummariesRewriter(JOBSUMMARIES);
+
+ private JobSummariesRewriter(FunctionIdentifier functionId) {
+ super(functionId);
+ }
+
+ @Override
+ protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+ throws AlgebricksException {
+ return new JobSummariesDatasource(context.getComputationNodeDomain());
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java
new file mode 100644
index 0000000..b885b13
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.work.GetJobSummariesJSONWork;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
+public class GetJobSummariesRequest implements ICcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final long reqId;
+
+ public GetJobSummariesRequest(String nodeId, long reqId) {
+ this.nodeId = nodeId;
+ this.reqId = reqId;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
+ GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs.getJobManager());
+ try {
+ ccs.getWorkQueue().scheduleAndSync(gjse);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Failure getting jobs", e);
+ throw HyracksDataException.create(e);
+ }
+ final ArrayNode gjseSummaries = gjse.getSummaries();
+ final int size = gjseSummaries.size();
+ String[] summaries = new String[size];
+ for (int i = 0; i < size; ++i) {
+ summaries[i] = gjseSummaries.get(i).toString();
+ }
+ GetJobSummariesResponse response = new GetJobSummariesResponse(reqId, summaries);
+ CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ try {
+ messageBroker.sendApplicationMessageToNC(response, nodeId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Failure sending response to nc", e);
+ throw HyracksDataException.create(e);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java
new file mode 100644
index 0000000..7554d53
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class GetJobSummariesResponse implements INcAddressedMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final long reqId;
+ private final String[] summaries;
+
+ public GetJobSummariesResponse(long reqId, String[] summaries) {
+ this.reqId = reqId;
+ this.summaries = summaries;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ MessageFuture future = mb.deregisterMessageFuture(reqId);
+ if (future != null) {
+ future.complete(this);
+ }
+ }
+
+ public String[] getSummaries() {
+ return summaries;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index 3407d59..a8314ec 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -22,6 +22,7 @@
import org.apache.asterix.app.function.DatasetResourcesRewriter;
import org.apache.asterix.app.function.DatasetRewriter;
import org.apache.asterix.app.function.FeedRewriter;
+import org.apache.asterix.app.function.JobSummariesRewriter;
import org.apache.asterix.app.function.PingRewriter;
import org.apache.asterix.app.function.StorageComponentsRewriter;
import org.apache.asterix.om.functions.BuiltinFunctions;
@@ -60,6 +61,11 @@
(expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
BuiltinFunctions.addUnnestFun(ActiveRequestsRewriter.ACTIVE_REQUESTS, true);
BuiltinFunctions.addDatasourceFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS, ActiveRequestsRewriter.INSTANCE);
+ // job-summaries function
+ BuiltinFunctions.addPrivateFunction(JobSummariesRewriter.JOBSUMMARIES,
+ (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
+ BuiltinFunctions.addUnnestFun(JobSummariesRewriter.JOBSUMMARIES, true);
+ BuiltinFunctions.addDatasourceFunction(JobSummariesRewriter.JOBSUMMARIES, JobSummariesRewriter.INSTANCE);
}
private MetadataBuiltinFunctions() {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/jobs/jobs.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/jobs/jobs.1.query.sqlpp
new file mode 100644
index 0000000..e8e898f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/jobs/jobs.1.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+SET `import-private-functions` "true";
+select value j
+from jobs() j
+order by j.`start-time` desc
+limit 1;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.1.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.1.regex
new file mode 100644
index 0000000..914f6cb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.1.regex
@@ -0,0 +1 @@
+/\{ "type": "job-summary", "job-id": "JID:.*", "create-time": \d*, "start-time": \d*, "end-time": \d*, "status": "RUNNING" \}/
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index ceee5f9..f753bd0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -4987,6 +4987,11 @@
<output-dir compare="Text">active_requests</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="misc">
+ <compilation-unit name="jobs">
+ <output-dir compare="Text">jobs</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="index">
<test-group name="index/validations">