Control Number of Readers for LocalFS Data

Share a single FileSystemWatcher per node across all partitions of a
localfs adapter instead of creating one watcher (and one set of file
readers) per partition. The per-node FeedLogManager is created once in
GenericAdapterFactory, shared by all adapter partitions, and reference
counted via touch()/close(); its log directory is now per node
(dataverse/feed/node) rather than per adapter instance. A new
IStreamNotificationHandler interface lets a stream tell its record
reader when it moves on to a new file, which the line record readers
use to skip CSV headers for every file rather than only the first one.
Cluster locations are de-duplicated when building the feed storage
removal job.

Change-Id: Ib9d5ece656220d5f562cc385f882c5ddfd3283a6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/776
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index b98618d..d8f1893 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -20,6 +20,8 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -64,6 +66,7 @@
 
     /**
      * Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor.
+     *
      * @param primaryFeed
      * @param metadataProvider
      * @return JobSpecification the Hyracks job specification for receiving data from external source
@@ -251,12 +254,18 @@
 
     public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception {
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        AlgebricksAbsolutePartitionConstraint locations = AsterixClusterProperties.INSTANCE.getClusterLocations();
+        AlgebricksAbsolutePartitionConstraint allCluster = AsterixClusterProperties.INSTANCE.getClusterLocations();
+        Set<String> nodes = new TreeSet<>();
+        for (String node : allCluster.getLocations()) {
+            nodes.add(node);
+        }
+        AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(
+                nodes.toArray(new String[nodes.size()]));
         FileSplit[] feedLogFileSplits = FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(),
                 locations);
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = StoragePathUtil
                 .splitProviderAndPartitionConstraints(feedLogFileSplits);
-        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first);
+        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first, true);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second);
         spec.addRoot(frod);
         return spec;
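
The hunk above collapses the cluster location list, which repeats a node id once per partition it hosts, into one location per node before building the feed-storage removal job. A minimal standalone illustration of that de-duplication (hypothetical node ids; the AlgebricksAbsolutePartitionConstraint wrapper is left out):

    import java.util.Set;
    import java.util.TreeSet;

    public class DedupLocations {
        public static void main(String[] args) {
            // One entry per cluster partition; node ids repeat when a node hosts several partitions.
            String[] allCluster = { "nc1", "nc1", "nc2", "nc2" };
            Set<String> nodes = new TreeSet<>();
            for (String node : allCluster) {
                nodes.add(node);
            }
            // One location per node, in a stable sorted order.
            String[] locations = nodes.toArray(new String[nodes.size()]);
            System.out.println(String.join(",", locations)); // prints: nc1,nc2
        }
    }
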
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
index c77ca10..d5765f1 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
@@ -32,7 +32,7 @@
         JobSpecification jobSpec = JobSpecificationUtils.createJobSpecification();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForDataverse(dataverse.getDataverseName());
-        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first);
+        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first, false);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second);
         jobSpec.addRoot(frod);
         return jobSpec;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 041f706..d3abd50 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -39,6 +39,7 @@
 import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -58,6 +59,7 @@
     private boolean isFeed;
     private FileSplit[] feedLogFileSplits;
     private ARecordType metaType;
+    private FeedLogManager feedLogManager = null;
 
     @Override
     public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
@@ -86,8 +88,14 @@
         } catch (AsterixException e) {
             throw new HyracksDataException(e);
         }
+        if (isFeed) {
+            if (feedLogManager == null) {
+                feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
+            }
+            feedLogManager.touch();
+        }
         IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
-                dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogFileSplits);
+                dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager);
         if (isFeed) {
             return new FeedAdapter((AbstractFeedDataFlowController) controller);
         } else {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
index 83d7a3a..a4c2fae 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
@@ -22,11 +22,13 @@
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class AsterixInputStream extends InputStream {
 
     protected AbstractFeedDataFlowController controller;
     protected FeedLogManager logManager;
+    protected IStreamNotificationHandler notificationHandler;
 
     public abstract boolean stop() throws Exception;
 
@@ -38,7 +40,11 @@
     }
 
     // TODO: Find a better way to send notifications
-    public void setFeedLogManager(FeedLogManager logManager) {
+    public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException {
         this.logManager = logManager;
     }
+
+    public void setNotificationHandler(IStreamNotificationHandler notificationHandler) {
+        this.notificationHandler = notificationHandler;
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 11e2472..9cce1c9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -23,9 +23,11 @@
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * This interface represents a record reader that reads data from external source as a set of records
+ *
  * @param <T>
  */
 public interface IRecordReader<T> extends Closeable {
@@ -33,7 +35,7 @@
     /**
      * @return true if the reader has more records remaining, false, otherwise.
      * @throws Exception
-     *         if an error takes place
+     *             if an error takes place
      */
     public boolean hasNext() throws Exception;
 
@@ -46,6 +48,7 @@
 
     /**
      * used to stop reader from producing more records.
+     *
      * @return true if the connection to the external source has been suspended, false otherwise.
      */
     public boolean stop();
@@ -61,8 +64,10 @@
     /**
      * set a pointer to the log manager of the feed. the log manager can be used to log
      * progress and errors
+     *
+     * @throws HyracksDataException
      */
-    public void setFeedLogManager(FeedLogManager feedLogManager);
+    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException;
 
     /**
      * gives the record reader a chance to recover from IO errors during feed intake
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamNotificationHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamNotificationHandler.java
new file mode 100644
index 0000000..8b014ad
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamNotificationHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+public interface IStreamNotificationHandler {
+
+    /**
+     * Used to notify a handler that the stream is about to start reading data from a new source.
+     * An example use is by the parser to skip CSV file headers in case the stream reads from a set of files.
+     */
+    public void notifyNewSource();
+}
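
To show how the callback is meant to be used, here is a small self-contained sketch (hypothetical classes, not the AsterixDB types): the reader flags the next record as a header whenever the stream reports a new source. This mirrors LineRecordReader below, where header skipping moves from the constructor into the hasNext() loop so the header of every file is dropped, not just the first one.

    public class HeaderSkipDemo {

        interface StreamNotificationHandler {
            void notifyNewSource();
        }

        static class HeaderSkippingReader implements StreamNotificationHandler {
            private boolean nextIsHeader = false;

            @Override
            public void notifyNewSource() {
                // The stream moved on to a new file whose first line is a CSV header.
                nextIsHeader = true;
            }

            String filter(String line) {
                if (nextIsHeader) {
                    nextIsHeader = false;
                    return null; // drop the header line
                }
                return line;
            }
        }

        public static void main(String[] args) {
            HeaderSkippingReader reader = new HeaderSkippingReader();
            reader.notifyNewSource();
            System.out.println(reader.filter("id,name")); // null (header skipped)
            System.out.println(reader.filter("1,alice")); // 1,alice
        }
    }
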
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index 8ec422f..a301ac9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -25,6 +25,7 @@
 import org.apache.asterix.external.api.IRecordWithPKDataParser;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public class ChangeFeedDataFlowController<T> extends FeedRecordDataFlowController<T> {
@@ -33,7 +34,8 @@
 
     public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader) {
+            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
+            throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
         this.dataParser = dataParser;
     }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 370eec0..aac7be2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -25,13 +25,15 @@
 import org.apache.asterix.external.parser.RecordWithMetadataParser;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public class ChangeFeedWithMetaDataFlowController<T, O> extends FeedWithMetaDataFlowController<T, O> {
 
     public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader) {
+            final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader)
+            throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 387e2dc..a092620 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -45,7 +45,7 @@
 
     public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
-            @Nonnull IRecordReader<T> recordReader) {
+            @Nonnull IRecordReader<T> recordReader) throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 203b5a7..e7c396b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -25,6 +25,7 @@
 import org.apache.asterix.external.parser.RecordWithMetadataParser;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public class FeedWithMetaDataFlowController<T, O> extends FeedRecordDataFlowController<T> {
@@ -34,7 +35,7 @@
 
     public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             FeedLogManager feedLogManager, int numOfOutputFields, RecordWithMetadataParser<T, O> dataParser,
-            IRecordReader<T> recordReader) {
+            IRecordReader<T> recordReader) throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
         this.dataParser = dataParser;
     }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
index 2c2dd98..6eee892 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class IndexingStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
 
@@ -73,7 +74,7 @@
     }
 
     @Override
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
+    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
         reader.setFeedLogManager(feedLogManager);
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 8572fc7..59b72e4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -26,87 +26,96 @@
 
 public class LineRecordReader extends StreamRecordReader {
 
+    private final boolean hasHeader;
     protected boolean prevCharCR;
     protected int newlineLength;
     protected int recordNumber = 0;
+    protected boolean nextIsHeader = false;
 
     public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream) throws HyracksDataException {
         super(stream);
-        try {
-            if (hasHeader) {
-                if (hasNext()) {
-                    next();
-                }
-            }
-        } catch (final IOException e) {
-            throw new HyracksDataException(e);
+        this.hasHeader = hasHeader;
+        if (hasHeader) {
+            stream.setNotificationHandler(this);
         }
+    }
 
+    @Override
+    public void notifyNewSource() {
+        if (hasHeader) {
+            nextIsHeader = true;
+        }
     }
 
     @Override
     public boolean hasNext() throws IOException {
-        if (done) {
-            return false;
-        }
-        /*
-         * We're reading data from in, but the head of the stream may be
-         * already buffered in buffer, so we have several cases:
-         * 1. No newline characters are in the buffer, so we need to copy
-         *   everything and read another buffer from the stream.
-         * 2. An unambiguously terminated line is in buffer, so we just
-         *    copy to record.
-         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-         *    in CR. In this case we copy everything up to CR to record, but
-         * we also need to see what follows CR: if it's LF, then we
-         * need consume LF as well, so next call to readLine will read
-         * from after that.
-         * We use a flag prevCharCR to signal if previous character was CR
-         * and, if it happens to be at the end of the buffer, delay
-         * consuming it until we have a chance to look at the char that
-         * follows.
-         */
-        newlineLength = 0; //length of terminating newline
-        prevCharCR = false; //true of prev char was CR
-        record.reset();
-        int readLength = 0;
-        do {
-            int startPosn = bufferPosn; //starting from where we left off the last time
-            if (bufferPosn >= bufferLength) {
-                startPosn = bufferPosn = 0;
-                bufferLength = reader.read(inputBuffer);
-                if (bufferLength <= 0) {
-                    if (readLength > 0) {
-                        record.endRecord();
-                        recordNumber++;
-                        return true;
+        while (true) {
+            if (done) {
+                return false;
+            }
+            /*
+             * We're reading data from in, but the head of the stream may be
+             * already buffered in buffer, so we have several cases:
+             * 1. No newline characters are in the buffer, so we need to copy
+             *   everything and read another buffer from the stream.
+             * 2. An unambiguously terminated line is in buffer, so we just
+             *    copy to record.
+             * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+             *    in CR. In this case we copy everything up to CR to record, but
+             * we also need to see what follows CR: if it's LF, then we
+             * need consume LF as well, so next call to readLine will read
+             * from after that.
+             * We use a flag prevCharCR to signal if previous character was CR
+             * and, if it happens to be at the end of the buffer, delay
+             * consuming it until we have a chance to look at the char that
+             * follows.
+             */
+            newlineLength = 0; //length of terminating newline
+            prevCharCR = false; //true if prev char was CR
+            record.reset();
+            int readLength = 0;
+            do {
+                int startPosn = bufferPosn; //starting from where we left off the last time
+                if (bufferPosn >= bufferLength) {
+                    startPosn = bufferPosn = 0;
+                    bufferLength = reader.read(inputBuffer);
+                    if (bufferLength <= 0) {
+                        if (readLength > 0) {
+                            record.endRecord();
+                            recordNumber++;
+                            return true;
+                        }
+                        close();
+                        return false; //EOF
                     }
-                    close();
-                    return false; //EOF
                 }
-            }
-            for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-                if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
-                    newlineLength = (prevCharCR) ? 2 : 1;
-                    ++bufferPosn; // at next invocation proceed from following byte
-                    break;
+                for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+                    if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+                        newlineLength = (prevCharCR) ? 2 : 1;
+                        ++bufferPosn; // at next invocation proceed from following byte
+                        break;
+                    }
+                    if (prevCharCR) { //CR + notLF, we are at notLF
+                        newlineLength = 1;
+                        break;
+                    }
+                    prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
                 }
-                if (prevCharCR) { //CR + notLF, we are at notLF
-                    newlineLength = 1;
-                    break;
+                readLength = bufferPosn - startPosn;
+                if (prevCharCR && newlineLength == 0) {
+                    --readLength; //CR at the end of the buffer
+                    prevCharCR = false;
                 }
-                prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+                if (readLength > 0) {
+                    record.append(inputBuffer, startPosn, readLength);
+                }
+            } while (newlineLength == 0);
+            if (nextIsHeader) {
+                nextIsHeader = false;
+                continue;
             }
-            readLength = bufferPosn - startPosn;
-            if (prevCharCR && newlineLength == 0) {
-                --readLength; //CR at the end of the buffer
-                prevCharCR = false;
-            }
-            if (readLength > 0) {
-                record.append(inputBuffer, startPosn, readLength);
-            }
-        } while (newlineLength == 0);
-        recordNumber++;
-        return true;
+            recordNumber++;
+            return true;
+        }
     }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 515e0e5..88964a1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -44,76 +44,82 @@
 
     @Override
     public boolean hasNext() throws IOException {
-        if (done) {
-            return false;
-        }
-        newlineLength = 0;
-        prevCharCR = false;
-        prevCharEscape = false;
-        record.reset();
-        int readLength = 0;
-        inQuote = false;
-        do {
-            int startPosn = bufferPosn;
-            if (bufferPosn >= bufferLength) {
-                startPosn = bufferPosn = 0;
-                bufferLength = reader.read(inputBuffer);
-                if (bufferLength <= 0) {
-                    {
-                        if (readLength > 0) {
-                            if (inQuote) {
-                                throw new IOException("malformed input record ended inside quote");
+        while (true) {
+            if (done) {
+                return false;
+            }
+            newlineLength = 0;
+            prevCharCR = false;
+            prevCharEscape = false;
+            record.reset();
+            int readLength = 0;
+            inQuote = false;
+            do {
+                int startPosn = bufferPosn;
+                if (bufferPosn >= bufferLength) {
+                    startPosn = bufferPosn = 0;
+                    bufferLength = reader.read(inputBuffer);
+                    if (bufferLength <= 0) {
+                        {
+                            if (readLength > 0) {
+                                if (inQuote) {
+                                    throw new IOException("malformed input record ended inside quote");
+                                }
+                                record.endRecord();
+                                recordNumber++;
+                                return true;
                             }
-                            record.endRecord();
-                            recordNumber++;
-                            return true;
+                            close();
+                            return false;
                         }
-                        close();
-                        return false;
                     }
                 }
-            }
-            for (; bufferPosn < bufferLength; ++bufferPosn) {
-                if (!inQuote) {
-                    if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
-                        newlineLength = (prevCharCR) ? 2 : 1;
-                        ++bufferPosn;
-                        break;
-                    }
-                    if (prevCharCR) {
-                        newlineLength = 1;
-                        break;
-                    }
-                    prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
-                    if (inputBuffer[bufferPosn] == quote) {
-                        if (!prevCharEscape) {
-                            inQuote = true;
+                for (; bufferPosn < bufferLength; ++bufferPosn) {
+                    if (!inQuote) {
+                        if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+                            newlineLength = (prevCharCR) ? 2 : 1;
+                            ++bufferPosn;
+                            break;
                         }
-                    }
-                    if (prevCharEscape) {
-                        prevCharEscape = false;
+                        if (prevCharCR) {
+                            newlineLength = 1;
+                            break;
+                        }
+                        prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+                        if (inputBuffer[bufferPosn] == quote) {
+                            if (!prevCharEscape) {
+                                inQuote = true;
+                            }
+                        }
+                        if (prevCharEscape) {
+                            prevCharEscape = false;
+                        } else {
+                            prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+                        }
                     } else {
+                        // only look for next quote
+                        if (inputBuffer[bufferPosn] == quote) {
+                            if (!prevCharEscape) {
+                                inQuote = false;
+                            }
+                        }
                         prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
                     }
-                } else {
-                    // only look for next quote
-                    if (inputBuffer[bufferPosn] == quote) {
-                        if (!prevCharEscape) {
-                            inQuote = false;
-                        }
-                    }
-                    prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
                 }
+                readLength = bufferPosn - startPosn;
+                if (prevCharCR && newlineLength == 0) {
+                    --readLength;
+                }
+                if (readLength > 0) {
+                    record.append(inputBuffer, startPosn, readLength);
+                }
+            } while (newlineLength == 0);
+            if (nextIsHeader) {
+                nextIsHeader = false;
+                continue;
             }
-            readLength = bufferPosn - startPosn;
-            if (prevCharCR && newlineLength == 0) {
-                --readLength;
-            }
-            if (readLength > 0) {
-                record.append(inputBuffer, startPosn, readLength);
-            }
-        } while (newlineLength == 0);
-        recordNumber++;
-        return true;
+            recordNumber++;
+            return true;
+        }
     }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 57ef3ae..7dc5bce 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -23,13 +23,16 @@
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IStreamNotificationHandler;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.CharArrayRecord;
 import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public abstract class StreamRecordReader implements IRecordReader<char[]> {
+public abstract class StreamRecordReader implements IRecordReader<char[]>, IStreamNotificationHandler {
     protected final AsterixInputStreamReader reader;
     protected CharArrayRecord record;
     protected char[] inputBuffer;
@@ -37,6 +40,7 @@
     protected int bufferPosn = 0;
     protected boolean done = false;
     protected FeedLogManager feedLogManager;
+    protected MutableBoolean newFile = new MutableBoolean(false);
 
     public StreamRecordReader(AsterixInputStream inputStream) {
         this.reader = new AsterixInputStreamReader(inputStream);
@@ -72,7 +76,8 @@
     public abstract boolean hasNext() throws IOException;
 
     @Override
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
+    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+        this.feedLogManager = feedLogManager;
         reader.setFeedLogManager(feedLogManager);
     }
 
@@ -85,4 +90,9 @@
     public boolean handleException(Throwable th) {
         return reader.handleException(th);
     }
+
+    @Override
+    public void notifyNewSource() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
index 7e280a5..94333d1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class AsterixInputStreamReader extends Reader {
     private AsterixInputStream in;
@@ -56,7 +57,7 @@
         in.setController(controller);
     }
 
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
+    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
         in.setFeedLogManager(feedLogManager);
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 00c1eb7..2519177 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -21,49 +21,39 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Map;
 
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.FileSystemWatcher;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.log4j.Logger;
 
 public class LocalFSInputStream extends AsterixInputStream {
 
     private static final Logger LOGGER = Logger.getLogger(LocalFSInputStream.class.getName());
-    private final Path path;
     private final FileSystemWatcher watcher;
     private FileInputStream in;
     private byte lastByte;
     private File currentFile;
 
-    public LocalFSInputStream(final FileSplit[] fileSplits, final IHyracksTaskContext ctx,
-            final Map<String, String> configuration, final int partition, final String expression, final boolean isFeed)
-            throws IOException {
-        this.path = fileSplits[partition].getLocalFile().getFile().toPath();
-        this.watcher = new FileSystemWatcher(path, expression, isFeed);
-        this.watcher.init();
-    }
-
-    @Override
-    public void setFeedLogManager(FeedLogManager logManager) {
-        super.setFeedLogManager(logManager);
-        watcher.setFeedLogManager(logManager);
+    public LocalFSInputStream(FileSystemWatcher watcher) throws IOException {
+        this.watcher = watcher;
     }
 
     @Override
     public void setController(AbstractFeedDataFlowController controller) {
         super.setController(controller);
-        watcher.setController(controller);
     }
 
     @Override
+    public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException {
+        super.setFeedLogManager(logManager);
+        watcher.setFeedLogManager(logManager);
+    }
+
+    @Override
     public void close() throws IOException {
         IOException ioe = null;
         if (in != null) {
@@ -86,6 +76,9 @@
 
     private void closeFile() throws IOException {
         if (in != null) {
+            if (logManager != null) {
+                logManager.endPartition(currentFile.getAbsolutePath());
+            }
             try {
                 in.close();
             } finally {
@@ -100,9 +93,18 @@
      */
     private boolean advance() throws IOException {
         closeFile();
-        if (watcher.hasNext()) {
-            currentFile = watcher.next();
+        currentFile = watcher.poll();
+        if (currentFile == null) {
+            if (controller != null) {
+                controller.flush();
+            }
+            currentFile = watcher.take();
+        }
+        if (currentFile != null) {
             in = new FileInputStream(currentFile);
+            if (notificationHandler != null) {
+                notificationHandler.notifyNewSource();
+            }
             return true;
         }
         return false;
@@ -141,6 +143,7 @@
 
     @Override
     public boolean stop() throws Exception {
+        closeFile();
         watcher.close();
         return true;
     }
@@ -165,18 +168,11 @@
                     advance();
                     return true;
                 } catch (Exception e) {
-                    return false;
-                }
-            } else {
-                try {
-                    watcher.init();
-                } catch (IOException e) {
-                    LOGGER.warn("Failed to initialize watcher during failure recovery", e);
-                    return false;
+                    LOGGER.warn("An exception was thrown while trying to skip a file", e);
                 }
             }
-            return true;
         }
+        LOGGER.warn("Failed to recover from failure", th);
         return false;
     }
 }
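
The rewritten advance() first polls the shared watcher without blocking; only when no file is ready does it flush the controller's buffered tuples and then block on take(). A rough sketch of that pattern, with a plain BlockingQueue standing in for the shared FileSystemWatcher (names and the flush body are illustrative):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class PollThenTake {
        private final BlockingQueue<String> files = new LinkedBlockingQueue<>();

        public String nextFile() throws InterruptedException {
            // Fast path: a file is already available, hand it out immediately.
            String file = files.poll();
            if (file == null) {
                // Nothing ready: push any buffered frames downstream before blocking.
                flushPendingFrames();
                file = files.take(); // blocks until a new file shows up
            }
            return file;
        }

        private void flushPendingFrames() {
            // stand-in for controller.flush()
        }
    }
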
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 85d0e41..08fce87 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -20,10 +20,14 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IInputStreamFactory;
@@ -32,8 +36,9 @@
 import org.apache.asterix.external.input.stream.LocalFSInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FileSystemWatcher;
 import org.apache.asterix.external.util.NodeResolverFactory;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -49,16 +54,27 @@
     protected static INodeResolver nodeResolver;
     protected Map<String, String> configuration;
     protected FileSplit[] inputFileSplits;
-    protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
     protected boolean isFeed;
     protected String expression;
     // transient fields (They don't need to be serialized and transferred)
     private transient AlgebricksAbsolutePartitionConstraint constraints;
+    private transient FileSystemWatcher watcher;
 
     @Override
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+    public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        if (watcher == null) {
+            String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+            ArrayList<Path> inputResources = new ArrayList<>();
+            for (int i = 0; i < inputFileSplits.length; i++) {
+                if (inputFileSplits[i].getNodeName().equals(nodeName)) {
+                    inputResources.add(inputFileSplits[i].getLocalFile().getFile().toPath());
+                }
+            }
+            watcher = new FileSystemWatcher(inputResources, expression, isFeed);
+        }
         try {
-            return new LocalFSInputStream(inputFileSplits, ctx, configuration, partition, expression, isFeed);
+            return new LocalFSInputStream(watcher);
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
@@ -81,10 +97,6 @@
         configureFileSplits(splits);
         configurePartitionConstraint();
         this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.keepDataSourceOpen(configuration);
-        if (isFeed) {
-            feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
-                    ExternalDataUtils.getFeedName(configuration), constraints);
-        }
         this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION);
     }
 
@@ -94,6 +106,7 @@
     }
 
     private void configureFileSplits(String[] splits) throws AsterixException {
+        INodeResolver resolver = getNodeResolver();
         if (inputFileSplits == null) {
             inputFileSplits = new FileSplit[splits.length];
             String nodeName;
@@ -106,7 +119,7 @@
                     throw new AsterixException(
                             "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
                 }
-                nodeName = trimmedValue.split(":")[0];
+                nodeName = resolver.resolveNode(trimmedValue.split(":")[0]);
                 nodeLocalPath = trimmedValue.split("://")[1];
                 FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
                 inputFileSplits[count++] = fileSplit;
@@ -115,13 +128,21 @@
     }
 
     private void configurePartitionConstraint() throws AsterixException {
-        String[] locs = new String[inputFileSplits.length];
-        String location;
+        Map<String, ClusterPartition[]> partitions = AsterixAppContextInfo.getInstance().getMetadataProperties()
+                .getNodePartitions();
+        List<String> locs = new ArrayList<>();
         for (int i = 0; i < inputFileSplits.length; i++) {
-            location = getNodeResolver().resolveNode(inputFileSplits[i].getNodeName());
-            locs[i] = location;
+            String location = inputFileSplits[i].getNodeName();
+            if (!locs.contains(location)) {
+                int numOfPartitions = partitions.get(location).length;
+                int j = 0;
+                while (j < numOfPartitions) {
+                    locs.add(location);
+                    j++;
+                }
+            }
         }
-        constraints = new AlgebricksAbsolutePartitionConstraint(locs);
+        constraints = new AlgebricksAbsolutePartitionConstraint(locs.toArray(new String[locs.size()]));
     }
 
     protected INodeResolver getNodeResolver() {
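
createInputStream is now synchronized and lazily builds a single FileSystemWatcher per factory instance (i.e. per node), covering only the splits that live on that node; every partition's LocalFSInputStream then shares it, which is what bounds the number of readers. A minimal sketch of that lazy shared-resource pattern, using hypothetical Watcher/Stream stand-ins:

    import java.util.List;

    public class SharedWatcherFactory {
        // One watcher per factory instance; all partitions on this node reuse it.
        private transient Watcher watcher;

        public synchronized Stream createStream(List<String> localPaths) {
            if (watcher == null) {
                watcher = new Watcher(localPaths); // built once, on the first call
            }
            return new Stream(watcher); // every stream wraps the same watcher
        }

        static class Watcher {
            Watcher(List<String> paths) {
            }
        }

        static class Stream {
            Stream(Watcher watcher) {
            }
        }
    }
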
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index d362201..6ba27d8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -52,7 +52,6 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
 
 public class DataflowControllerProvider {
 
@@ -60,13 +59,9 @@
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
             int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
-            Map<String, String> configuration, boolean indexingOp, boolean isFeed, FileSplit[] feedLogFileSplits)
-                    throws HyracksDataException {
+            Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
+            throws HyracksDataException {
         try {
-            FeedLogManager feedLogManager = null;
-            if (isFeed) {
-                feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
-            }
             switch (dataSourceFactory.getDataSourceType()) {
                 case RECORDS:
                     IRecordReaderFactory<?> recordReaderFactory = (IRecordReaderFactory<?>) dataSourceFactory;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index fc15d3c..5bb8ec3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -38,8 +38,7 @@
         START, // partition start
         END, // partition end
         COMMIT, // a record commit within a partition
-        SNAPSHOT // an identifier that partitions with identifiers before this one should be
-                 // ignored
+        SNAPSHOT // an identifier indicating that partitions with identifiers before this one should be ignored
     }
 
     public static final String PROGRESS_LOG_FILE_NAME = "progress.log";
@@ -55,6 +54,7 @@
     private BufferedWriter errorLogger;
     private BufferedWriter recordLogger;
     private final StringBuilder stringBuilder = new StringBuilder();
+    private int count = 0;
 
     public FeedLogManager(File file) throws HyracksDataException {
         try {
@@ -69,18 +69,22 @@
         }
     }
 
-    public void endPartition() throws IOException {
+    public synchronized void touch() {
+        count++;
+    }
+
+    public synchronized void endPartition() throws IOException {
         logProgress(END_PREFIX + currentPartition);
         completed.add(currentPartition);
     }
 
-    public void endPartition(String partition) throws IOException {
+    public synchronized void endPartition(String partition) throws IOException {
         currentPartition = partition;
         logProgress(END_PREFIX + currentPartition);
         completed.add(currentPartition);
     }
 
-    public void startPartition(String partition) throws IOException {
+    public synchronized void startPartition(String partition) throws IOException {
         currentPartition = partition;
         logProgress(START_PREFIX + currentPartition);
     }
@@ -89,7 +93,7 @@
         return Files.exists(dir);
     }
 
-    public void open() throws IOException {
+    public synchronized void open() throws IOException {
         // read content of logs.
         BufferedReader reader = Files.newBufferedReader(
                 Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME));
@@ -113,13 +117,17 @@
                 StandardCharsets.UTF_8, StandardOpenOption.APPEND);
     }
 
-    public void close() throws IOException {
+    public synchronized void close() throws IOException {
+        count--;
+        if (count > 0) {
+            return;
+        }
         progressLogger.close();
         errorLogger.close();
         recordLogger.close();
     }
 
-    public boolean create() throws IOException {
+    public synchronized boolean create() throws IOException {
         File f = dir.toFile();
         f.mkdirs();
         new File(f, PROGRESS_LOG_FILE_NAME).createNewFile();
@@ -128,13 +136,13 @@
         return true;
     }
 
-    public boolean destroy() throws IOException {
+    public synchronized boolean destroy() throws IOException {
         File f = dir.toFile();
         FileUtils.deleteDirectory(f);
         return true;
     }
 
-    public void logProgress(String log) throws IOException {
+    public synchronized void logProgress(String log) throws IOException {
         stringBuilder.setLength(0);
         stringBuilder.append(log);
         stringBuilder.append(ExternalDataConstants.LF);
@@ -142,7 +150,7 @@
         progressLogger.flush();
     }
 
-    public void logError(String error, Throwable th) throws IOException {
+    public synchronized void logError(String error, Throwable th) throws IOException {
         stringBuilder.setLength(0);
         stringBuilder.append(error);
         stringBuilder.append(ExternalDataConstants.LF);
@@ -152,7 +160,7 @@
         errorLogger.flush();
     }
 
-    public void logRecord(String record, String errorMessage) throws IOException {
+    public synchronized void logRecord(String record, String errorMessage) throws IOException {
         stringBuilder.setLength(0);
         stringBuilder.append(record);
         stringBuilder.append(ExternalDataConstants.LF);
@@ -166,7 +174,7 @@
         return log.substring(PREFIX_SIZE);
     }
 
-    public boolean isSplitRead(String split) {
+    public synchronized boolean isSplitRead(String split) {
         return completed.contains(split);
     }
 }
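
Since several adapter partitions on one node now share a single FeedLogManager, the manager is reference counted: each user of the shared manager calls touch() when it registers and close() when it is done, and the underlying writers are only closed once the count reaches zero. A stripped-down illustration of the scheme (file name and writer are placeholders):

    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;

    public class RefCountedLog {
        private final BufferedWriter writer;
        private int count = 0;

        public RefCountedLog(String path) throws IOException {
            writer = new BufferedWriter(new FileWriter(path, true));
        }

        public synchronized void touch() {
            count++; // a new sharer registered itself
        }

        public synchronized void close() throws IOException {
            count--;
            if (count > 0) {
                return; // other sharers are still active
            }
            writer.close(); // last one out closes the log
        }
    }
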
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 5ab41af..502a432 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -45,17 +45,16 @@
         return dataverseName + File.separator + feedName;
     }
 
-    public static FileSplit splitsForAdapter(String dataverseName, String feedName, int partition,
-            ClusterPartition[] nodePartitions) {
+    public static FileSplit splitsForAdapter(String dataverseName, String feedName, String nodeName,
+            ClusterPartition partition) {
         File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
         String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
-        ClusterPartition nodePartition = nodePartitions[0];
         String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
-                nodePartition.getPartitionId());
-        // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
-        File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
-                + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + partition);
-        return StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f);
+                partition.getPartitionId());
+        // Note: feed adapter instances in a single node share the feed logger
+        // format: 'storage dir name'/partition_#/dataverse/feed/node
+        File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName);
+        return StoragePathUtil.getFileSplitForClusterPartition(partition, f);
     }
 
     public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
@@ -63,22 +62,11 @@
         if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
             throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
         }
-        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
-        String[] locations = null;
-        locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
+        String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
         List<FileSplit> splits = new ArrayList<FileSplit>();
-        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
-        int i = 0;
         for (String nd : locations) {
-            // Always get the first partition
-            ClusterPartition nodePartition = AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0];
-            String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
-                    nodePartition.getPartitionId());
-            // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
-            File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
-                    + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + i);
-            splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f));
-            i++;
+            splits.add(splitsForAdapter(dataverseName, feedName, nd,
+                    AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0]));
         }
         return splits.toArray(new FileSplit[] {});
     }
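
Feed log splits are now created per node instead of per adapter instance, so the split path ends with the node name rather than adapter_<n> and all adapter partitions on a node share one feed log directory. A tiny sketch of the resulting layout (all path components are illustrative values):

    import java.io.File;

    public class FeedLogPath {
        public static void main(String[] args) {
            String storageDir = "storage";
            int partitionId = 0; // the node's first cluster partition
            String dataverse = "MyDataverse";
            String feed = "MyFeed";
            String nodeName = "nc1";
            // format: 'storage dir name'/partition_#/dataverse/feed/node
            File f = new File(storageDir + File.separator + "partition_" + partitionId
                    + File.separator + dataverse + File.separator + feed
                    + File.separator + nodeName);
            System.out.println(f.getPath()); // e.g. storage/partition_0/MyDataverse/MyFeed/nc1
        }
    }
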
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
index 4eec348..b15d097 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -33,8 +33,9 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -48,44 +49,51 @@
     private Iterator<File> it;
     private final String expression;
     private FeedLogManager logManager;
-    private final Path path;
+    private final List<Path> paths;
     private final boolean isFeed;
     private boolean done;
-    private File current;
-    private AbstractFeedDataFlowController controller;
     private final LinkedList<Path> dirs;
+    private final ReentrantLock lock = new ReentrantLock();
 
-    public FileSystemWatcher(Path inputResource, String expression, boolean isFeed) {
+    public FileSystemWatcher(List<Path> inputResources, String expression, boolean isFeed) throws HyracksDataException {
+        this.isFeed = isFeed;
         this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
         this.expression = expression;
-        this.path = inputResource;
-        this.isFeed = isFeed;
+        this.paths = inputResources;
         this.dirs = new LinkedList<Path>();
+        if (!isFeed) {
+            init();
+        }
     }
 
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
-        this.logManager = feedLogManager;
+    public synchronized void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+        if (logManager == null) {
+            this.logManager = feedLogManager;
+            init();
+        }
     }
 
-    public void init() throws HyracksDataException {
+    public synchronized void init() throws HyracksDataException {
         try {
             dirs.clear();
-            LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
-            it = files.iterator();
-            if (isFeed) {
-                keys.clear();
-                if (watcher != null) {
-                    try {
-                        watcher.close();
-                    } catch (IOException e) {
-                        LOGGER.warn("Failed to close watcher service", e);
+            for (Path path : paths) {
+                LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
+                it = files.iterator();
+                if (isFeed) {
+                    keys.clear();
+                    if (watcher != null) {
+                        try {
+                            watcher.close();
+                        } catch (IOException e) {
+                            LOGGER.warn("Failed to close watcher service", e);
+                        }
                     }
+                    watcher = FileSystems.getDefault().newWatchService();
+                    for (Path dirPath : dirs) {
+                        register(dirPath);
+                    }
+                    resume();
                 }
-                watcher = FileSystems.getDefault().newWatchService();
-                for (Path path : dirs) {
-                    register(path);
-                }
-                resume();
             }
         } catch (IOException e) {
             throw new HyracksDataException(e);
@@ -102,7 +110,7 @@
         keys.put(key, dir);
     }
 
-    private void resume() throws IOException {
+    private synchronized void resume() throws IOException {
         if (logManager == null) {
             return;
         }
@@ -142,14 +150,12 @@
         }
         for (WatchEvent<?> event : key.pollEvents()) {
             Kind<?> kind = event.kind();
-            // TODO: Do something about overflow events
             // An overflow event means that some events were dropped
             if (kind == StandardWatchEventKinds.OVERFLOW) {
                 if (LOGGER.isEnabledFor(Level.WARN)) {
                     LOGGER.warn("Overflow event. Some events might have been missed");
                 }
                 // need to read and validate all files.
-                //TODO: use btrees for all logs
                 init();
                 return;
             }
@@ -174,33 +180,90 @@
                 }
             }
         }
+        it = files.iterator();
     }
 
-    public void close() throws IOException {
+    public synchronized void close() throws IOException {
         if (!done) {
             if (watcher != null) {
                 watcher.close();
                 watcher = null;
             }
-            if (logManager != null) {
-                if (current != null) {
-                    logManager.startPartition(current.getAbsolutePath());
-                    logManager.endPartition();
-                }
-                logManager.close();
-                current = null;
-            }
             done = true;
         }
     }
 
-    public File next() throws IOException {
-        if ((current != null) && (logManager != null)) {
-            logManager.startPartition(current.getAbsolutePath());
-            logManager.endPartition();
+    // poll is not blocking
+    public synchronized File poll() throws IOException {
+        if (it.hasNext()) {
+            return it.next();
         }
-        current = it.next();
-        return current;
+        if (done || !isFeed) {
+            return null;
+        }
+        files.clear();
+        it = files.iterator();
+        if (keys.isEmpty()) {
+            close();
+            return null;
+        }
+        // Read new events (poll first to pick up all available files)
+        WatchKey key;
+        key = watcher.poll();
+        while (key != null) {
+            handleEvents(key);
+            if (endOfEvents(key)) {
+                close();
+                return null;
+            }
+            key = watcher.poll();
+        }
+        return null;
+    }
+
+    // take is blocking
+    public synchronized File take() throws IOException {
+        File next = poll();
+        if (next != null) {
+            return next;
+        }
+        if (done || !isFeed) {
+            return null;
+        }
+        // No file was found, wait for the filesystem to push events
+        WatchKey key = null;
+        lock.lock();
+        try {
+            while (!it.hasNext()) {
+                try {
+                    key = watcher.take();
+                } catch (InterruptedException x) {
+                    if (LOGGER.isEnabledFor(Level.WARN)) {
+                        LOGGER.warn("Feed Closed");
+                    }
+                    if (watcher == null) {
+                        return null;
+                    }
+                    continue;
+                } catch (ClosedWatchServiceException e) {
+                    if (LOGGER.isEnabledFor(Level.WARN)) {
+                        LOGGER.warn("The watcher has exited");
+                    }
+                    if (watcher == null) {
+                        return null;
+                    }
+                    continue;
+                }
+                handleEvents(key);
+                if (endOfEvents(key)) {
+                    return null;
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+        // files were found; handleEvents(key) has already reset the iterator, so hand out the next file
+        return it.next();
     }
 
     private boolean endOfEvents(WatchKey key) {
@@ -213,64 +276,4 @@
         }
         return false;
     }
-
-    public boolean hasNext() throws IOException {
-        if (it.hasNext()) {
-            return true;
-        }
-        if (done || !isFeed) {
-            return false;
-        }
-        files.clear();
-        if (keys.isEmpty()) {
-            return false;
-        }
-        // Read new Events (Polling first to add all available files)
-        WatchKey key;
-        key = watcher.poll();
-        while (key != null) {
-            handleEvents(key);
-            if (endOfEvents(key)) {
-                close();
-                return false;
-            }
-            key = watcher.poll();
-        }
-        // No file was found, wait for the filesystem to push events
-        if (controller != null) {
-            controller.flush();
-        }
-        while (files.isEmpty()) {
-            try {
-                key = watcher.take();
-            } catch (InterruptedException x) {
-                if (LOGGER.isEnabledFor(Level.WARN)) {
-                    LOGGER.warn("Feed Closed");
-                }
-                if (watcher == null) {
-                    return false;
-                }
-                continue;
-            } catch (ClosedWatchServiceException e) {
-                if (LOGGER.isEnabledFor(Level.WARN)) {
-                    LOGGER.warn("The watcher has exited");
-                }
-                if (watcher == null) {
-                    return false;
-                }
-                continue;
-            }
-            handleEvents(key);
-            if (endOfEvents(key)) {
-                return false;
-            }
-        }
-        // files were found, re-create the iterator and move it one step
-        it = files.iterator();
-        return it.hasNext();
-    }
-
-    public void setController(AbstractFeedDataFlowController controller) {
-        this.controller = controller;
-    }
 }
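Note: the hasNext()/next()/setController() trio is replaced by a synchronized, non-blocking poll() and a blocking take(), so the feed dataflow controller no longer needs a back-reference into the watcher. A minimal consumer sketch (hypothetical caller, not part of this change; process(File) is a stand-in handler):

    // Drain a fixed, non-feed path list through the new API.
    static void drainOnce(List<Path> paths) throws IOException, HyracksDataException {
        FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false); // non-feed: init() runs in the constructor
        try {
            File next;
            while ((next = watcher.poll()) != null) { // non-blocking; null once the listed files are exhausted
                process(next); // hypothetical per-file handler
            }
        } finally {
            watcher.close();
        }
    }

For a feed watcher (isFeed == true), callers would use take() instead, which blocks on the underlying WatchService until a matching file appears or the watcher is closed.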
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
index 354aedb..876639d 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
@@ -18,7 +18,10 @@
  */
 package org.apache.asterix.external.classad.test;
 
+import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -32,8 +35,7 @@
 import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
 import org.apache.asterix.external.input.stream.LocalFSInputStream;
 import org.apache.asterix.external.library.ClassAdParser;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.asterix.external.util.FileSystemWatcher;
 
 import junit.framework.Test;
 import junit.framework.TestCase;
@@ -69,10 +71,10 @@
             ClassAdParser parser = new ClassAdParser(objectPool);
             CharArrayLexerSource lexerSource = new CharArrayLexerSource();
             for (String path : files) {
-                LocalFSInputStream in = new LocalFSInputStream(
-                        new FileSplit[] { new FileSplit("",
-                                new FileReference(Paths.get(getClass().getResource(path).toURI()).toFile())) },
-                        null, null, 0, null, false);
+                List<Path> paths = new ArrayList<>();
+                paths.add(Paths.get(getClass().getResource(path).toURI()));
+                FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
+                LocalFSInputStream in = new LocalFSInputStream(watcher);
                 SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]");
                 Value val = new Value(objectPool);
                 while (recordReader.hasNext()) {
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 7da6389..d1a4532 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -80,16 +80,16 @@
                 IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) ((NodeControllerService) ctx
                         .getJobletContext().getApplicationContext().getControllerService()).getApplicationContext()
                                 .getApplicationObject();
-                ClusterPartition[] nodePartitions = propertiesProvider.getMetadataProperties().getNodePartitions()
-                        .get(nodeId);
+                ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions()
+                        .get(nodeId)[0];
                 try {
                     parser = new ADMDataParser(outputType, true);
                     forwarder = DataflowUtils
                             .getTupleForwarder(configuration,
                                     FeedUtils.getFeedLogManager(ctx,
                                             FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
-                                                    ExternalDataUtils.getFeedName(configuration), partition,
-                                                    nodePartitions)));
+                                                    ExternalDataUtils.getFeedName(configuration), nodeId,
+                                                    nodePartition)));
                     tb = new ArrayTupleBuilder(1);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index fc6e725..faff9df 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.PrintStream;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,6 +33,7 @@
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.external.parser.RecordWithMetadataParser;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FileSystemWatcher;
 import org.apache.asterix.formats.nontagged.AqlADMPrinterFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.om.types.ARecordType;
@@ -42,9 +44,7 @@
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -79,12 +79,11 @@
             int[] pkIndexes = { 0 };
             int[] pkIndicators = { 1 };
 
+            List<Path> paths = new ArrayList<>();
+            paths.add(Paths.get(getClass().getResource("/beer.csv").toURI()));
+            FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
             // create input stream
-            LocalFSInputStream inputStream = new LocalFSInputStream(
-                    new FileSplit[] { new FileSplit(null,
-                            new FileReference(Paths.get(getClass().getResource("/beer.csv").toURI()).toFile())) },
-                    null, null, 0, null, false);
-
+            LocalFSInputStream inputStream = new LocalFSInputStream(watcher);
             // create reader record reader
             QuotedLineRecordReader lineReader = new QuotedLineRecordReader(true, inputStream,
                     ExternalDataConstants.DEFAULT_QUOTE);
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
index 51c3802..0e9aa0c 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
@@ -26,6 +26,8 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hyracks.control.cc.ClusterControllerService;
+
 /**
  * Utility class for obtaining information on the set of Hyracks NodeController
  * processes that are running on a given host.
@@ -54,6 +56,8 @@
     }
 
     public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) throws Exception {
-        AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
+        ClusterControllerService ccs = (ClusterControllerService) AsterixAppContextInfo.getInstance()
+                .getCCApplicationContext().getControllerService();
+        map.putAll(ccs.getIpAddressNodeNameMap());
     }
 }
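Note: getNodeControllerMap now resolves the IP-to-NC mapping through the ClusterControllerService instead of the CC context. A hypothetical caller (not part of this change) fills and inspects the map like this:

    // The map is populated in place; keys are host addresses, values are the NC names on that host.
    Map<InetAddress, Set<String>> ncsPerHost = new HashMap<>();
    AsterixRuntimeUtil.getNodeControllerMap(ncsPerHost);
    for (Map.Entry<InetAddress, Set<String>> entry : ncsPerHost.entrySet()) {
        System.out.println(entry.getKey().getHostAddress() + " -> " + entry.getValue());
    }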