[ASTERIXDB-2269][RT] Use Job Locations To Estimate Resources
- user model changes: no
- storage format changes: no
- interface changes: yes
- INodeJobTracker (+) getJobParticipatingNodes
Details:
- Use job locations to calculate the job's required
resources rather than all cluster locations.
- Add test case.
Change-Id: Iecd8e234aa52a9f324e64044e01477fb12dc14e6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2330
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 4428e05..b0edb3e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -22,6 +22,7 @@
import java.io.PrintWriter;
import java.rmi.RemoteException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -32,6 +33,7 @@
import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.common.api.INodeJobTracker;
import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.config.OptimizationConfUtil;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -100,6 +102,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
import org.apache.hyracks.control.common.config.OptionTypes;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -346,8 +349,14 @@
if (statement == null) {
// Sets a required capacity, only for read-only queries.
// DDLs and DMLs are considered not that frequent.
- spec.setRequiredClusterCapacity(ResourceUtils.getRequiredCompacity(plan, computationLocations,
- sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize));
+ // limit the computation locations to the locations that will be used in the query
+ final AlgebricksAbsolutePartitionConstraint jobLocations =
+ getJobLocations(spec, metadataProvider.getApplicationContext().getNodeJobTracker(),
+ computationLocations);
+ final IClusterCapacity jobRequiredCapacity = ResourceUtils
+ .getRequiredCapacity(plan, jobLocations, sortFrameLimit, groupFrameLimit, joinFrameLimit,
+ frameSize);
+ spec.setRequiredClusterCapacity(jobRequiredCapacity);
}
if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
@@ -499,4 +508,12 @@
}
}
}
+
+    public static AlgebricksAbsolutePartitionConstraint getJobLocations(JobSpecification spec,
+            INodeJobTracker jobTracker, AlgebricksAbsolutePartitionConstraint clusterLocations) {
+        // keep, in cluster order, only the cluster locations the tracker reports as participating in the job
+        final Set<String> participants = jobTracker.getJobParticipatingNodes(spec);
+        return new AlgebricksAbsolutePartitionConstraint(Arrays.stream(clusterLocations.getLocations())
+                .filter(participants::contains).toArray(String[]::new));
+    }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index 61c1dfe..1763a98 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -52,7 +52,7 @@
* @throws AlgebricksException
* if the query plan is malformed.
*/
- public static IClusterCapacity getRequiredCompacity(ILogicalPlan plan,
+ public static IClusterCapacity getRequiredCapacity(ILogicalPlan plan,
AlgebricksAbsolutePartitionConstraint computationLocations, int sortFrameLimit, int groupFrameLimit,
int joinFrameLimit, int frameSize)
throws AlgebricksException {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
index e041021..90fc646 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
@@ -29,12 +29,18 @@
import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.runtime.job.listener.NodeJobTracker;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import org.apache.hyracks.api.job.JobSpecification;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import junit.extensions.PA;
@@ -173,4 +179,25 @@
Assert.assertTrue(loc.getLocations().length == 8);
}
+    @Test
+    public void testJobLocations() {
+        final String participatingNode = "nc1";
+        final String idleNode = "nc2";
+        // register both nodes with the tracker
+        final NodeJobTracker tracker = new NodeJobTracker();
+        tracker.notifyNodeJoin(participatingNode, null);
+        tracker.notifyNodeJoin(idleNode, null);
+
+        // constrain the job to the participating node only
+        final JobSpecification spec = new JobSpecification();
+        final LValueConstraintExpression anyLValue = Mockito.mock(LValueConstraintExpression.class);
+        spec.getUserConstraints().add(new Constraint(anyLValue, new ConstantExpression(participatingNode)));
+
+        final AlgebricksAbsolutePartitionConstraint clusterLocations =
+                new AlgebricksAbsolutePartitionConstraint(new String[] { participatingNode, idleNode });
+        final AlgebricksAbsolutePartitionConstraint jobLocations =
+                APIFramework.getJobLocations(spec, tracker, clusterLocations);
+        // the idle node must have been filtered out
+        Assert.assertArrayEquals(new String[] { participatingNode }, jobLocations.getLocations());
+    }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
index 4503620..9966c95 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.application.IClusterLifecycleListener;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
public interface INodeJobTracker extends IJobLifecycleListener, IClusterLifecycleListener {
@@ -34,4 +35,14 @@
* @return unmodifiable set of the node pending jobs.
*/
Set<JobId> getPendingJobs(String nodeId);
+
+    /**
+     * Gets the set of nodes that will participate in the execution
+     * of the job. The set includes only nodes that are known
+     * to this {@link INodeJobTracker}.
+     *
+     * @param spec the specification of the job whose participating nodes are requested
+     * @return the participating nodes in the job execution
+     */
+    Set<String> getJobParticipatingNodes(JobSpecification spec);
}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index 75c5582..ff009dc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -45,11 +45,7 @@
@Override
public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) {
- final Set<String> matchedNodes = spec.getUserConstraints().stream().map(Constraint::getRValue)
- .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast)
- .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
- .collect(Collectors.toSet());
- matchedNodes.stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
+ getJobParticipatingNodes(spec).forEach(nodeId -> nodeJobs.get(nodeId).add(jobId));
}
@Override
@@ -78,4 +74,12 @@
Collections.unmodifiableSet(nodeJobs.get(nodeId)) :
Collections.emptySet();
}
+
+    @Override
+    public synchronized Set<String> getJobParticipatingNodes(JobSpecification spec) {
+        // synchronized like notifyJobCreation because nodeJobs is read below; TODO confirm nodeJobs is not thread-safe
+        return spec.getUserConstraints().stream().map(Constraint::getRValue)
+                .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast)
+                .map(ce -> ce.getValue().toString()).filter(nodeJobs::containsKey).collect(Collectors.toSet());
+    }
}
\ No newline at end of file