diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index 92cd61a..ce80291 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -139,4 +139,8 @@
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
         this.frameTupleAccessor = new FrameTupleAccessor(recordDescriptor);
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }
 }
\ No newline at end of file
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
index 66eca0c..4ae2e59 100644
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
+++ b/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
@@ -20,7 +20,6 @@
 
 import java.util.logging.Level;
 
-import org.json.JSONObject;
 import org.apache.asterix.external.feed.api.IFeedLoadManager;
 import org.apache.asterix.external.feed.api.IFeedTrackingManager;
 import org.apache.asterix.external.feed.api.IFeedMessage.MessageType;
@@ -35,6 +34,8 @@
 import org.apache.asterix.external.util.FeedConstants;
 import org.apache.asterix.feed.CentralFeedManager.AQLExecutor;
 import org.apache.asterix.hyracks.bootstrap.FeedBootstrap;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.json.JSONObject;
 
 public class FeedMessageReceiver extends MessageReceiver<String> {
 
@@ -88,4 +89,8 @@
         }
 
     }
+
+    @Override
+    public void emptyInbox() throws HyracksDataException {
+    }
 }
diff --git a/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 48bb6a9..0f705a9 100644
--- a/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -49,7 +49,7 @@
         }
         try {
             writer.open();
-            appender.flush(writer, true);
+            appender.write(writer, true);
         } catch (Throwable th) {
             writer.fail();
             throw new HyracksDataException(th);
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 867c96b..7801fd7 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -277,5 +277,15 @@
             <version>1.2.2</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.couchbase.client</groupId>
+            <artifactId>core-io</artifactId>
+            <version>1.2.3</version>
+        </dependency>
+        <dependency>
+            <groupId>io.reactivex</groupId>
+            <artifactId>rxjava</artifactId>
+            <version>1.0.15</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index 866910b..76c8b85 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -29,7 +29,7 @@
 import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
 import org.apache.asterix.external.indexing.RecordIdReader;
 import org.apache.asterix.external.indexing.RecordIdReaderFactory;
-import org.apache.asterix.external.input.record.reader.LookupReaderFactoryProvider;
+import org.apache.asterix.external.provider.LookupReaderFactoryProvider;
 import org.apache.asterix.external.provider.ParserFactoryProvider;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.comm.IFrameWriter;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
index a4a5a43..4ad4c4f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
@@ -124,4 +125,11 @@
                 break;
         }
     }
+
+    public static <T> void toBytes(T serializable, ArrayBackedValueStorage buffer, ISerializerDeserializer<T> serde)
+            throws HyracksDataException {
+        buffer.reset();
+        DataOutput out = buffer.getDataOutput();
+        serde.serialize(serializable, out);
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 580ac99..370ea93 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -59,6 +59,8 @@
      * Specify whether the external data source can be indexed
      * @return
      */
-    public boolean isIndexible();
+    public default boolean isIndexible() {
+        return false;
+    }
 
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java
index 92b500d..fe15244 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java
@@ -35,11 +35,6 @@
     public T get();
 
     /**
-     * @return The class of the record objects.
-     */
-    public Class<?> getRecordClass();
-
-    /**
      * Resets the object to prepare it for another write operation.
      */
     public void reset();
@@ -48,4 +43,10 @@
      * @return The size of the valid bytes of the object. If the object can't be serialized, this method returns -1
      */
     int size();
+
+    /**
+     * Sets the new value of the record
+     * @param t
+     */
+    public void set(T t);
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 019fe8f..3cf467e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -52,15 +52,16 @@
     public IRawRecord<T> next() throws IOException, InterruptedException;
 
     /**
-     * @return the class of the java objects representing the records. used to check compatibility between readers and
-     *         parsers.
-     * @throws IOException
-     */
-    public Class<? extends T> getRecordClass() throws IOException;
-
-    /**
      * used to stop reader from producing more records.
      * @return true if the connection to the external source has been suspended, false otherwise.
      */
     public boolean stop();
+
+    /**
+     * set a pointer to the controller of the feed. the controller can be used to flush()
+     * parsed records when waiting for more records to be pushed
+     */
+    public default void setController(IDataFlowController controller) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+    };
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index aab4bf6..461eaf9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -65,4 +65,8 @@
         tupleForwarder.resume();
         return true;
     }
+
+    public void flush() throws HyracksDataException {
+        tupleForwarder.flush();
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index fe4557d..2a4eaf9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -43,6 +43,7 @@
             while (recordReader.hasNext()) {
                 IRawRecord<? extends T> record = recordReader.next();
                 if (record == null) {
+                    flush();
                     Thread.sleep(interval);
                     continue;
                 }
@@ -110,5 +111,6 @@
     @Override
     public void setRecordReader(IRecordReader<T> recordReader) {
         this.recordReader = recordReader;
+        recordReader.setController(this);
     }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index d1707665..b46a338 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -80,4 +80,8 @@
             FrameUtils.flushFrame(frame.getBuffer(), writer);
         }
     }
+
+    public void flush() throws HyracksDataException {
+        appender.flush(writer);
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
index ba6f83c..a97182c 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -154,9 +154,14 @@
     @Override
     public void close() throws HyracksDataException {
         try {
-            appender.flush(writer, true);
+            appender.write(writer, true);
         } finally {
             writer.close();
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        appender.flush(writer);
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
index 18b6ec0..3fc7ac8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
@@ -63,6 +63,7 @@
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // always project the first field only. why?
         inputFrameTupleAccessor.reset(buffer);
         int nTuple = inputFrameTupleAccessor.getTupleCount();
         for (int t = 0; t < nTuple; t++) {
@@ -116,4 +117,9 @@
         this.downstreamWriter = writer;
     }
 
+    @Override
+    public void flush() throws HyracksDataException {
+        tupleAppender.flush(downstreamWriter);
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
index 7367d5a..d314f74 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
@@ -156,4 +156,9 @@
     public FrameDistributor.DistributionMode getDistributionMode() {
         return frameDistributor.getDistributionMode();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        frameDistributor.flush();
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
index 0a595b7..159bc43 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
@@ -169,4 +169,9 @@
             frameWriter.nextFrame(frame);
         }
     }
+
+    @Override
+    public void emptyInbox() throws HyracksDataException {
+        frameWriter.flush();
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
index 0d53524..ef4b87d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
@@ -157,4 +157,13 @@
         return connectionId.toString().hashCode();
     }
 
+    @Override
+    public void emptyInbox() throws HyracksDataException {
+        flush();
+    }
+
+    public synchronized void flush() throws HyracksDataException {
+        frameWriter.flush();
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
index 6ad00f1..3c45a20 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
@@ -272,6 +272,10 @@
                 };
             }
 
+            @Override
+            public void emptyInbox() throws HyracksDataException {
+            }
+
         }
 
         @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 3a46b1a..a00e732 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -112,7 +112,6 @@
         this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
                 feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
                 nPartitions, fpa);
-        this.mBuffer.start();
         this.throttlingEnabled = false;
     }
 
@@ -414,6 +413,7 @@
     @Override
     public void open() throws HyracksDataException {
         coreOperator.open();
+        mBuffer.start();
     }
 
     @Override
