Fix ASTERIXDB-1690
Fix the deadlock problem ASTERIXDB-1690 in FileSystemWatcher
Change-Id: Iad358fdeeb47f5d5884fed8806a234f8b3196bec
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1393
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
index 258f194..ab1c424 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -34,7 +34,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -55,7 +54,6 @@
private final boolean isFeed;
private boolean done;
private final LinkedList<Path> dirs;
- private final ReentrantLock lock = new ReentrantLock();
public FileSystemWatcher(List<Path> inputResources, String expression, boolean isFeed) throws HyracksDataException {
this.isFeed = isFeed;
@@ -145,7 +143,7 @@
return (WatchEvent<T>) event;
}
- private void handleEvents(WatchKey key) throws IOException {
+ private synchronized void handleEvents(WatchKey key) throws IOException {
// get dir associated with the key
Path dir = keys.get(key);
if (dir == null) {
@@ -172,7 +170,7 @@
Path name = ev.context();
Path child = dir.resolve(name);
// if directory is created then register it and its sub-directories
- if ((kind == StandardWatchEventKinds.ENTRY_CREATE)) {
+ if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
try {
if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS)) {
register(child);
@@ -229,7 +227,7 @@
}
// take is blocking
- public synchronized File take() throws IOException {
+ public File take() throws IOException {
File next = poll();
if (next != null) {
return next;
@@ -238,36 +236,31 @@
return null;
}
// No file was found, wait for the filesystem to push events
- WatchKey key = null;
- lock.lock();
- try {
- while (!it.hasNext()) {
- try {
- key = watcher.take();
- } catch (InterruptedException x) {
- if (LOGGER.isEnabledFor(Level.WARN)) {
- LOGGER.warn("Feed Closed");
- }
- if (watcher == null) {
- return null;
- }
- continue;
- } catch (ClosedWatchServiceException e) {
- if (LOGGER.isEnabledFor(Level.WARN)) {
- LOGGER.warn("The watcher has exited");
- }
- if (watcher == null) {
- return null;
- }
- continue;
+ WatchKey key;
+ while (!it.hasNext()) {
+ try {
+ key = watcher.take();
+ } catch (InterruptedException x) {
+ if (LOGGER.isEnabledFor(Level.WARN)) {
+ LOGGER.warn("Feed Closed");
}
- handleEvents(key);
- if (endOfEvents(key)) {
+ if (watcher == null) {
return null;
}
+ continue;
+ } catch (ClosedWatchServiceException e) {
+ if (LOGGER.isEnabledFor(Level.WARN)) {
+ LOGGER.warn("The watcher has exited");
+ }
+ if (watcher == null) {
+ return null;
+ }
+ continue;
}
- } finally {
- lock.unlock();
+ handleEvents(key);
+ if (endOfEvents(key)) {
+ return null;
+ }
}
// files were found, re-create the iterator and move it one step
return it.next();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
index f6046ef..2cb842b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
@@ -27,6 +27,7 @@
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.LinkedList;
+import java.util.List;
import java.util.regex.Pattern;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -34,7 +35,7 @@
public class LocalFileSystemUtils {
- public static void traverse(final LinkedList<File> files, File root, final String expression,
+ public static void traverse(final List<File> files, File root, final String expression,
final LinkedList<Path> dirs) throws IOException {
final Path path = root.toPath();
if (!Files.exists(path)) {
@@ -70,8 +71,17 @@
});
}
- public static void validateAndAdd(Path path, String expression, LinkedList<File> files) {
- if (expression == null || Pattern.matches(expression, path.toString())) {
+ private static boolean fileNotExistsInList(List<File> files, Path path) {
+ for (File file : files) {
+ if (file.getPath().equals(path.toString())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static void validateAndAdd(Path path, String expression, List<File> files) {
+ if ((expression == null || Pattern.matches(expression, path.toString())) && fileNotExistsInList(files, path)) {
files.add(new File(path.toString()));
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 584aead..b54c9e6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -103,7 +103,6 @@
ARecordType adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME);
ARecordType metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME);
ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName());
- ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName());
// Get adapter from metadata dataset <Metadata dataverse>
DatasourceAdapter adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);