Fix Pre-Distributed Jobs

This fix prevents ResultStateSweeper from deinitializing
pre-distributed jobs before they are dropped.

Also fixes issues with new lock manager code

Change-Id: Id50c52fbc7c891761dcabd654fb9b853b5f7a91d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1656
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 59b3e88..88d07e4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1735,7 +1735,12 @@
             }
         };
         if (compileOnly) {
-            return compiler.compile();
+            locker.lock();
+            try {
+                return compiler.compile();
+            } finally {
+                locker.unlock();
+            }
         }
 
         if (stmtInsertUpsert.getReturnExpression() != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
index a376d53..c8463d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
@@ -29,4 +29,6 @@
     public IDatasetStateRecord getState(JobId jobId);
 
     public void deinitState(JobId jobId);
+
+    public long getResultTimestamp(JobId jobId);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index c47a612..8722b92 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -155,7 +155,7 @@
         ccContext = new ClusterControllerContext(topology);
         sweeper = new DeadNodeSweeper();
         datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
-                ccConfig.getResultSweepThreshold());
+                ccConfig.getResultSweepThreshold(), preDistributedJobStore);
 
         deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
index c573ae8..117621f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
@@ -62,6 +62,10 @@
         return descriptor;
     }
 
+    public boolean jobIsPredistributed(JobId jobId) {
+        return preDistributedJobDescriptorMap.get(jobId) != null;
+    }
+
     public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException {
         PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
         if (descriptor == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 8fd15f9..3cc41c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.cc.PreDistributedJobStore;
 import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
 
@@ -61,9 +62,13 @@
 
     private final Map<JobId, JobResultInfo> jobResultLocations;
 
-    public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
+    private final PreDistributedJobStore preDistributedJobStore;
+
+    public DatasetDirectoryService(long resultTTL, long resultSweepThreshold,
+            PreDistributedJobStore preDistributedJobStore) {
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
+        this.preDistributedJobStore = preDistributedJobStore;
         jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
     }
 
@@ -178,11 +183,19 @@
     }
 
     @Override
-    public synchronized IDatasetStateRecord getState(JobId jobId) {
+    public IDatasetStateRecord getState(JobId jobId) {
         return getDatasetJobRecord(jobId);
     }
 
     @Override
+    public synchronized long getResultTimestamp(JobId jobId) {
+        if (preDistributedJobStore.jobIsPredistributed(jobId)){
+            return -1;
+        }
+        return getState(jobId).getTimestamp();
+    }
+
+    @Override
     public synchronized void deinitState(JobId jobId) {
         jobResultLocations.remove(jobId);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
index e7ac389..da1714b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -25,7 +25,6 @@
 import java.util.logging.Logger;
 
 import org.apache.hyracks.api.dataset.IDatasetManager;
-import org.apache.hyracks.api.dataset.IDatasetStateRecord;
 import org.apache.hyracks.api.job.JobId;
 
 /**
@@ -70,8 +69,8 @@
         synchronized (datasetManager) {
             toBeCollected.clear();
             for (JobId jobId : datasetManager.getJobIds()) {
-                final IDatasetStateRecord state = datasetManager.getState(jobId);
-                if (state != null && System.currentTimeMillis() > state.getTimestamp() + resultTTL) {
+                final long timestamp = datasetManager.getResultTimestamp(jobId);
+                if (timestamp != -1 && System.currentTimeMillis() > timestamp + resultTTL) {
                     toBeCollected.add(jobId);
                 }
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 9c28c16..9242bba 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -120,7 +120,7 @@
 
     private final Map<JobId, Joblet> jobletMap;
 
-    private final Map<JobId, ActivityClusterGraph> activityClusterGraphMap;
+    private final Map<JobId, ActivityClusterGraph> preDistributedJobActivityClusterGraphMap;
 
     private ExecutorService executor;
 
@@ -180,7 +180,7 @@
         lccm = new LifeCycleComponentManager();
         workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<>();
-        activityClusterGraphMap = new Hashtable<>();
+        preDistributedJobActivityClusterGraphMap = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
                 new File(new File(NodeControllerService.class.getName()), id));
@@ -373,27 +373,27 @@
     }
 
     public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException {
-        if (activityClusterGraphMap.get(jobId) != null) {
+        if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
             throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
         }
-        activityClusterGraphMap.put(jobId, acg);
+        preDistributedJobActivityClusterGraphMap.put(jobId, acg);
     }
 
     public void removeActivityClusterGraph(JobId jobId) throws HyracksException {
-        if (activityClusterGraphMap.get(jobId) == null) {
+        if (preDistributedJobActivityClusterGraphMap.get(jobId) == null) {
             throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
         }
-        activityClusterGraphMap.remove(jobId);
+        preDistributedJobActivityClusterGraphMap.remove(jobId);
     }
 
     public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException {
-        if (activityClusterGraphMap.get(jobId) != null) {
+        if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
             throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
         }
     }
 
     public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException {
-        return activityClusterGraphMap.get(jobId);
+        return preDistributedJobActivityClusterGraphMap.get(jobId);
     }
 
     public NetworkManager getNetworkManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 930a3e0..c7b563b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -178,11 +178,20 @@
     }
 
     @Override
-    public synchronized IDatasetStateRecord getState(JobId jobId) {
+    public IDatasetStateRecord getState(JobId jobId) {
         return partitionResultStateMap.get(jobId);
     }
 
     @Override
+    public synchronized long getResultTimestamp(JobId jobId) {
+        IDatasetStateRecord r = getState(jobId);
+        if (r == null) {
+            return -1;
+        }
+        return r.getTimestamp();
+    }
+
+    @Override
     public synchronized void deinitState(JobId jobId) {
         deinit(jobId);
         partitionResultStateMap.remove(jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index f911a75..4a01fdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -78,6 +78,7 @@
         ncConfig1.setClusterListenAddress("127.0.0.1");
         ncConfig1.setDataListenAddress("127.0.0.1");
         ncConfig1.setResultListenAddress("127.0.0.1");
+        ncConfig1.setResultSweepThreshold(5000);
         ncConfig1.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
         NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
         nc1 = Mockito.spy(nc1Base);
@@ -89,6 +90,7 @@
         ncConfig2.setClusterListenAddress("127.0.0.1");
         ncConfig2.setDataListenAddress("127.0.0.1");
         ncConfig2.setResultListenAddress("127.0.0.1");
+        ncConfig2.setResultSweepThreshold(5000);
         ncConfig2.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
         NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
         nc2 = Mockito.spy(nc2Base);
@@ -143,6 +145,10 @@
         hcc.startJob(jobId2);
         hcc.waitForCompletion(jobId2);
 
+        //wait ten seconds to ensure the result sweeper does not break the job
+        //The result sweeper runs every 5 seconds during the tests
+        Thread.sleep(10000);
+
         //run the second job again
         hcc.startJob(jobId2);
         hcc.waitForCompletion(jobId2);