Fix Pre-Distributed Jobs
This fix prevents ResultStateSweeper from deinitializing
pre-distributed jobs before they are dropped.
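Also fixes a locking issue with the new lock manager code: compile-only
insert/upsert statements now acquire the lock before compiling and release it afterwards.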
Change-Id: Id50c52fbc7c891761dcabd654fb9b853b5f7a91d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1656
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 59b3e88..88d07e4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1735,7 +1735,12 @@
}
};
if (compileOnly) {
- return compiler.compile();
+ locker.lock();
+ try {
+ return compiler.compile();
+ } finally {
+ locker.unlock();
+ }
}
if (stmtInsertUpsert.getReturnExpression() != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
index a376d53..c8463d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
@@ -29,4 +29,6 @@
public IDatasetStateRecord getState(JobId jobId);
public void deinitState(JobId jobId);
+
+ public long getResultTimestamp(JobId jobId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index c47a612..8722b92 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -155,7 +155,7 @@
ccContext = new ClusterControllerContext(topology);
sweeper = new DeadNodeSweeper();
datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
- ccConfig.getResultSweepThreshold());
+ ccConfig.getResultSweepThreshold(), preDistributedJobStore);
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
index c573ae8..117621f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
@@ -62,6 +62,10 @@
return descriptor;
}
+ public boolean jobIsPredistributed(JobId jobId) {
+ return preDistributedJobDescriptorMap.get(jobId) != null; // true only if a non-null descriptor is mapped to jobId
+ }
+
public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException {
PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
if (descriptor == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 8fd15f9..3cc41c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.cc.PreDistributedJobStore;
import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -61,9 +62,13 @@
private final Map<JobId, JobResultInfo> jobResultLocations;
- public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
+ private final PreDistributedJobStore preDistributedJobStore;
+
+ public DatasetDirectoryService(long resultTTL, long resultSweepThreshold,
+ PreDistributedJobStore preDistributedJobStore) {
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
+ this.preDistributedJobStore = preDistributedJobStore;
jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
}
@@ -178,11 +183,19 @@
}
@Override
- public synchronized IDatasetStateRecord getState(JobId jobId) {
+ public IDatasetStateRecord getState(JobId jobId) {
return getDatasetJobRecord(jobId);
}
@Override
+ public synchronized long getResultTimestamp(JobId jobId) {
+ // Returns -1 (never expires) for pre-distributed jobs so they are kept until dropped, and also
+ // when getState() yields null, instead of dereferencing a missing record.
+ final IDatasetStateRecord state = preDistributedJobStore.jobIsPredistributed(jobId) ? null : getState(jobId);
+ return state == null ? -1 : state.getTimestamp();
+ }
+
+ @Override
public synchronized void deinitState(JobId jobId) {
jobResultLocations.remove(jobId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
index e7ac389..da1714b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -25,7 +25,6 @@
import java.util.logging.Logger;
import org.apache.hyracks.api.dataset.IDatasetManager;
-import org.apache.hyracks.api.dataset.IDatasetStateRecord;
import org.apache.hyracks.api.job.JobId;
/**
@@ -70,8 +69,8 @@
synchronized (datasetManager) {
toBeCollected.clear();
for (JobId jobId : datasetManager.getJobIds()) {
- final IDatasetStateRecord state = datasetManager.getState(jobId);
- if (state != null && System.currentTimeMillis() > state.getTimestamp() + resultTTL) {
+ final long timestamp = datasetManager.getResultTimestamp(jobId);
+ if (timestamp != -1 && System.currentTimeMillis() > timestamp + resultTTL) {
toBeCollected.add(jobId);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 9c28c16..9242bba 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -120,7 +120,7 @@
private final Map<JobId, Joblet> jobletMap;
- private final Map<JobId, ActivityClusterGraph> activityClusterGraphMap;
+ private final Map<JobId, ActivityClusterGraph> preDistributedJobActivityClusterGraphMap;
private ExecutorService executor;
@@ -180,7 +180,7 @@
lccm = new LifeCycleComponentManager();
workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
jobletMap = new Hashtable<>();
- activityClusterGraphMap = new Hashtable<>();
+ preDistributedJobActivityClusterGraphMap = new Hashtable<>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
new File(new File(NodeControllerService.class.getName()), id));
@@ -373,27 +373,27 @@
}
public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException {
- if (activityClusterGraphMap.get(jobId) != null) {
+ if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
}
- activityClusterGraphMap.put(jobId, acg);
+ preDistributedJobActivityClusterGraphMap.put(jobId, acg);
}
public void removeActivityClusterGraph(JobId jobId) throws HyracksException {
- if (activityClusterGraphMap.get(jobId) == null) {
+ if (preDistributedJobActivityClusterGraphMap.get(jobId) == null) {
throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
}
- activityClusterGraphMap.remove(jobId);
+ preDistributedJobActivityClusterGraphMap.remove(jobId);
}
public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException {
- if (activityClusterGraphMap.get(jobId) != null) {
+ if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
}
}
public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException {
- return activityClusterGraphMap.get(jobId);
+ return preDistributedJobActivityClusterGraphMap.get(jobId);
}
public NetworkManager getNetworkManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 930a3e0..c7b563b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -178,11 +178,20 @@
}
@Override
- public synchronized IDatasetStateRecord getState(JobId jobId) {
+ public IDatasetStateRecord getState(JobId jobId) {
return partitionResultStateMap.get(jobId);
}
@Override
+ public synchronized long getResultTimestamp(JobId jobId) {
+ // Look the record up once; a job with no entry in the
+ // partition result state map is reported as -1.
+ final IDatasetStateRecord record = getState(jobId);
+ final boolean missing = record == null;
+ return missing ? -1 : record.getTimestamp();
+ }
+
+ @Override
public synchronized void deinitState(JobId jobId) {
deinit(jobId);
partitionResultStateMap.remove(jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index f911a75..4a01fdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -78,6 +78,7 @@
ncConfig1.setClusterListenAddress("127.0.0.1");
ncConfig1.setDataListenAddress("127.0.0.1");
ncConfig1.setResultListenAddress("127.0.0.1");
+ ncConfig1.setResultSweepThreshold(5000);
ncConfig1.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
nc1 = Mockito.spy(nc1Base);
@@ -89,6 +90,7 @@
ncConfig2.setClusterListenAddress("127.0.0.1");
ncConfig2.setDataListenAddress("127.0.0.1");
ncConfig2.setResultListenAddress("127.0.0.1");
+ ncConfig2.setResultSweepThreshold(5000);
ncConfig2.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
nc2 = Mockito.spy(nc2Base);
@@ -143,6 +145,10 @@
hcc.startJob(jobId2);
hcc.waitForCompletion(jobId2);
+ //wait ten seconds to ensure the result sweeper does not break the job
+ //The result sweeper runs every 5 seconds during the tests
+ Thread.sleep(10000);
+
//run the second job again
hcc.startJob(jobId2);
hcc.waitForCompletion(jobId2);