[ASTERIXDB-1706][RT] Use System.nanoTime For Result Timestamp
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Use System.nanoTime for result timestamps so that
results are not incorrectly swept when the system
clock behind System.currentTimeMillis is adjusted.
- Move sweep logic from ResultStateSweeper to the new AbstractDatasetManager base class.
Change-Id: I388d2a477bcfdc47d11dc6a4873483b82c9fadbf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2315
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
index 55f1d7c..4e7ddda 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
@@ -84,7 +84,7 @@
private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>();
public DatasetJobRecord() {
- this.timestamp = System.currentTimeMillis();
+ this.timestamp = System.nanoTime();
this.status = new Status();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
index c8463d3..a0c1f78 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
@@ -24,11 +24,15 @@
public interface IDatasetManager {
- public Set<JobId> getJobIds();
+ Set<JobId> getJobIds();
- public IDatasetStateRecord getState(JobId jobId);
+ IDatasetStateRecord getState(JobId jobId);
- public void deinitState(JobId jobId);
+ void sweep(JobId jobId);
- public long getResultTimestamp(JobId jobId);
-}
+ /**
+ * Removes all references and deletes persisted files for
+ * all expired datasets.
+ */
+ void sweepExpiredDatasets();
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index a57baf5..04aaddd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -40,6 +40,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.common.dataset.AbstractDatasetManager;
import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.logging.log4j.Level;
@@ -53,25 +54,23 @@
* the job (after it receives all the results) completely. Then we can just get rid of the location information for that
* job.
*/
-public class DatasetDirectoryService implements IDatasetDirectoryService {
+public class DatasetDirectoryService extends AbstractDatasetManager implements IDatasetDirectoryService {
private static final Logger LOGGER = LogManager.getLogger();
- private final long resultTTL;
-
private final long resultSweepThreshold;
private final Map<JobId, JobResultInfo> jobResultLocations;
public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
- this.resultTTL = resultTTL;
+ super(resultTTL);
this.resultSweepThreshold = resultSweepThreshold;
- jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
+ jobResultLocations = new LinkedHashMap<>();
}
@Override
public void init(ExecutorService executor) {
- executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
+ executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
}
@Override
@@ -181,12 +180,7 @@
}
@Override
- public synchronized long getResultTimestamp(JobId jobId) {
- return getState(jobId).getTimestamp();
- }
-
- @Override
- public synchronized void deinitState(JobId jobId) {
+ public synchronized void sweep(JobId jobId) {
jobResultLocations.remove(jobId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java
new file mode 100644
index 0000000..f95229e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.dataset;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.dataset.IDatasetManager;
+import org.apache.hyracks.api.dataset.IDatasetStateRecord;
+import org.apache.hyracks.api.job.JobId;
+
+public abstract class AbstractDatasetManager implements IDatasetManager {
+
+ private final long nanoResultTTL;
+
+ protected AbstractDatasetManager(long resultTTL) {
+ this.nanoResultTTL = TimeUnit.MILLISECONDS.toNanos(resultTTL);
+ }
+
+ @Override
+ public synchronized void sweepExpiredDatasets() {
+ final List<JobId> expiredDatasets = new ArrayList<>();
+ final long sweepTime = System.nanoTime();
+ for (JobId jobId : getJobIds()) {
+ final IDatasetStateRecord state = getState(jobId);
+ if (state != null && hasExpired(state, sweepTime, nanoResultTTL)) {
+ expiredDatasets.add(jobId);
+ }
+ }
+ for (JobId jobId : expiredDatasets) {
+ sweep(jobId);
+ }
+ }
+
+ private static boolean hasExpired(IDatasetStateRecord dataset, long currentTime, long ttl) {
+ return currentTime - dataset.getTimestamp() - ttl > 0;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
index a9ca771..901ec67 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -19,12 +19,7 @@
package org.apache.hyracks.control.common.dataset;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.hyracks.api.dataset.IDatasetManager;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
/**
@@ -33,53 +28,26 @@
public class ResultStateSweeper implements Runnable {
private final IDatasetManager datasetManager;
-
- private final long resultTTL;
-
private final long resultSweepThreshold;
-
private final Logger logger;
- private final List<JobId> toBeCollected;
-
- public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold,
- Logger logger) {
+ public ResultStateSweeper(IDatasetManager datasetManager, long resultSweepThreshold, Logger logger) {
this.datasetManager = datasetManager;
- this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
this.logger = logger;
- toBeCollected = new ArrayList<JobId>();
}
@Override
- @SuppressWarnings("squid:S2142") // catch interrupted exception
public void run() {
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(resultSweepThreshold);
- sweep();
+ datasetManager.sweepExpiredDatasets();
+ logger.trace("Result state cleanup instance successfully completed.");
} catch (InterruptedException e) {
- logger.log(Level.WARN, "Result cleaner thread interrupted, shutting down.");
- break; // the interrupt was explicit from another thread. This thread should shut down...
+ logger.warn("Result cleaner thread interrupted, shutting down.");
+ Thread.currentThread().interrupt();
}
}
}
-
- private void sweep() {
- synchronized (datasetManager) {
- toBeCollected.clear();
- for (JobId jobId : datasetManager.getJobIds()) {
- final long timestamp = datasetManager.getResultTimestamp(jobId);
- if (timestamp != -1 && System.currentTimeMillis() > timestamp + resultTTL) {
- toBeCollected.add(jobId);
- }
- }
- for (JobId jobId : toBeCollected) {
- datasetManager.deinitState(jobId);
- }
- }
- if (logger.isTraceEnabled()) {
- logger.trace("Result state cleanup instance successfully completed.");
- }
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index d381a67..476aeae 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -26,11 +26,11 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.api.dataset.IDatasetStateRecord;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.dataset.AbstractDatasetManager;
import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -38,7 +38,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class DatasetPartitionManager implements IDatasetPartitionManager {
+public class DatasetPartitionManager extends AbstractDatasetManager implements IDatasetPartitionManager {
private static final Logger LOGGER = LogManager.getLogger();
private final NodeControllerService ncs;
@@ -55,6 +55,7 @@
public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
long resultSweepThreshold) {
+ super(resultTTL);
this.ncs = ncs;
this.executor = executor;
deallocatableRegistry = new DefaultDeallocatableRegistry();
@@ -65,7 +66,7 @@
datasetMemoryManager = null;
}
partitionResultStateMap = new LinkedHashMap<>();
- executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
+ executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
}
@Override
@@ -179,16 +180,7 @@
}
@Override
- public synchronized long getResultTimestamp(JobId jobId) {
- IDatasetStateRecord r = getState(jobId);
- if (r == null) {
- return -1;
- }
- return r.getTimestamp();
- }
-
- @Override
- public synchronized void deinitState(JobId jobId) {
+ public synchronized void sweep(JobId jobId) {
deinit(jobId);
partitionResultStateMap.remove(jobId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
index 3957401..1a64a5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
@@ -37,7 +37,7 @@
private final HashMap<ResultSetId, ResultState[]> resultStateMap;
ResultSetMap() {
- timestamp = System.currentTimeMillis();
+ timestamp = System.nanoTime();
resultStateMap = new HashMap<>();
}