@@ -465,4 +465,12 @@
     public void setBufferingEnabled(boolean bufferingEnabled) {
         this.bufferingEnabled = bufferingEnabled;
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // Only flush when in process mode.
+        if (mode == Mode.PROCESS) {
+            coreOperator.flush();
+        }
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
index 543efb2..85308df 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
@@ -25,8 +25,8 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.external.feed.api.IFeedMemoryManager;
 import org.apache.asterix.external.feed.api.IFeedMemoryComponent.Type;
+import org.apache.asterix.external.feed.api.IFeedMemoryManager;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -358,4 +358,14 @@
         return fta;
     }
 
+    public void flush() throws HyracksDataException {
+        switch (distributionMode) {
+            case SINGLE:
+                FeedFrameCollector collector = registeredCollectors.values().iterator().next();
+                collector.flush();
+            default:
+                break;
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
index abeb994..f69c552 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
@@ -25,6 +25,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.external.feed.api.IMessageReceiver;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class MessageReceiver<T> implements IMessageReceiver<T> {
 
@@ -74,12 +75,17 @@
             this.inbox = messageReceiver.inbox;
             this.messageReceiver = messageReceiver;
         }
+        // TODO: this should handle exceptions better
 
         @Override
         public void run() {
             while (true) {
                 try {
-                    T message = inbox.take();
+                    T message = inbox.poll();
+                    if (message == null) {
+                        messageReceiver.emptyInbox();
+                        message = inbox.take();
+                    }
                     messageReceiver.processMessage(message);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
@@ -108,4 +114,6 @@
         }
     }
 
+    public abstract void emptyInbox() throws HyracksDataException;
+
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
index db38edf..b93410a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
@@ -393,4 +393,9 @@
         return storageTimeTrackingRateTask;
     }
 
+    @Override
+    public void emptyInbox() throws HyracksDataException {
+        inputHandler.flush();
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
index 1f9551d..9db930e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
@@ -38,8 +38,7 @@
 
 public class StorageSideMonitoredBuffer extends MonitoredBuffer {
 
-    private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000; // 10
-                                                                      // seconds
+    private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000;
 
     private boolean ackingEnabled;
     private final boolean timeTrackingEnabled;
@@ -207,5 +206,4 @@
     protected boolean reportInflowRate() {
         return false;
     }
-
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
index 932aece..0fbbd2e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
@@ -22,7 +22,7 @@
 
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableInt32;
 import org.apache.asterix.om.base.AMutableInt64;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
index 14235c00..9fa26f0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
@@ -22,7 +22,7 @@
 
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableInt32;
 import org.apache.asterix.om.base.AMutableInt64;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 7e9fdcb..aa35383 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -29,8 +29,8 @@
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingScheduler;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
-import org.apache.asterix.external.input.stream.HDFSInputStreamProvider;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
+import org.apache.asterix.external.input.stream.provider.HDFSInputStreamProvider;
 import org.apache.asterix.external.provider.ExternalIndexerProvider;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
index fd5c397..365aeb0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.input.record;
 
+import java.nio.CharBuffer;
 import java.util.Arrays;
 
 import org.apache.asterix.external.api.IRawRecord;
@@ -99,8 +100,21 @@
         size++;
     }
 
+    public void append(char[] recordBuffer) {
+        ensureCapacity(size + recordBuffer.length);
+        System.arraycopy(recordBuffer, 0, value, size, recordBuffer.length);
+        size += recordBuffer.length;
+    }
+
+    public void append(CharBuffer chars) {
+        ensureCapacity(size + chars.limit());
+        chars.get(value, size, chars.limit());
+        size += chars.limit();
+    }
+
     @Override
-    public Class<char[]> getRecordClass() {
-        return char[].class;
+    public void set(char[] value) {
+        this.value = value;
+        this.size = value.length;
     }
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
index 365bc22..f405b82 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
@@ -47,10 +47,6 @@
     }
 
     @Override
-    public Class<?> getRecordClass() {
-        return record.getClass();
-    }
-
     public void set(T record) {
         this.record = record;
     }
@@ -58,5 +54,4 @@
     @Override
     public void reset() {
     }
-
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
new file mode 100644
index 0000000..d5640a6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+public class RecordWithMetadata<T> {
+
+    private ArrayBackedValueStorage[] fieldValueBuffers;
+    private DataOutput[] fieldValueBufferOutputs;
+    private IValueParserFactory[] valueParserFactories;
+    private byte[] fieldTypeTags;
+    private IRawRecord<T> record;
+
+    // Serializers
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+    private AMutableDouble mutableDouble = new AMutableDouble(0);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+    private AMutableString mutableString = new AMutableString(null);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+    private AMutableInt32 mutableInt = new AMutableInt32(0);
+    @SuppressWarnings("unchecked")
+    protected ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    private AMutableInt64 mutableLong = new AMutableInt64(0);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+
+    public RecordWithMetadata(Class<? extends T> recordClass) {
+    }
+
+    public RecordWithMetadata(IAType[] metaTypes, Class<? extends T> recordClass) {
+        int n = metaTypes.length;
+        this.fieldValueBuffers = new ArrayBackedValueStorage[n];
+        this.fieldValueBufferOutputs = new DataOutput[n];
+        this.valueParserFactories = new IValueParserFactory[n];
+        this.fieldTypeTags = new byte[n];
+        for (int i = 0; i < n; i++) {
+            ATypeTag tag = metaTypes[i].getTypeTag();
+            fieldTypeTags[i] = tag.serialize();
+            fieldValueBuffers[i] = new ArrayBackedValueStorage();
+            fieldValueBufferOutputs[i] = fieldValueBuffers[i].getDataOutput();
+            valueParserFactories[i] = ExternalDataUtils.getParserFactory(tag);
+        }
+    }
+
+    public IRawRecord<T> getRecord() {
+        return record;
+    }
+
+    public ArrayBackedValueStorage getMetadata(int index) {
+        return fieldValueBuffers[index];
+    }
+
+    public void setRecord(IRawRecord<T> record) {
+        this.record = record;
+    }
+
+    public void reset() {
+        record.reset();
+        for (ArrayBackedValueStorage fieldBuffer : fieldValueBuffers) {
+            fieldBuffer.reset();
+        }
+    }
+
+    public void setMetadata(int index, int value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableInt.setValue(value);
+        IDataParser.toBytes(mutableInt, fieldValueBuffers[index], int32Serde);
+    }
+
+    public void setMetadata(int index, long value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableLong.setValue(value);
+        IDataParser.toBytes(mutableLong, fieldValueBuffers[index], int64Serde);
+    }
+
+    public void setMetadata(int index, String value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableString.setValue(value);
+        IDataParser.toBytes(mutableString, fieldValueBuffers[index], stringSerde);
+    }
+
+    public void setMeta(int index, boolean value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        IDataParser.toBytes(value ? ABoolean.TRUE : ABoolean.FALSE, fieldValueBuffers[index], booleanSerde);
+    }
+
+    public void setMeta(int index, double value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableDouble.setValue(value);
+        IDataParser.toBytes(mutableDouble, fieldValueBuffers[index], doubleSerde);
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
new file mode 100644
index 0000000..895af1b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.couchbase;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
+
+import com.couchbase.client.core.CouchbaseCore;
+import com.couchbase.client.core.dcp.BucketStreamAggregator;
+import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
+import com.couchbase.client.core.dcp.BucketStreamState;
+import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
+import com.couchbase.client.core.env.DefaultCoreEnvironment;
+import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.message.cluster.CloseBucketRequest;
+import com.couchbase.client.core.message.cluster.OpenBucketRequest;
+import com.couchbase.client.core.message.cluster.SeedNodesRequest;
+import com.couchbase.client.core.message.dcp.DCPRequest;
+import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.RemoveMessage;
+import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+
+import rx.functions.Action1;
+
+public class CouchbaseReader implements IRecordReader<RecordWithMetadata<char[]>> {
+
+    private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
+            null);
+    private final String feedName;
+    private final short[] vbuckets;
+    private final String bucket;
+    private final String password;
+    private final String[] couchbaseNodes;
+    private AbstractFeedDataFlowController controller;
+    private Builder builder;
+    private BucketStreamAggregator bucketStreamAggregator;
+    private CouchbaseCore core;
+    private DefaultCoreEnvironment env;
+    private Thread pushThread;
+    private ArrayBlockingQueue<MutationMessage> messages;
+    private GenericRecord<RecordWithMetadata<char[]>> record;
+    private RecordWithMetadata<char[]> recordWithMetadata;
+    private boolean done = false;
+    private CharArrayRecord value;
+    private CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+    private ByteBuffer bytes = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    private CharBuffer chars = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    // metaTypes = {key(string), bucket(string), vbucket(int32), seq(long), cas(long), creationTime(long),expiration(int32),flags(int32),revSeqNumber(long),lockTime(int32)}
+    private static final IAType[] metaTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
+            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT32,
+            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT32 };
+    private static final Logger LOGGER = Logger.getLogger(CouchbaseReader.class);
+
+    public CouchbaseReader(String feedName, String bucket, String password, String[] couchbaseNodes, short[] vbuckets,
+            int queueSize) throws HyracksDataException {
+        this.feedName = feedName;
+        this.bucket = bucket;
+        this.password = password;
+        this.couchbaseNodes = couchbaseNodes;
+        this.vbuckets = vbuckets;
+        this.recordWithMetadata = new RecordWithMetadata<char[]>(metaTypes, char[].class);
+        this.messages = new ArrayBlockingQueue<MutationMessage>(queueSize);
+        this.value = new CharArrayRecord();
+        recordWithMetadata.setRecord(value);
+        this.record = new GenericRecord<RecordWithMetadata<char[]>>(recordWithMetadata);
+    }
+
+    @Override
+    public void close() {
+        if (!done) {
+            done = true;
+        }
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.builder = DefaultCoreEnvironment.builder().dcpEnabled(CouchbaseReaderFactory.DCP_ENABLED)
+                .autoreleaseAfter(CouchbaseReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS);
+        this.env = builder.build();
+        this.core = new CouchbaseCore(env);
+        this.bucketStreamAggregator = new BucketStreamAggregator(feedName, core, bucket);
+        connect();
+    }
+
+    private void connect() {
+        core.send(new SeedNodesRequest(couchbaseNodes))
+                .timeout(CouchbaseReaderFactory.TIMEOUT, CouchbaseReaderFactory.TIME_UNIT).toBlocking().single();
+        core.send(new OpenBucketRequest(bucket, password))
+                .timeout(CouchbaseReaderFactory.TIMEOUT, CouchbaseReaderFactory.TIME_UNIT).toBlocking().single();
+        this.pushThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                CouchbaseReader.this.run(bucketStreamAggregator);
+            }
+        }, feedName);
+        pushThread.start();
+    }
+
+    private void run(BucketStreamAggregator bucketStreamAggregator) {
+        BucketStreamAggregatorState state = new BucketStreamAggregatorState();
+        for (int i = 0; i < vbuckets.length; i++) {
+            state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0, 0xffffffff));
+        }
+        state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() {
+            @Override
+            public void call(BucketStreamStateUpdatedEvent event) {
+                if (event.partialUpdate()) {
+                } else {
+                }
+            }
+        });
+        try {
+            bucketStreamAggregator.feed(state).toBlocking().forEach(new Action1<DCPRequest>() {
+                @Override
+                public void call(final DCPRequest dcpRequest) {
+                    try {
+                        if (dcpRequest instanceof SnapshotMarkerMessage) {
+                            SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
+                            final BucketStreamState oldState = state.get(message.partition());
+                            state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(),
+                                    message.endSequenceNumber(), oldState.endSequenceNumber(),
+                                    message.endSequenceNumber(), oldState.snapshotEndSequenceNumber()));
+                        } else if (dcpRequest instanceof MutationMessage) {
+
+                            messages.put((MutationMessage) dcpRequest);
+                        } else if (dcpRequest instanceof RemoveMessage) {
+                            RemoveMessage message = (RemoveMessage) dcpRequest;
+                            LOGGER.info(message.key() + " was deleted.");
+                        }
+                    } catch (Throwable th) {
+                        LOGGER.error(th);
+                    }
+                }
+            });
+        } catch (Throwable th) {
+            if (th.getCause() instanceof InterruptedException) {
+                LOGGER.warn("dcp thread was interrupted", th);
+                synchronized (this) {
+                    CouchbaseReader.this.close();
+                    notifyAll();
+                }
+            }
+            throw th;
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return !done;
+    }
+
+    @Override
+    public IRawRecord<RecordWithMetadata<char[]>> next() throws IOException, InterruptedException {
+        if (messages.isEmpty()) {
+            controller.flush();
+        }
+        MutationMessage message = messages.take();
+        if (message == POISON_PILL) {
+            return null;
+        }
+        String key = message.key();
+        int vbucket = message.partition();
+        long seq = message.bySequenceNumber();
+        String bucket = message.bucket();
+        long cas = message.cas();
+        long creationTime = message.creationTime();
+        int expiration = message.expiration();
+        int flags = message.flags();
+        long revSeqNumber = message.revisionSequenceNumber();
+        int lockTime = message.lockTime();
+        recordWithMetadata.reset();
+        recordWithMetadata.setMetadata(0, key);
+        recordWithMetadata.setMetadata(1, bucket);
+        recordWithMetadata.setMetadata(2, vbucket);
+        recordWithMetadata.setMetadata(3, seq);
+        recordWithMetadata.setMetadata(4, cas);
+        recordWithMetadata.setMetadata(5, creationTime);
+        recordWithMetadata.setMetadata(6, expiration);
+        recordWithMetadata.setMetadata(7, flags);
+        recordWithMetadata.setMetadata(8, revSeqNumber);
+        recordWithMetadata.setMetadata(9, lockTime);
+        CouchbaseReader.set(message.content(), decoder, bytes, chars, value);
+        return record;
+    }
+
+    @Override
+    public boolean stop() {
+        done = true;
+        core.send(new CloseBucketRequest(bucket)).toBlocking();
+        try {
+            messages.put(CouchbaseReader.POISON_PILL);
+        } catch (InterruptedException e) {
+            LOGGER.warn(e);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void setController(IDataFlowController controller) {
+        this.controller = (AbstractFeedDataFlowController) controller;
+    }
+
+    public static void set(ByteBuf content, CharsetDecoder decoder, ByteBuffer bytes, CharBuffer chars,
+            CharArrayRecord record) {
+        int position = content.readerIndex();
+        int limit = content.writerIndex();
+        int contentSize = content.capacity();
+        while (position < limit) {
+            bytes.clear();
+            chars.clear();
+            if (contentSize - position < bytes.capacity()) {
+                bytes.limit(contentSize - position);
+            }
+            content.getBytes(position, bytes);
+            position += bytes.position();
+            bytes.flip();
+            decoder.decode(bytes, chars, false);
+            chars.flip();
+            record.append(chars);
+        }
+        record.endRecord();
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
new file mode 100644
index 0000000..b9b6f65
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.couchbase;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import com.couchbase.client.core.CouchbaseCore;
+import com.couchbase.client.core.config.CouchbaseBucketConfig;
+import com.couchbase.client.core.env.DefaultCoreEnvironment;
+import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.message.cluster.CloseBucketRequest;
+import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
+import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
+import com.couchbase.client.core.message.cluster.OpenBucketRequest;
+import com.couchbase.client.core.message.cluster.SeedNodesRequest;
+
+import rx.functions.Func1;
+
+public class CouchbaseReaderFactory implements IRecordReaderFactory<RecordWithMetadata<char[]>> {
+
+    private static final long serialVersionUID = 1L;
+    // Constant fields
+    public static final boolean DCP_ENABLED = true;
+    public static final long AUTO_RELEASE_AFTER_MILLISECONDS = 5000L;
+    public static final int TIMEOUT = 5;
+    public static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+    // Dynamic fields
+    private Map<String, String> configuration;
+    private String bucket;
+    private String password = "";
+    private String[] couchbaseNodes;
+    private int numOfVBuckets;
+    private int[] schedule;
+    private String feedName;
+    // Transient fields
+    private transient CouchbaseCore core;
+    private transient Builder builder;
+    private transient DefaultCoreEnvironment env;
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return AsterixClusterProperties.INSTANCE.getClusterLocations();
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        // validate first
+        if (!configuration.containsKey(ExternalDataConstants.KEY_BUCKET)) {
+            throw new AsterixException("Unspecified bucket");
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_NODES)) {
+            throw new AsterixException("Unspecified Couchbase nodes");
+        }
+        if (configuration.containsKey(ExternalDataConstants.KEY_PASSWORD)) {
+            password = configuration.get(ExternalDataConstants.KEY_PASSWORD);
+        }
+        this.configuration = configuration;
+        bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
+        couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(",");
+        feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
+        builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED)
+                .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
+        env = builder.build();
+        core = new CouchbaseCore(env);
+        getNumberOfVbuckets();
+        schedule();
+    }
+
+    /*
+     * We distribute the work of streaming vbuckets between all the partitions in a round robin fashion.
+     */
+    private void schedule() {
+        schedule = new int[numOfVBuckets];
+        String[] locations = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
+        for (int i = 0; i < numOfVBuckets; i++) {
+            schedule[i] = i % locations.length;
+        }
+    }
+
+    private void getNumberOfVbuckets() {
+        core.send(new SeedNodesRequest(couchbaseNodes)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        core.send(new OpenBucketRequest(bucket, password)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        numOfVBuckets = core.<GetClusterConfigResponse> send(new GetClusterConfigRequest())
+                .map(new Func1<GetClusterConfigResponse, Integer>() {
+                    @Override
+                    public Integer call(GetClusterConfigResponse response) {
+                        CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
+                        return config.numberOfPartitions();
+                    }
+                }).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        core.send(new CloseBucketRequest(bucket)).toBlocking();
+    }
+
+    @Override
+    public IRecordReader<? extends RecordWithMetadata<char[]>> createRecordReader(IHyracksTaskContext ctx,
+            int partition) throws Exception {
+        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+        ArrayList<Short> listOfAssignedVBuckets = new ArrayList<Short>();
+        for (int i = 0; i < schedule.length; i++) {
+            if (schedule[i] == partition) {
+                listOfAssignedVBuckets.add((short) i);
+            }
+        }
+        short[] vbuckets = new short[listOfAssignedVBuckets.size()];
+        for (int i = 0; i < vbuckets.length; i++) {
+            vbuckets[i] = listOfAssignedVBuckets.get(i);
+        }
+        CouchbaseReader reader = new CouchbaseReader(feedName + ":" + nodeName + ":" + partition, bucket, password,
+                couchbaseNodes, vbuckets, ExternalDataUtils.getQueueSize(configuration));
+        reader.configure(configuration);
+        return reader;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Class<? extends RecordWithMetadata<char[]>> getRecordClass() {
+        RecordWithMetadata<char[]> record = new RecordWithMetadata<char[]>(char[].class);
+        return (Class<? extends RecordWithMetadata<char[]>>) record.getClass();
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
similarity index 95%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
index 1b84e7a..0627660 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -69,9 +69,10 @@
         reusableByteBuffer.put(value.getBytes(), 0, value.getLength());
         reusableByteBuffer.flip();
         while (reusableByteBuffer.hasRemaining()) {
-            decoder.decode(reusableByteBuffer, reusableCharBuffer, false);
-            record.append(reusableCharBuffer.array(), 0, reusableCharBuffer.position());
             reusableCharBuffer.clear();
+            decoder.decode(reusableByteBuffer, reusableCharBuffer, false);
+            reusableCharBuffer.flip();
+            record.append(reusableCharBuffer);
         }
         record.endRecord();
     }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
similarity index 98%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
index 5a20962..28abddb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
similarity index 92%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
index e9fad25..c302b9b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader.factory;
+package org.apache.asterix.external.input.record.reader.hdfs;
 
 import java.util.Map;
 
@@ -24,9 +24,6 @@
 import org.apache.asterix.external.api.ILookupReaderFactory;
 import org.apache.asterix.external.api.ILookupRecordReader;
 import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.input.record.reader.RCLookupReader;
-import org.apache.asterix.external.input.record.reader.SequenceLookupReader;
-import org.apache.asterix.external.input.record.reader.TextLookupReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.hadoop.fs.FileSystem;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
similarity index 95%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
index d88f967..564d55a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
 
 import java.io.IOException;
 import java.util.List;
@@ -28,6 +28,7 @@
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.input.record.reader.EmptyRecordReader;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -99,16 +100,6 @@
         return record;
     }
 
-    @Override
-    public Class<? extends Writable> getRecordClass() throws IOException {
-        if (value == null) {
-            if (!nextInputSplit()) {
-                return null;
-            }
-        }
-        return value.getClass();
-    }
-
     private boolean nextInputSplit() throws IOException {
         for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
             /**
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
similarity index 99%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
index ea851a5..90f1558 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
 
 import java.io.IOException;
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
similarity index 97%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
index 5c33502..95d76ba 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
 
 import java.io.IOException;
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
similarity index 97%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
index c294ccb..23e647f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
 
 import java.io.IOException;
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
similarity index 96%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
index b276bfa..2e1a11a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
 
 import java.io.IOException;
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
similarity index 96%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
index 1c2dc30..13cd26a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.rss;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
@@ -99,11 +99,6 @@
     }
 
     @Override
-    public Class<SyndEntryImpl> getRecordClass() throws IOException {
-        return SyndEntryImpl.class;
-    }
-
-    @Override
     public boolean stop() {
         done = true;
         return true;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
similarity index 95%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index a672f2f..bbe485c 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader.factory;
+package org.apache.asterix.external.input.record.reader.rss;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -24,7 +24,6 @@
 
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.RSSRecordReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
similarity index 88%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
index 93ba0a0..6225b82 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
@@ -16,15 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IIndexingDatasource;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.CharArrayRecord;
 import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.input.stream.AInputStreamReader;
@@ -57,11 +59,6 @@
     }
 
     @Override
-    public Class<char[]> getRecordClass() {
-        return char[].class;
-    }
-
-    @Override
     public void configure(Map<String, String> configuration) throws Exception {
         record = new CharArrayRecord();
         inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
@@ -87,4 +84,9 @@
             return false;
         }
     }
+
+    @Override
+    public void setController(IDataFlowController controller) {
+        reader.setController((AbstractFeedDataFlowController) controller);
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
similarity index 98%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
index c7acb1a..f02bd93 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.stream;
 
 import java.util.List;
 import java.util.Map;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
similarity index 98%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 2b33d7a..f8f7cda 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
 import java.util.Map;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
similarity index 85%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
index 05d419d..f0867d3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
@@ -16,14 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader.factory;
+package org.apache.asterix.external.input.record.reader.stream;
 
 import java.util.Map;
 
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.LineRecordReader;
-import org.apache.asterix.external.input.record.reader.QuotedLineRecordReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
similarity index 98%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 49e67e9..a8eb07b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
 import java.util.Map;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
similarity index 98%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 84c96d0..d469cb3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
 import java.util.Map;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
similarity index 87%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
index 91b439c..ec8eac9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
@@ -16,13 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader.factory;
+package org.apache.asterix.external.input.record.reader.stream;
 
 import java.util.Map;
 
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.SemiStructuredRecordReader;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
similarity index 94%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index 34d8122..084d6d0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.twitter;
 
 import java.io.IOException;
 import java.util.List;
@@ -94,13 +94,7 @@
     }
 
     @Override
-    public Class<Status> getRecordClass() throws IOException {
-        return Status.class;
-    }
-
-    @Override
     public boolean stop() {
         return false;
     }
-
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
similarity index 95%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 3ce6a81..764ac1d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.twitter;
 
 import java.io.IOException;
 import java.util.Map;
@@ -80,11 +80,6 @@
     }
 
     @Override
-    public Class<? extends Status> getRecordClass() throws IOException {
-        return Status.class;
-    }
-
-    @Override
     public boolean stop() {
         try {
             close();
@@ -128,5 +123,4 @@
         public void onTrackLimitationNotice(int arg0) {
         }
     }
-
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
similarity index 96%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 6840c11..f38c2cb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader.factory;
+package org.apache.asterix.external.input.record.reader.twitter;
 
 import java.util.Map;
 import java.util.logging.Level;
@@ -25,8 +25,6 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.TwitterPullRecordReader;
-import org.apache.asterix.external.input.record.reader.TwitterPushRecordReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.TwitterUtil;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
index 73f6195..ce65249 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
@@ -20,9 +20,14 @@
 
 import java.io.InputStream;
 
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+
 public abstract class AInputStream extends InputStream {
     public abstract boolean skipError() throws Exception;
 
     public abstract boolean stop() throws Exception;
 
+    public void setController(AbstractFeedDataFlowController controller) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index 7ba6032..25418b0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -21,6 +21,8 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+
 public class AInputStreamReader extends InputStreamReader {
     private AInputStream in;
 
@@ -40,4 +42,8 @@
             throw new IOException(e);
         }
     }
+
+    public void setController(AbstractFeedDataFlowController controller) {
+        in.setController(controller);
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
index 7eebe4c..7b7cd8b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.nio.file.Path;
 
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.FileSystemWatcher;
@@ -39,6 +40,11 @@
     }
 
     @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        watcher.setController(controller);
+    }
+
+    @Override
     public void close() throws IOException {
         IOException ioe = null;
         if (in != null) {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index ab1f8a0..06833af 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -28,7 +28,7 @@
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
 import org.apache.asterix.external.api.INodeResolver;
 import org.apache.asterix.external.api.INodeResolverFactory;
-import org.apache.asterix.external.input.stream.LocalFSInputStreamProvider;
+import org.apache.asterix.external.input.stream.provider.LocalFSInputStreamProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
index 37afa53..ea60f43 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
@@ -30,7 +30,7 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IInputStreamProvider;
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.SocketInputStreamProvider;
+import org.apache.asterix.external.input.stream.provider.SocketInputStreamProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.util.AsterixRuntimeUtil;
 import org.apache.commons.lang3.StringUtils;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
similarity index 95%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
index 8f4c094..93a1685 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.stream;
+package org.apache.asterix.external.input.stream.provider;
 
 import java.io.IOException;
 import java.util.List;
@@ -24,7 +24,8 @@
 
 import org.apache.asterix.external.api.IInputStreamProvider;
 import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
+import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.provider.ExternalIndexerProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hadoop.io.Text;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
similarity index 92%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
index 22d0a87..4c4edd3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.stream;
+package org.apache.asterix.external.input.stream.provider;
 
 import java.io.File;
 import java.io.IOException;
@@ -24,6 +24,8 @@
 import java.util.Map;
 
 import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.LocalFileSystemInputStream;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
similarity index 86%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
index 1f920e9..2b12675 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
@@ -16,11 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.stream;
+package org.apache.asterix.external.input.stream.provider;
 
 import java.net.ServerSocket;
 
 import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.SocketInputStream;
 
 public class SocketInputStreamProvider implements IInputStreamProvider {
     private ServerSocket server;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
similarity index 97%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
index 7c64aa3..06f7e72 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.stream;
+package org.apache.asterix.external.input.stream.provider;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,6 +30,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.TweetGenerator;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index 9e35617..2cab3a7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -80,7 +80,7 @@
                 try {
                     adapter = adapterFactory.createAdapter(ctx, partition,
                             recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), snapshotAccessor, writer);
-                    //Open the file index accessor here
+                    // Open the file index accessor here
                     snapshotAccessor.open();
                     indexOpen = true;
                     adapter.open();
@@ -127,6 +127,11 @@
                     throw new HyracksDataException(th);
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                adapter.flush();
+            }
         };
     }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 80a54be..fa2e513 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -25,9 +25,9 @@
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -183,6 +183,9 @@
                     inputSideHandler.nextFrame(null); // signal end of data
                     while (!inputSideHandler.isFinished()) {
                         synchronized (coreOperator) {
+                            if (inputSideHandler.isFinished()) {
+                                break;
+                            }
                             coreOperator.wait();
                         }
                     }
@@ -192,8 +195,8 @@
             }
             coreOperator.close();
             System.out.println("CLOSED " + coreOperator + " STALLED ?" + stalled + " ENDED " + end);
-        } catch (Exception e) {
-            e.printStackTrace();
+        } catch (InterruptedException e) {
+            throw new HyracksDataException(e);
         } finally {
             if (!stalled) {
                 deregister();
@@ -221,4 +224,9 @@
         }
     }
 
+    @Override
+    public void flush() throws HyracksDataException {
+        inputSideHandler.flush();
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
index 4dae72d..b09504a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
@@ -181,4 +181,9 @@
         }
     }
 
+    @Override
+    public void flush() throws HyracksDataException {
+        inputSideHandler.flush();
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index f75b3eb..3c4c9ad 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -188,6 +188,9 @@
                 inputSideHandler.nextFrame(null); // signal end of data
                 while (!inputSideHandler.isFinished()) {
                     synchronized (coreOperator) {
+                        if (inputSideHandler.isFinished()) {
+                            break;
+                        }
                         coreOperator.wait();
                     }
                 }
@@ -195,8 +198,7 @@
             }
             coreOperator.close();
         } catch (Exception e) {
-            e.printStackTrace();
-            // ignore
+            throw new HyracksDataException(e);
         } finally {
             if (!stalled) {
                 deregister();
@@ -217,4 +219,9 @@
         }
     }
 
+    @Override
+    public void flush() throws HyracksDataException {
+        inputSideHandler.flush();
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 129b62f..14a3e2a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -39,7 +39,6 @@
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ANull;
@@ -67,7 +66,6 @@
 
     private AdmLexer admLexer;
     private ARecordType recordType;
-    private boolean datasetRec;
     private boolean isStreamParser = true;
 
     private int nullableFieldId = 0;
@@ -142,7 +140,7 @@
     public boolean parse(DataOutput out) throws AsterixException {
         try {
             resetPools();
-            return parseAdmInstance(recordType, datasetRec, out);
+            return parseAdmInstance(recordType, out);
         } catch (IOException e) {
             throw new ParseException(e, filename, admLexer.getLine(), admLexer.getColumn());
         } catch (AdmLexerException e) {
@@ -163,12 +161,6 @@
     public void configure(Map<String, String> configuration, ARecordType recordType) throws IOException {
         this.recordType = recordType;
         this.configuration = configuration;
-        String isDatasetRecordString = configuration.get(ExternalDataConstants.KEY_DATASET_RECORD);
-        if (isDatasetRecordString == null) {
-            this.datasetRec = true;
-        } else {
-            this.datasetRec = Boolean.parseBoolean(isDatasetRecordString);
-        }
         this.isStreamParser = ExternalDataUtils.isDataSourceStreamProvider(configuration);
         if (!isStreamParser) {
             this.admLexer = new AdmLexer();
@@ -180,7 +172,7 @@
         try {
             resetPools();
             admLexer.setBuffer(record.get());
-            parseAdmInstance(recordType, datasetRec, out);
+            parseAdmInstance(recordType, out);
         } catch (IOException e) {
             throw new ParseException(e, filename, admLexer.getLine(), admLexer.getColumn());
         } catch (AdmLexerException e) {
@@ -201,18 +193,18 @@
         admLexer = new AdmLexer(new java.io.InputStreamReader(in));
     }
 
-    protected boolean parseAdmInstance(IAType objectType, boolean datasetRec, DataOutput out)
+    protected boolean parseAdmInstance(IAType objectType, DataOutput out)
             throws AsterixException, IOException, AdmLexerException {
         int token = admLexer.next();
         if (token == AdmLexer.TOKEN_EOF) {
             return false;
         } else {
-            admFromLexerStream(token, objectType, out, datasetRec);
+            admFromLexerStream(token, objectType, out);
             return true;
         }
     }
 
-    private void admFromLexerStream(int token, IAType objectType, DataOutput out, Boolean datasetRec)
+    private void admFromLexerStream(int token, IAType objectType, DataOutput out)
             throws AsterixException, IOException, AdmLexerException {
 
         switch (token) {
@@ -441,7 +433,7 @@
             case AdmLexer.TOKEN_START_RECORD: {
                 if (checkType(ATypeTag.RECORD, objectType)) {
                     objectType = getComplexType(objectType, ATypeTag.RECORD);
-                    parseRecord((ARecordType) objectType, out, datasetRec);
+                    parseRecord((ARecordType) objectType, out);
                 } else {
                     throw new ParseException(mismatchErrorMessage + objectType.getTypeTag());
                 }
@@ -567,7 +559,7 @@
         return getTargetTypeTag(expectedTypeTag, aObjectType) != null;
     }
 
-    private void parseRecord(ARecordType recType, DataOutput out, Boolean datasetRec)
+    private void parseRecord(ARecordType recType, DataOutput out)
             throws IOException, AsterixException, AdmLexerException {
 
         ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
@@ -575,16 +567,8 @@
         IARecordBuilder recBuilder = getRecordBuilder();
 
         BitSet nulls = null;
-        if (datasetRec) {
-
-            if (recType != null) {
-                nulls = new BitSet(recType.getFieldNames().length);
-                recBuilder.reset(recType);
-            } else {
-                recBuilder.reset(null);
-            }
-
-        } else if (recType != null) {
+        if (recType != null) {
+            //TODO: use BitSet Pool
             nulls = new BitSet(recType.getFieldNames().length);
             recBuilder.reset(recType);
         } else {
@@ -650,7 +634,7 @@
                     }
 
                     token = admLexer.next();
-                    this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput(), false);
+                    this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput());
                     if (openRecordField) {
                         if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
                             recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
@@ -752,7 +736,7 @@
                 expectingListItem = false;
                 itemBuffer.reset();
 
-                admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
+                admFromLexerStream(token, itemType, itemBuffer.getDataOutput());
                 orderedListBuilder.addItem(itemBuffer);
             }
             first = false;
@@ -799,7 +783,7 @@
             } else {
                 expectingListItem = false;
                 itemBuffer.reset();
-                admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
+                admFromLexerStream(token, itemType, itemBuffer.getDataOutput());
                 unorderedListBuilder.addItem(itemBuffer);
             }
             first = false;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 6c399c3..2f0fc86 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -28,6 +28,7 @@
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ANullSerializerDeserializer;
+import org.apache.asterix.external.api.IDataParser;
 import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
@@ -124,18 +125,6 @@
         }
     }
 
-    protected void fieldNameToBytes(String fieldName, AMutableString str, ArrayBackedValueStorage buffer)
-            throws HyracksDataException {
-        buffer.reset();
-        DataOutput out = buffer.getDataOutput();
-        str.setValue(fieldName);
-        try {
-            stringSerde.serialize(str, out);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
     @Override
     public DataSourceType getDataSourceType() {
         return isStreamParser ? DataSourceType.STREAM : DataSourceType.RECORDS;
@@ -173,7 +162,8 @@
                     throw new HyracksDataException("Illegal field " + name + " in closed type " + recordType);
                 } else {
                     nameBuffers[i] = new ArrayBackedValueStorage();
-                    fieldNameToBytes(name, str, nameBuffers[i]);
+                    str.setValue(name);
+                    IDataParser.toBytes(str, nameBuffers[i], stringSerde);
                 }
             }
         }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
new file mode 100644
index 0000000..ecdb03d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class RecordWithMetadataParser<T> implements IRecordDataParser<RecordWithMetadata<T>> {
+
+    private final Class<? extends RecordWithMetadata<T>> clazz;
+    private final int[] metaIndexes;
+    private final int valueIndex;
+    private ARecordType recordType;
+    private IRecordDataParser<T> valueParser;
+    private RecordBuilder recBuilder;
+    private ArrayBackedValueStorage[] nameBuffers;
+    private int numberOfFields;
+    private ArrayBackedValueStorage valueBuffer = new ArrayBackedValueStorage();
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+
+    public RecordWithMetadataParser(Class<? extends RecordWithMetadata<T>> clazz, int[] metaIndexes,
+            IRecordDataParser<T> valueParser, int valueIndex) {
+        this.clazz = clazz;
+        this.metaIndexes = metaIndexes;
+        this.valueParser = valueParser;
+        this.valueIndex = valueIndex;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType recordType)
+            throws HyracksDataException, IOException {
+        this.recordType = recordType;
+        this.numberOfFields = recordType.getFieldNames().length;
+        recBuilder = new RecordBuilder();
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        nameBuffers = new ArrayBackedValueStorage[numberOfFields];
+        AMutableString str = new AMutableString(null);
+        for (int i = 0; i < numberOfFields; i++) {
+            String name = recordType.getFieldNames()[i];
+            nameBuffers[i] = new ArrayBackedValueStorage();
+            str.setValue(name);
+            IDataParser.toBytes(str, nameBuffers[i], stringSerde);
+        }
+    }
+
+    @Override
+    public Class<? extends RecordWithMetadata<T>> getRecordClass() {
+        return clazz;
+    }
+
+    @Override
+    public void parse(IRawRecord<? extends RecordWithMetadata<T>> record, DataOutput out) throws Exception {
+        recBuilder.reset(recordType);
+        valueBuffer.reset();
+        recBuilder.init();
+        RecordWithMetadata<T> rwm = record.get();
+        for (int i = 0; i < numberOfFields; i++) {
+            if (i == valueIndex) {
+                valueParser.parse(rwm.getRecord(), valueBuffer.getDataOutput());
+                recBuilder.addField(i, valueBuffer);
+            } else {
+                recBuilder.addField(i, rwm.getMetadata(metaIndexes[i]));
+            }
+        }
+        recBuilder.write(out, true);
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
new file mode 100644
index 0000000..88a0683
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.provider.ParserFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class RecordWithMetadataParserFactory<T> implements IRecordDataParserFactory<RecordWithMetadata<T>> {
+
+    private static final long serialVersionUID = 1L;
+    private Class<? extends RecordWithMetadata<T>> recordClass;
+    private ARecordType recordType;
+    private int[] metaIndexes;
+    private IRecordDataParserFactory<T> valueParserFactory;
+    private int valueIndex;
+
+    @Override
+    public DataSourceType getDataSourceType() throws AsterixException {
+        return DataSourceType.RECORDS;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        // validation first
+        if (!configuration.containsKey(ExternalDataConstants.KEY_META_INDEXES)) {
+            throw new HyracksDataException(
+                    "the parser parameter (" + ExternalDataConstants.KEY_META_INDEXES + ") is missing");
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_VALUE_INDEX)) {
+            throw new HyracksDataException(
+                    "the parser parameter (" + ExternalDataConstants.KEY_VALUE_INDEX + ") is missing");
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_VALUE_FORMAT)) {
+            throw new HyracksDataException(
+                    "the parser parameter (" + ExternalDataConstants.KEY_VALUE_FORMAT + ") is missing");
+        }
+        // get meta field indexes
+        String[] stringMetaIndexes = configuration.get(ExternalDataConstants.KEY_META_INDEXES).split(",");
+        metaIndexes = new int[stringMetaIndexes.length];
+        for (int i = 0; i < stringMetaIndexes.length; i++) {
+            metaIndexes[i] = Integer.parseInt(stringMetaIndexes[i].trim());
+        }
+        // get value index
+        valueIndex = Integer.parseInt(configuration.get(ExternalDataConstants.KEY_VALUE_INDEX).trim());
+        // get value format
+        configuration.put(ExternalDataConstants.KEY_DATA_PARSER,
+                configuration.get(ExternalDataConstants.KEY_VALUE_FORMAT));
+        valueParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(configuration);
+        valueParserFactory.setRecordType((ARecordType) recordType.getFieldTypes()[valueIndex]);
+        valueParserFactory.configure(configuration);
+        recordClass = (Class<? extends RecordWithMetadata<T>>) (new RecordWithMetadata<T>(
+                valueParserFactory.getRecordClass())).getClass();
+    }
+
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    @Override
+    public IRecordDataParser<RecordWithMetadata<T>> createRecordParser(IHyracksTaskContext ctx)
+            throws HyracksDataException, AsterixException, IOException {
+        IRecordDataParser<T> valueParser = valueParserFactory.createRecordParser(ctx);
+        return new RecordWithMetadataParser<T>(recordClass, metaIndexes, valueParser, valueIndex);
+    }
+
+    @Override
+    public Class<? extends RecordWithMetadata<T>> getRecordClass() {
+        return recordClass;
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index a7ab062..745c653 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -25,9 +25,10 @@
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
-import org.apache.asterix.external.input.record.reader.factory.LineRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.SemiStructuredRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.TwitterRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.couchbase.CouchbaseReaderFactory;
+import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
 import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
 import org.apache.asterix.external.input.stream.factory.SocketInputStreamProviderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -97,8 +98,11 @@
                 case ExternalDataConstants.READER_TWITTER_PUSH:
                     readerFactory = new TwitterRecordReaderFactory();
                     break;
+                case ExternalDataConstants.READER_COUCHBASE:
+                    readerFactory = new CouchbaseReaderFactory();
+                    break;
                 default:
-                    throw new AsterixException("unknown record reader factory");
+                    throw new AsterixException("unknown record reader factory: " + reader);
             }
         }
         return readerFactory;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
similarity index 92%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
index 3a82a68..18b9cb5 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.provider;
 
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.ILookupReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.HDFSLookupReaderFactory;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSLookupReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.HDFSUtils;
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index f5a0512..30595db 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -22,10 +22,12 @@
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
 import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
 import org.apache.asterix.external.parser.factory.DelimitedDataParserFactory;
 import org.apache.asterix.external.parser.factory.HiveDataParserFactory;
 import org.apache.asterix.external.parser.factory.RSSParserFactory;
+import org.apache.asterix.external.parser.factory.RecordWithMetadataParserFactory;
 import org.apache.asterix.external.parser.factory.TweetParserFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
@@ -39,13 +41,12 @@
             return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration),
                     parserFactoryName);
         } else {
-            parserFactory = ParserFactoryProvider.getParserFactory(configuration);
+            parserFactory = ParserFactoryProvider.getParserFactory(ExternalDataUtils.getRecordFormat(configuration));
         }
         return parserFactory;
     }
 
-    private static IDataParserFactory getParserFactory(Map<String, String> configuration) throws AsterixException {
-        String recordFormat = ExternalDataUtils.getRecordFormat(configuration);
+    private static IDataParserFactory getParserFactory(String recordFormat) throws AsterixException {
         switch (recordFormat) {
             case ExternalDataConstants.FORMAT_ADM:
             case ExternalDataConstants.FORMAT_JSON:
@@ -58,6 +59,8 @@
                 return new TweetParserFactory();
             case ExternalDataConstants.FORMAT_RSS:
                 return new RSSParserFactory();
+            case ExternalDataConstants.FORMAT_RECORD_WITH_META:
+                return new RecordWithMetadataParserFactory<RecordWithMetadata<?>>();
             default:
                 throw new AsterixException("Unknown data format");
         }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index 90c74e1..27a1d0e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -36,7 +36,7 @@
     public static void addTupleToFrame(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
             throws HyracksDataException {
         if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            appender.flush(writer, true);
+            appender.write(writer, true);
             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                 throw new HyracksDataException("Tuple is too large for a frame");
             }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index fb2688f..3d7f60b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -40,9 +40,13 @@
     public static final String KEY_HDFS_URL = "hdfs";
     // specify the path when reading from a file system
     public static final String KEY_PATH = "path";
+    // specify the hdfs input format when reading data from HDFS
     public static final String KEY_INPUT_FORMAT = "input-format";
+    // specifies the filesystem (localfs or hdfs) when using a filesystem data source
     public static final String KEY_FILESYSTEM = "fs";
+    // specifies the address of the hdfs name node
     public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
+    // specifies the class implementation of the accessed instance of HDFS
     public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
     public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
     public static final String KEY_HADOOP_INPUT_FORMAT = "mapred.input.format.class";
@@ -73,6 +77,20 @@
     public static final String KEY_IS_FEED = "is-feed";
     public static final String KEY_WAIT_FOR_DATA = "wait-for-data";
     public static final String KEY_FEED_NAME = "feed";
+    // a string representing external bucket name
+    public static final String KEY_BUCKET = "bucket";
+    // a comma delimited list of nodes
+    public static final String KEY_NODES = "nodes";
+    // a string representing the password used to authenticate with the external data source
+    public static final String KEY_PASSWORD = "password";
+    // an integer representing the number of raw records that can be bufferred in the parsing queue 
+    public static final String KEY_QUEUE_SIZE = "queue-size";
+    // a comma delimited integers representing the indexes of the meta fields in the raw record (i,e: "3,1,0,2" denotes that the first meta field is in index 3 in the actual record)
+    public static final String KEY_META_INDEXES = "meta-indexes";
+    // an integer representing the index of the value field in the data type
+    public static final String KEY_VALUE_INDEX = "value-index";
+    // a string representing the format of the raw record in the value field in the data type
+    public static final String KEY_VALUE_FORMAT = "value-format";
     /**
      * HDFS class names
      */
@@ -95,6 +113,7 @@
      */
     public static final String READER_HDFS = "hdfs";
     public static final String READER_ADM = "adm";
+    public static final String READER_COUCHBASE = "couchbase";
     public static final String READER_SEMISTRUCTURED = "semi-structured";
     public static final String READER_DELIMITED = "delimited-text";
     public static final String READER_TWITTER_PUSH = "twitter-push";
@@ -120,6 +139,7 @@
     public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
     public static final String FORMAT_TWEET = "tweet";
     public static final String FORMAT_RSS = "rss";
+    public static final String FORMAT_RECORD_WITH_META = "record-with-meta";
 
     /**
      * input streams
@@ -178,6 +198,7 @@
      */
     public static final int DEFAULT_BUFFER_SIZE = 4096;
     public static final int DEFAULT_BUFFER_INCREMENT = 4096;
+    public static final int DEFAULT_QUEUE_SIZE = 64;
 
     /**
      * Expected parameter values
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index c9be872..7c03e4d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -171,15 +171,19 @@
             if (tag == null) {
                 throw new NotImplementedException("Failed to get the type information for field " + i + ".");
             }
-            IValueParserFactory vpf = valueParserFactoryMap.get(tag);
-            if (vpf == null) {
-                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
-            }
-            fieldParserFactories[i] = vpf;
+            fieldParserFactories[i] = getParserFactory(tag);
         }
         return fieldParserFactories;
     }
 
+    public static IValueParserFactory getParserFactory(ATypeTag tag) {
+        IValueParserFactory vpf = valueParserFactoryMap.get(tag);
+        if (vpf == null) {
+            throw new NotImplementedException("No value parser factory for fields of type " + tag);
+        }
+        return vpf;
+    }
+
     public static String getRecordReaderStreamName(Map<String, String> configuration) {
         return configuration.get(ExternalDataConstants.KEY_READER_STREAM);
     }
@@ -254,4 +258,10 @@
     public static String getFeedName(Map<String, String> configuration) {
         return configuration.get(ExternalDataConstants.KEY_FEED_NAME);
     }
+
+    public static int getQueueSize(Map<String, String> configuration) {
+        return configuration.containsKey(ExternalDataConstants.KEY_QUEUE_SIZE)
+                ? Integer.parseInt(configuration.get(ExternalDataConstants.KEY_QUEUE_SIZE))
+                : ExternalDataConstants.DEFAULT_QUEUE_SIZE;
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
index 4bb9d92..631eef4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -34,6 +34,8 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -50,6 +52,7 @@
     private final boolean isFeed;
     private boolean done;
     private File current;
+    private AbstractFeedDataFlowController controller;
 
     public FileSystemWatcher(FeedLogManager logManager, Path inputResource, String expression, boolean isFeed)
             throws IOException {
@@ -199,7 +202,7 @@
         return false;
     }
 
-    public boolean hasNext() {
+    public boolean hasNext() throws HyracksDataException {
         if (it.hasNext()) {
             return true;
         }
@@ -218,6 +221,9 @@
             key = watcher.poll();
         }
         // No file was found, wait for the filesystem to push events
+        if (controller != null) {
+            controller.flush();
+        }
         while (files.isEmpty()) {
             try {
                 key = watcher.take();
@@ -241,4 +247,8 @@
         it = files.iterator();
         return it.hasNext();
     }
+
+    public void setController(AbstractFeedDataFlowController controller) {
+        this.controller = controller;
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index de6737a..7ac0428 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -28,7 +28,7 @@
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingScheduler;
 import org.apache.asterix.external.indexing.RecordId.RecordIdType;
-import org.apache.asterix.external.input.stream.HDFSInputStreamProvider;
+import org.apache.asterix.external.input.stream.provider.HDFSInputStreamProvider;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hadoop.fs.BlockLocation;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index cff5c6b..f15540a 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -2155,7 +2155,7 @@
         }
     }
 
-    public AlgebricksPartitionConstraint getClusterLocations() {
+    public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
         return AsterixClusterProperties.INSTANCE.getClusterLocations();
     }
 
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 769b883..c69cd16 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -510,7 +511,7 @@
         return feedProps;
     }
 
-    private static ARecordType getOutputType(Feed feed, Map<String, String> configuration) throws Exception {
+    public static ARecordType getOutputType(IFeed feed, Map<String, String> configuration) throws Exception {
         ARecordType outputType = null;
         String fqOutputType = configuration.get(ExternalDataConstants.KEY_TYPE_NAME);
 
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index c2c28df..2744630 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -44,7 +44,6 @@
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 
 /**
  * A holder class for properties related to the Asterix cluster.
@@ -229,7 +228,7 @@
         return participantNodes;
     }
 
-    public synchronized AlgebricksPartitionConstraint getClusterLocations() {
+    public synchronized AlgebricksAbsolutePartitionConstraint getClusterLocations() {
         if (clusterPartitionConstraint == null) {
             resetClusterPartitionConstraint();
         }
