Added asterix project

git-svn-id: https://asterixdb.googlecode.com/svn/trunk/asterix@12 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
new file mode 100644
index 0000000..67e89af
--- /dev/null
+++ b/asterix-external-data/pom.xml
@@ -0,0 +1,150 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<artifactId>asterix</artifactId>
+		<groupId>edu.uci.ics.asterix</groupId>
+		<version>0.0.4-SNAPSHOT</version>
+	</parent>
+	<groupId>edu.uci.ics.asterix</groupId>
+	<artifactId>asterix-external-data</artifactId>
+	<version>0.0.4-SNAPSHOT</version>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>appassembler-maven-plugin</artifactId>
+				<version>1.0</version>
+				<executions>
+					<execution>
+						<configuration>
+							<programs>
+								<program>
+									<mainClass>edu.uci.ics.asterix.drivers.AsterixWebServer</mainClass>
+									<name>asterix-web</name>
+								</program>
+								<program>
+									<mainClass>edu.uci.ics.asterix.drivers.AsterixClientDriver</mainClass>
+									<name>asterix-cmd</name>
+								</program>
+							</programs>
+							<repositoryLayout>flat</repositoryLayout>
+							<repositoryName>lib</repositoryName>
+						</configuration>
+						<phase>package</phase>
+						<goals>
+							<goal>assemble</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.7.2</version>
+				<configuration>
+					<!--
+						doesn't work from m2eclipse, currently
+						<additionalClasspathElements>
+						<additionalClasspathElement>${basedir}/src/main/resources</additionalClasspathElement>
+						</additionalClasspathElements>
+					-->
+					<forkMode>pertest</forkMode>
+					<argLine>-enableassertions -Xmx${test.heap.size}m -Dfile.encoding=UTF-8
+						-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+					<includes>
+						<include>**/*TestSuite.java</include>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>javax.servlet</groupId>
+			<artifactId>servlet-api</artifactId>
+			<version>2.5</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-cc</artifactId>
+			<version>0.2.0-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-nc</artifactId>
+			<version>0.2.0-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.asterix</groupId>
+			<artifactId>asterix-om</artifactId>
+			<version>0.0.4-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.asterix</groupId>
+			<artifactId>asterix-runtime</artifactId>
+			<version>0.0.4-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.kenai.nbpwr</groupId>
+			<artifactId>org-apache-commons-io</artifactId>
+			<version>1.3.1-201002241208</version>
+			<scope>test</scope>
+		</dependency>
+                <dependency>
+                        <groupId>org.twitter4j</groupId>
+                        <artifactId>twitter4j-core</artifactId>
+                        <version>2.2.3</version>
+                </dependency>
+                <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-core</artifactId>
+                        <version>0.20.2</version>
+                        <type>jar</type>
+                        <scope>compile</scope>
+                </dependency>
+                <dependency>
+                        <groupId>net.java.dev.rome</groupId>
+                        <artifactId>rome-fetcher</artifactId>
+                        <version>1.0.0</version>
+                        <type>jar</type>
+                        <scope>compile</scope>
+                </dependency>
+                <dependency>
+                         <groupId>rome</groupId>
+                         <artifactId>rome</artifactId>
+                         <version>1.0.1-modified-01</version>
+                </dependency>
+<dependency>
+            <groupId>jdom</groupId>
+            <artifactId>jdom</artifactId>
+            <version>1.0</version>
+        </dependency>
+	</dependencies>
+
+</project>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/AbstractDataListeningProperty.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/AbstractDataListeningProperty.java
new file mode 100644
index 0000000..88ce33c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/AbstractDataListeningProperty.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.datasource.data.listener;
+
+/**
+ * A push-based datasource adapter allows registering an IDataListener instance.
+ * The data listening property defines when data is pushed to an IDataListener.
+ */
+
public abstract class AbstractDataListeningProperty {

    /**
     * COUNT_BASED: Data is pushed to a data listener only if the count of
     * records exceeds the configured threshold value. TIME_BASED: Data is
     * pushed to a data listener in a periodic manner at the end of each time
     * interval.
     */
    // NOTE(review): the enum name breaks the UpperCamelCase convention and the
    // abstract method misspells "Property"; both are part of the public API
    // overridden by subclasses in this patch, so they are flagged here rather
    // than renamed.
    public enum listeningPropertyType {
        COUNT_BASED,
        TIME_BASED
    }

    /**
     * Returns the kind of listening property (count-based or time-based)
     * implemented by the concrete subclass.
     */
    public abstract listeningPropertyType getListeningPropretyType();
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/CountBasedDataListeningProperty.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/CountBasedDataListeningProperty.java
new file mode 100644
index 0000000..0eb42dc
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/CountBasedDataListeningProperty.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.datasource.data.listener;
+
+/**
+ * A data listening property chosen by a data listener when it wants data to be
+ * pushed when the count of records collected by the adapter exceeds a
+ * configured count value.
+ */
+public class CountBasedDataListeningProperty extends AbstractDataListeningProperty {
+
+    int numTuples;
+
+    public int getNumTuples() {
+        return numTuples;
+    }
+
+    public void setNumTuples(int numTuples) {
+        this.numTuples = numTuples;
+    }
+
+    public CountBasedDataListeningProperty(int numTuples) {
+        this.numTuples = numTuples;
+    }
+
+    @Override
+    public listeningPropertyType getListeningPropretyType() {
+        return listeningPropertyType.COUNT_BASED;
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/IDataListener.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/IDataListener.java
new file mode 100644
index 0000000..269f060
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/IDataListener.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.datasource.data.listener;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An interface providing a call back API for a subscriber interested in data
+ * received from an external data source via the datasource adapter.
+ */
public interface IDataListener {

    /**
     * This method is a call back API and is invoked by an instance of
     * IPushBasedDatasourceReadAdapter. The caller passes a frame containing new
     * data. The protocol as to when the caller shall invoke this method is
     * decided by the configured @see DataListenerProperty .
     *
     * @param frame
     *            a buffer containing the newly received data
     */
    public void dataReceived(ByteBuffer frame);

}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/TimeBasedDataListeningProperty.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/TimeBasedDataListeningProperty.java
new file mode 100644
index 0000000..a11cf94
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/TimeBasedDataListeningProperty.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.datasource.data.listener;
+
+/**
+ * A data listening property chosen by a data listener when it needs data to be
+ * pushed in a periodic manner with a configured time-interval.
+ */
+public class TimeBasedDataListeningProperty extends AbstractDataListeningProperty {
+
+    // time interval in secs
+    int interval;
+
+    public int getInteval() {
+        return interval;
+    }
+
+    public void setInterval(int interval) {
+        this.interval = interval;
+    }
+
+    public TimeBasedDataListeningProperty(int interval) {
+        this.interval = interval;
+    }
+
+    @Override
+    public listeningPropertyType getListeningPropretyType() {
+        return listeningPropertyType.TIME_BASED;
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceAdapter.java
new file mode 100644
index 0000000..1baf149
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceAdapter.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.adapter.api;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A super interface implemented by a data source adapter. An adapter can be a
+ * pull based or push based. This interface provides all common APIs that need
+ * to be implemented by each adapter irrespective of the the kind of
+ * adapter(pull or push).
+ */
public interface IDatasourceAdapter extends Serializable {

    /**
     * Represents the kind of data exchange that happens between the adapter and
     * the external data source. The data exchange can be either pull based or
     * push based. In the former case (pull), the request for data transfer is
     * initiated by the adapter. In the latter case (push) the adapter is
     * required to submit an initial request to convey intent for data.
     * Subsequently all data transfer requests are initiated by the external
     * data source.
     */
    public enum AdapterDataFlowType {
        PULL,
        PUSH
    }

    /**
     * An adapter can be used to read from an external data source and may also
     * allow writing to the external data source. This enum type indicates the
     * kind of operations supported by the adapter.
     * 
     * @caller Compiler uses this method to assert the validity of an operation
     *         on an external dataset. The type of adapter associated with an
     *         external dataset determines the set of valid operations allowed
     *         on the dataset.
     */
    public enum AdapterType {
        READ,
        WRITE,
        READ_WRITE
    }

    /**
     * An adapter can be a pull or a push based adapter. This method returns the
     * kind of adapter, that is whether it is a pull based adapter or a push
     * based adapter.
     * 
     * @caller Compiler or wrapper operator: Compiler uses this API to choose
     *         the right wrapper (push-based) operator that wraps around the
     *         adapter and provides an iterator interface. If we decide to form
     *         a single operator that handles both pull and push based adapter
     *         kinds, then this method will be used by the wrapper operator for
     *         switching between the logic for interacting with a pull based
     *         adapter versus a push based adapter.
     * @return AdapterDataFlowType
     */
    public AdapterDataFlowType getAdapterDataFlowType();

    /**
     * Returns the type of adapter indicating if the adapter can be used for
     * reading from an external data source or writing to an external data
     * source or can be used for both purposes.
     * 
     * @Caller: Compiler: The compiler uses this API to verify if an operation
     *          is supported by the adapter. For example, a write query against
     *          an external dataset will not compile successfully if the
     *          external dataset was declared with a read_only adapter.
     * @see AdapterType
     * @return the kind of operations (read, write, or both) this adapter supports
     */
    public AdapterType getAdapterType();

    /**
     * Each adapter instance is configured with a set of parameters that are
     * key-value pairs. When creating an external or a feed dataset, an adapter
     * instance is used in conjunction with a set of configuration parameters
     * for the adapter instance. The configuration parameters are stored
     * internally with the adapter and can be retrieved using this API.
     * 
     * @param propertyKey
     *            the key identifying the configuration parameter
     * @return String the value corresponding to the configuration parameter
     *         represented by propertyKey
     */
    public String getAdapterProperty(String propertyKey);

    /**
     * Allows setting a configuration property of the adapter with a specified
     * value.
     * 
     * @caller Used by the wrapper operator to modify the behavior of the
     *         adapter, if required.
     * @param property
     *            the property to be set
     * @param value
     *            the value for the property
     */
    public void setAdapterProperty(String property, String value);

    /**
     * Configures the IDatasourceAdapter instance.
     * 
     * @caller Scenario 1) Called during compilation of DDL statement that
     *         creates a Feed dataset and associates the adapter with the
     *         dataset. The (key,value) configuration parameters provided as
     *         part of the DDL statement are collected by the compiler and
     *         passed on to this method. The adapter may as part of
     *         configuration connect with the external data source and determine
     *         the IAType associated with data residing with the external
     *         datasource.
     *         Scenario 2) An adapter instance is created by an ASTERIX operator
     *         that wraps around the adapter instance. The operator, as part of
     *         its initialization invokes the configure method. The (key,value)
     *         configuration parameters are passed on to the operator by the
     *         compiler. Subsequent to the invocation, the wrapping operator
     *         obtains the partition constraints (if any). In addition, in the
     *         case of a read adapter, the wrapping operator obtains the output
     *         ASTERIX type associated with the data that will be output from
     *         the adapter.
     * @param arguments
     *            A map with key-value pairs that contains the configuration
     *            parameters for the adapter. The arguments are obtained from
     *            the metadata. Recall that the DDL to create an external
     *            dataset or a feed dataset requires using an adapter and
     *            providing all arguments as a set of (key,value) pairs. These
     *            arguments are put into the metadata.
     * @param atype
     *            the ASTERIX type associated with the external data handled by
     *            this adapter
     */
    public void configure(Map<String, String> arguments, IAType atype) throws Exception;

    /**
     * Returns a list of partition constraints. A partition constraint can be a
     * requirement to execute at a particular location or could be cardinality
     * constraints indicating the number of instances that need to run in
     * parallel. For example, an IDatasourceAdapter implementation written for
     * data residing on the local file system of a node cannot run on any other
     * node and thus has a location partition constraint. The location partition
     * constraint can be expressed as a node IP address or a node controller id.
     * In the former case, the IP address is translated to a node controller id
     * running on the node with the given IP address.
     * 
     * @Caller The wrapper operator configures its partition constraints from
     *         the constraints obtained from the adapter.
     */
    public AlgebricksPartitionConstraint getPartitionConstraint();

    /**
     * Allows the adapter to establish connection with the external data source
     * expressing intent for data and providing any configuration parameters
     * required by the external data source for the transfer of data. This
     * method does not result in any data transfer, but is a prerequisite for
     * any subsequent data transfer to happen between the external data source
     * and the adapter.
     * 
     * @caller This method is called by the wrapping ASTERIX operator as part
     *         of its initialization, prior to any data transfer.
     * @param ctx
     *            the Hyracks task context of the wrapping operator
     * @throws Exception
     */
    public void initialize(IHyracksTaskContext ctx) throws Exception;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceReadAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceReadAdapter.java
new file mode 100644
index 0000000..737dde0
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceReadAdapter.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.adapter.api;
+
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+
public interface IDatasourceReadAdapter extends IDatasourceAdapter {

	/**
	 * Provides a parser over the data retrieved from the external datasource
	 * for the given partition.
	 * 
	 * @param partition
	 *            Multiple instances of the adapter can be configured to
	 *            retrieve data in parallel. Partition is an integer between 0
	 *            to N-1 where N is the number of parallel adapter instances.
	 *            The partition value helps configure a particular instance of
	 *            the adapter to fetch data.
	 * @return an IDataParser over the data fetched for this partition
	 * @throws Exception
	 */
	
	public IDataParser getDataParser(int partition) throws Exception;
	

}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceWriteAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceWriteAdapter.java
new file mode 100644
index 0000000..3cd1464
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceWriteAdapter.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.adapter.api;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.IAType;
+
public interface IDatasourceWriteAdapter extends IDatasourceAdapter {

    /**
     * Flushes tuples contained in the frame to the dataset stored in an
     * external data source. If required, the content of the frame is converted
     * into an appropriate format as required by the external data source.
     * 
     * @caller This method is invoked by the wrapping ASTERIX operator when data
     *         needs to be written to the external data source.
     * @param sourceAType
     *            The type associated with the data that is required to be
     *            written
     * @param frame
     *            the frame that needs to be flushed
     * @param datasourceSpecificParams
     *            A map containing other parameters that are specific to the
     *            target data source where data is to be written. For example
     *            when writing to a data source such as HDFS, an optional
     *            parameter is the replication factor.
     * @throws Exception
     *             if the flush to the external data source fails
     */
    public void flush(IAType sourceAType, ByteBuffer frame, Map<String, String> datasourceSpecificParams)
            throws Exception;

}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
new file mode 100644
index 0000000..cf0d4f3
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.operator;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class ExternalDataScanOperatorDescriptor extends
+		AbstractSingleActivityOperatorDescriptor {
+	private static final long serialVersionUID = 1L;
+
+	private final String adapter;
+	private final Map<String, String> adapterConfiguration;
+	private final IAType atype;
+
+	private transient IDatasourceReadAdapter datasourceReadAdapter;
+
+	public ExternalDataScanOperatorDescriptor(JobSpecification spec,
+			String adapter, Map<String, String> arguments, IAType atype,
+			RecordDescriptor rDesc) {
+		super(spec, 0, 1);
+		recordDescriptors[0] = rDesc;
+		this.adapter = adapter;
+		this.adapterConfiguration = arguments;
+		this.atype = atype;
+	}
+
+	public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+			IRecordDescriptorProvider recordDescProvider, final int partition,
+			int nPartitions) throws HyracksDataException {
+
+		try {
+			datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(
+					adapter).newInstance();
+			datasourceReadAdapter.configure(adapterConfiguration, atype);
+			datasourceReadAdapter.initialize(ctx);
+		} catch (Exception e) {
+			throw new HyracksDataException("initialization of adapter failed",
+					e);
+		}
+		return new AbstractUnaryOutputSourceOperatorNodePushable() {
+			@Override
+			public void initialize() throws HyracksDataException {
+				writer.open();
+				try {
+					datasourceReadAdapter.getDataParser(partition)
+							.parse(writer);
+				} catch (Exception e) {
+					throw new HyracksDataException(
+							"exception during reading from external data source",
+							e);
+				} finally {
+					writer.close();
+				}
+			}
+		};
+	}
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ADMStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ADMStreamParser.java
new file mode 100644
index 0000000..658645c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ADMStreamParser.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
public class ADMStreamParser extends AbstractStreamDataParser {

    public ADMStreamParser() {
    }

    /**
     * Builds the inherited tupleParser from an ADM schema-full record parser
     * factory for the given record type and task context.
     */
    @Override
    public void initialize(ARecordType atype, IHyracksTaskContext ctx) {
        tupleParser = new AdmSchemafullRecordParserFactory(atype).createTupleParser(ctx);
    }

    /**
     * Parses the inherited inputStream, flushing the resulting frames to the
     * supplied writer. Requires initialize() to have been called first.
     */
    @Override
    public void parse(IFrameWriter writer) throws HyracksDataException {
        tupleParser.parse(inputStream, writer);
    }

    // No ADM-specific configuration parameters are consumed.
    @Override
    public void configure(Map<String, String> configuration) {
    }

}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/AbstractStreamDataParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/AbstractStreamDataParser.java
new file mode 100644
index 0000000..403f197
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/AbstractStreamDataParser.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.io.InputStream;
+import java.util.HashMap;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+
+/**
+ * Base class for stream-based data parsers. Holds the input stream, the
+ * tuple parser built by subclasses in initialize(...), and a shared mapping
+ * from primitive ADM type tags to Hyracks value parser factories used by
+ * delimited-data subclasses.
+ */
+public abstract class AbstractStreamDataParser implements IDataStreamParser {
+
+    // Configuration key for the field delimiter used by delimited-data parsers.
+    public static final String KEY_DELIMITER = "delimiter";
+    
+    // Shared lookup table: ADM type tag -> value parser factory for that type.
+    // Only the primitive types registered below are supported for delimited fields.
+    protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
+
+    static {
+        typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+    }
+
+    // Created by subclasses in initialize(...); used by parse(...).
+    protected ITupleParser tupleParser;
+    // NOTE(review): never assigned or read in this class — looks dead; confirm
+    // no subclass outside this file relies on it before removing.
+    protected IFrameWriter frameWriter;
+    // Source stream; must be set via setInputStream(...) before parse(...).
+    protected InputStream inputStream;
+
+    @Override
+    public abstract void initialize(ARecordType recordType, IHyracksTaskContext ctx);
+
+    @Override
+    public abstract void parse(IFrameWriter frameWriter) throws HyracksDataException;
+    
+    @Override
+    public void setInputStream(InputStream in) {
+        inputStream = in;
+        
+    }
+
+    @Override
+    public InputStream getInputStream() {
+        return inputStream;
+    }
+   
+   
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/DelimitedDataStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/DelimitedDataStreamParser.java
new file mode 100644
index 0000000..11c70ac8
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/DelimitedDataStreamParser.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+/**
+ * Parses a stream of delimited text records into tuples of a given
+ * {@link ARecordType}. The record delimiter defaults to '\n' and may be
+ * overridden either via the constructor or via the "delimiter" configuration
+ * key passed to {@link #configure(Map)}.
+ */
+public class DelimitedDataStreamParser extends AbstractStreamDataParser {
+
+    // Character.valueOf uses the JDK's cached boxed instances instead of
+    // allocating a new Character (new Character(...) is wasteful).
+    protected static final Character defaultDelimiter = Character.valueOf('\n');
+
+    protected Character delimiter = defaultDelimiter;
+
+    public Character getDelimiter() {
+        return delimiter;
+    }
+
+    public DelimitedDataStreamParser(Character delimiter) {
+        this.delimiter = delimiter;
+    }
+
+    public DelimitedDataStreamParser() {
+    }
+
+    /**
+     * Builds the delimited-data tuple parser, selecting a value parser factory
+     * for each field of the record type.
+     *
+     * @throws NotImplementedException
+     *             if a field's type has no registered value parser factory
+     */
+    @Override
+    public void initialize(ARecordType recordType, IHyracksTaskContext ctx) {
+        int n = recordType.getFieldTypes().length;
+        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
+        for (int i = 0; i < n; i++) {
+            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
+            IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
+            if (vpf == null) {
+                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
+            }
+            fieldParserFactories[i] = vpf;
+        }
+        tupleParser = new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter)
+                .createTupleParser(ctx);
+    }
+
+    @Override
+    public void parse(IFrameWriter writer) throws HyracksDataException {
+        tupleParser.parse(inputStream, writer);
+    }
+
+    /**
+     * Applies the "delimiter" configuration parameter when present.
+     * Bug fix: the original reset the delimiter to '\n' whenever the key was
+     * absent, silently discarding a delimiter supplied to the constructor; a
+     * missing (or empty) value now leaves the current delimiter unchanged.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) {
+        String delimiterArg = configuration.get(KEY_DELIMITER);
+        if (delimiterArg != null && !delimiterArg.isEmpty()) {
+            delimiter = Character.valueOf(delimiterArg.charAt(0));
+        }
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataParser.java
new file mode 100644
index 0000000..eb7daf7
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataParser.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Represents a parser that processes an input stream to form records of a
+ * given type. The parser creates frames that are flushed using a frame writer.
+ */
+public interface IDataParser {
+
+    /**
+     * Provides configuration parameters to the parser.
+     *
+     * @param configuration
+     *            Any configuration parameters for the parser
+     */
+    public void configure(Map<String, String> configuration);
+
+    /**
+     * Initializes the instance. An implementation may use the output record
+     * type (together with any previously supplied configuration) to initialize
+     * itself so that it can parse an input stream to form records of that type.
+     * 
+     * @param recordType
+     *            The record type associated with each record output by the
+     *            parser
+     * @param ctx
+     *            The runtime Hyracks task context.
+     */
+    public void initialize(ARecordType recordType, IHyracksTaskContext ctx);
+
+    /**
+     * Parses the (separately supplied) input to produce records of the
+     * configured type and uses the frame writer instance to flush frames
+     * containing the produced records.
+     * 
+     * @param frameWriter
+     *            A frame writer instance that is used for flushing frames to
+     *            the recipient operator
+     * @throws HyracksDataException
+     *             if parsing or frame flushing fails
+     */
+    public void parse(IFrameWriter frameWriter) throws HyracksDataException;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataStreamParser.java
new file mode 100644
index 0000000..1425707
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataStreamParser.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.io.InputStream;
+
+/**
+ * A {@link IDataParser} that reads its records from an externally supplied
+ * {@link InputStream} rather than receiving a stream per parse() call.
+ */
+public interface IDataStreamParser extends IDataParser {
+
+    /** Sets the stream that subsequent parse() calls will consume. */
+    public void setInputStream(InputStream in);
+    
+    /** Returns the stream previously set via {@link #setInputStream}. */
+    public InputStream getInputStream();
+    
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedDataParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedDataParser.java
new file mode 100644
index 0000000..7c9bb7d
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedDataParser.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+
+/**
+ * A data parser whose tuple-level parsing can be externally managed
+ * (suspended, resumed, stopped, altered) by a feed adapter.
+ */
+public interface IManagedDataParser extends IDataParser {
+
+    /** Returns the underlying tuple parser that exposes the control operations. */
+    public IManagedTupleParser getManagedTupleParser();
+    
+    /** Associates the feed adapter that manages this parser's lifecycle. */
+    public void setAdapter(IManagedFeedAdapter adapter);
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedTupleParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedTupleParser.java
new file mode 100644
index 0000000..14f6372
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedTupleParser.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+
+/**
+ * A tuple parser that supports runtime control: it can be suspended, resumed,
+ * stopped, and reconfigured while a feed is being ingested.
+ */
+public interface IManagedTupleParser extends ITupleParser {
+
+    /** Requests that parsing pause after the current tuple. */
+    public void suspend() throws Exception;
+
+    /** Resumes a previously suspended parser. */
+    public void resume() throws Exception;
+
+    /** Requests that parsing terminate after the current tuple. */
+    public void stop() throws Exception;
+    
+    /** Applies new runtime parameters (e.g. ingestion rate) to the parser. */
+    public void alter(Map<String,String> alterParams) throws Exception;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmRecordParserFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmRecordParserFactory.java
new file mode 100644
index 0000000..3d31489
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmRecordParserFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+
+/**
+ * Tuple parser factory that produces {@link ManagedAdmTupleParser} instances,
+ * i.e. ADM record parsers whose lifecycle is controlled by a feed adapter.
+ */
+public class ManagedAdmRecordParserFactory extends AdmSchemafullRecordParserFactory {
+
+    // Adapter handed to every tuple parser created by this factory.
+    private final IManagedFeedAdapter adapter;
+
+    public ManagedAdmRecordParserFactory(ARecordType recType, IManagedFeedAdapter adapter) {
+        super(recType);
+        this.adapter = adapter;
+    }
+
+    @Override
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
+        return new ManagedAdmTupleParser(ctx, recType, adapter);
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmStreamParser.java
new file mode 100644
index 0000000..bfe9fe0
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmStreamParser.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * An ADM stream parser whose tuple parser can be managed (suspended, resumed,
+ * stopped) by the owning feed adapter. setAdapter(...) must be called before
+ * initialize(...), since the adapter is handed to the tuple parser factory.
+ */
+public class ManagedAdmStreamParser extends ADMStreamParser implements IManagedDataParser{
+
+    private  IManagedFeedAdapter adapter;
+
+    @Override
+    public void initialize(ARecordType atype, IHyracksTaskContext ctx) {
+        tupleParser = new ManagedAdmRecordParserFactory(atype, adapter).createTupleParser(ctx);
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+        // No configuration parameters are currently used by this parser.
+    }
+
+    @Override
+    public IManagedTupleParser getManagedTupleParser() {
+         return (IManagedTupleParser)tupleParser;
+    }
+
+    @Override
+    public void setAdapter(IManagedFeedAdapter adapter) {
+        this.adapter = adapter;        
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmTupleParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmTupleParser.java
new file mode 100644
index 0000000..b7215a3
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmTupleParser.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.adm.parser.nontagged.AdmLexer;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter.OperationState;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmTupleParser;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class ManagedAdmTupleParser extends AdmTupleParser implements IManagedTupleParser {
+
+    private OperationState state;
+    private List<OperationState> nextState;
+    private final IManagedFeedAdapter adapter;
+    private long tupleInterval;
+
+    public static final String TUPLE_INTERVAL_KEY = "tuple-interval";
+
+    public ManagedAdmTupleParser(IHyracksTaskContext ctx, ARecordType recType, IManagedFeedAdapter adapter) {
+        super(ctx, recType);
+        nextState = new ArrayList<OperationState>();
+        this.adapter = adapter;
+        this.tupleInterval = adapter.getAdapterProperty(TUPLE_INTERVAL_KEY) == null ? 0 : Long.parseLong(adapter
+                .getAdapterProperty(TUPLE_INTERVAL_KEY));
+    }
+
+    public ManagedAdmTupleParser(IHyracksTaskContext ctx, ARecordType recType, long tupleInterval,
+            IManagedFeedAdapter adapter) {
+        super(ctx, recType);
+        nextState = new ArrayList<OperationState>();
+        this.adapter = adapter;
+        this.tupleInterval = tupleInterval;
+    }
+
+    @Override
+    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+        admLexer = new AdmLexer(in);
+        appender.reset(frame, true);
+        int tupleNum = 0;
+        try {
+            while (true) {
+                tb.reset();
+                if (!parseAdmInstance(recType, true, dos)) {
+                    break;
+                }
+                tb.addFieldEndOffset();
+                processNextTuple(nextState.isEmpty() ? null : nextState.get(0), writer);
+                Thread.currentThread().sleep(tupleInterval);
+                tupleNum++;
+            }
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+        } catch (AsterixException ae) {
+            throw new HyracksDataException(ae);
+        } catch (IOException ioe) {
+            throw new HyracksDataException(ioe);
+        } catch (InterruptedException ie) {
+            throw new HyracksDataException(ie);
+        }
+    }
+
+    private void addTupleToFrame(IFrameWriter writer, boolean forceFlush) throws HyracksDataException {
+        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        if (!success) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException();
+            }
+        }
+
+        if (forceFlush) {
+            FrameUtils.flushFrame(frame, writer);
+        }
+
+    }
+
+    private void processNextTuple(OperationState feedState, IFrameWriter writer) throws HyracksDataException {
+        try {
+            if (feedState != null) {
+                switch (state) {
+                    case SUSPENDED:
+                        suspendOperation(writer);
+                        break;
+                    case STOPPED:
+                        stopOperation(writer);
+                        break;
+                }
+            } else {
+                addTupleToFrame(writer, false);
+            }
+        } catch (HyracksDataException hde) {
+            throw hde;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void suspendOperation(IFrameWriter writer) throws HyracksDataException, Exception {
+        nextState.remove(0);
+        addTupleToFrame(writer, false);
+        adapter.beforeSuspend();
+        synchronized (this) {
+            this.wait();
+            adapter.beforeResume();
+        }
+    }
+
+    private void stopOperation(IFrameWriter writer) throws HyracksDataException, Exception {
+        nextState.remove(0);
+        addTupleToFrame(writer, true);
+        adapter.beforeStop();
+        writer.close();
+    }
+
+    @Override
+    public void suspend() throws Exception {
+        nextState.add(OperationState.SUSPENDED);
+    }
+
+    @Override
+    public void resume() throws Exception {
+        synchronized (this) {
+            this.notifyAll();
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        nextState.add(OperationState.STOPPED);
+    }
+
+    @Override
+    public void alter(Map<String, String> alterParams) throws Exception {
+        if (alterParams.get(TUPLE_INTERVAL_KEY) != null) {
+            tupleInterval = Long.parseLong(alterParams.get(TUPLE_INTERVAL_KEY));
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataRecordParserFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataRecordParserFactory.java
new file mode 100644
index 0000000..37a7162
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataRecordParserFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+
+/**
+ * Tuple parser factory that produces {@link ManagedDelimitedDataTupleParser}
+ * instances, i.e. delimited-data parsers whose lifecycle is controlled by a
+ * feed adapter.
+ */
+public class ManagedDelimitedDataRecordParserFactory extends NtDelimitedDataTupleParserFactory {
+
+    // Adapter handed to every tuple parser created by this factory.
+    private final IManagedFeedAdapter adapter;
+
+    public ManagedDelimitedDataRecordParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter,
+            ARecordType recType, IManagedFeedAdapter adapter) {
+        super(recType, fieldParserFactories, fieldDelimiter);
+        this.adapter = adapter;
+    }
+    
+    @Override
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
+        return new ManagedDelimitedDataTupleParser(ctx, recordType, adapter, valueParserFactories, fieldDelimiter);
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataStreamParser.java
new file mode 100644
index 0000000..f38a95b
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataStreamParser.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+/**
+ * A delimited-data stream parser whose tuple parser can be managed (suspended,
+ * resumed, stopped) by the owning feed adapter. setAdapter(...) must be called
+ * before initialize(...), since the adapter is handed to the parser factory.
+ */
+public class ManagedDelimitedDataStreamParser extends DelimitedDataStreamParser implements IManagedDataParser {
+
+    private IManagedFeedAdapter adapter;
+   
+    @Override
+    public void initialize(ARecordType recordType, IHyracksTaskContext ctx) {
+        // Select a value parser factory for every field of the record type.
+        final int fieldCount = recordType.getFieldTypes().length;
+        final IValueParserFactory[] valueParserFactories = new IValueParserFactory[fieldCount];
+        for (int fieldIdx = 0; fieldIdx < fieldCount; fieldIdx++) {
+            final ATypeTag typeTag = recordType.getFieldTypes()[fieldIdx].getTypeTag();
+            final IValueParserFactory factory = typeToValueParserFactMap.get(typeTag);
+            if (factory == null) {
+                throw new NotImplementedException("No value parser factory for delimited fields of type " + typeTag);
+            }
+            valueParserFactories[fieldIdx] = factory;
+        }
+        // Build a managed tuple parser so the feed adapter can control ingestion.
+        tupleParser = new ManagedDelimitedDataRecordParserFactory(valueParserFactories, delimiter.charValue(),
+                recordType, adapter).createTupleParser(ctx);
+    }
+
+    @Override
+    public IManagedTupleParser getManagedTupleParser() {
+        return (IManagedTupleParser) tupleParser;
+    }
+
+    @Override
+    public void setAdapter(IManagedFeedAdapter adapter) {
+        this.adapter = adapter;
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataTupleParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataTupleParser.java
new file mode 100644
index 0000000..325eff4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataTupleParser.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter.OperationState;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataTupleParser;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParser;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+public class ManagedDelimitedDataTupleParser extends DelimitedDataTupleParser implements IManagedTupleParser {
+
+    private List<OperationState> nextState;
+    private IManagedFeedAdapter adapter;
+    private long tupleInterval;
+
+    public static final String TUPLE_INTERVAL_KEY = "tuple-interval";
+
+    public ManagedDelimitedDataTupleParser(IHyracksTaskContext ctx, ARecordType recType, IManagedFeedAdapter adapter,
+            IValueParserFactory[] valueParserFactories, char fieldDelimter) {
+        super(ctx, recType, valueParserFactories, fieldDelimter);
+        this.adapter = adapter;
+        nextState = new ArrayList<OperationState>();
+        tupleInterval = adapter.getAdapterProperty(TUPLE_INTERVAL_KEY) == null ? 0 : Long.parseLong(adapter
+                .getAdapterProperty(TUPLE_INTERVAL_KEY));
+    }
+
+    @Override
+    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+        try {
+            IValueParser[] valueParsers = new IValueParser[valueParserFactories.length];
+            for (int i = 0; i < valueParserFactories.length; ++i) {
+                valueParsers[i] = valueParserFactories[i].createValueParser();
+            }
+
+            appender.reset(frame, true);
+            tb = new ArrayTupleBuilder(1);
+            recDos = tb.getDataOutput();
+
+            ArrayBackedValueStorage fieldValueBuffer = new ArrayBackedValueStorage();
+            DataOutput fieldValueBufferOutput = fieldValueBuffer.getDataOutput();
+            IARecordBuilder recBuilder = new RecordBuilder();
+            recBuilder.reset(recType);
+            recBuilder.init();
+
+            int n = recType.getFieldNames().length;
+            byte[] fieldTypeTags = new byte[n];
+            for (int i = 0; i < n; i++) {
+                ATypeTag tag = recType.getFieldTypes()[i].getTypeTag();
+                fieldTypeTags[i] = tag.serialize();
+            }
+
+            int[] fldIds = new int[n];
+            ArrayBackedValueStorage[] nameBuffers = new ArrayBackedValueStorage[n];
+            AMutableString str = new AMutableString(null);
+            for (int i = 0; i < n; i++) {
+                String name = recType.getFieldNames()[i];
+                fldIds[i] = recBuilder.getFieldId(name);
+                if (fldIds[i] < 0) {
+                    if (!recType.isOpen()) {
+                        throw new HyracksDataException("Illegal field " + name + " in closed type " + recType);
+                    } else {
+                        nameBuffers[i] = new ArrayBackedValueStorage();
+                        fieldNameToBytes(name, str, nameBuffers[i]);
+                    }
+                }
+            }
+
+            FieldCursor cursor = new FieldCursor(new InputStreamReader(in));
+            while (cursor.nextRecord()) {
+                tb.reset();
+                recBuilder.reset(recType);
+                recBuilder.init();
+
+                for (int i = 0; i < valueParsers.length; ++i) {
+                    if (!cursor.nextField()) {
+                        break;
+                    }
+                    fieldValueBuffer.reset();
+                    fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
+                    valueParsers[i].parse(cursor.getBuffer(), cursor.getfStart(),
+                            cursor.getfEnd() - cursor.getfStart(), fieldValueBufferOutput);
+                    if (fldIds[i] < 0) {
+                        recBuilder.addField(nameBuffers[i], fieldValueBuffer);
+                    } else {
+                        recBuilder.addField(fldIds[i], fieldValueBuffer);
+                    }
+                }
+                recBuilder.write(recDos, true);
+                processNextTuple(nextState.isEmpty() ? null : nextState.get(0), writer);
+                Thread.currentThread().sleep(tupleInterval);
+            }
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        } catch (InterruptedException ie) {
+            throw new HyracksDataException(ie);
+        }
+    }
+
+    private void addTupleToFrame(IFrameWriter writer, boolean forceFlush) throws HyracksDataException {
+        tb.addFieldEndOffset();
+        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        if (!success) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException();
+            }
+        }
+
+        if (forceFlush) {
+            FrameUtils.flushFrame(frame, writer);
+        }
+
+    }
+
+    private void processNextTuple(OperationState feedState, IFrameWriter writer) throws HyracksDataException {
+        try {
+            if (feedState != null) {
+                switch (feedState) {
+                    case SUSPENDED:
+                        suspendOperation(writer);
+                        break;
+                    case STOPPED:
+                        stopOperation(writer);
+                        break;
+                }
+            } else {
+                addTupleToFrame(writer, false);
+            }
+        } catch (HyracksDataException hde) {
+            throw hde;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void suspendOperation(IFrameWriter writer) throws HyracksDataException, Exception {
+        nextState.remove(0);
+        addTupleToFrame(writer, false);
+        adapter.beforeSuspend();
+        synchronized (this) {
+            this.wait();
+            adapter.beforeResume();
+        }
+    }
+
+    private void stopOperation(IFrameWriter writer) throws HyracksDataException, Exception {
+        nextState.remove(0);
+        addTupleToFrame(writer, false);
+        adapter.beforeStop();
+        adapter.stop();
+    }
+
+    @Override
+    public void suspend() throws Exception {
+        nextState.add(OperationState.SUSPENDED);
+    }
+
+    @Override
+    public void resume() throws Exception {
+        synchronized (this) {
+            this.notifyAll();
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        nextState.add(OperationState.STOPPED);
+    }
+
+    @Override
+    public void alter(Map<String, String> alterParams) throws Exception {
+        if (alterParams.get(TUPLE_INTERVAL_KEY) != null) {
+            tupleInterval = Long.parseLong(alterParams.get(TUPLE_INTERVAL_KEY));
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
new file mode 100644
index 0000000..320a29f
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+
+/**
+ * Represents the base class that is required to be extended by every
+ * implementation of the IDatasourceAdapter interface.
+ */
+public abstract class AbstractDatasourceAdapter implements IDatasourceAdapter {
+
+    private static final long serialVersionUID = -3510610289692452466L;
+
+    // Adapter-specific key/value properties supplied at configure() time.
+    protected Map<String, String> configuration;
+
+    // Partition constraint computed by the concrete adapter; exposed via
+    // getPartitionConstraint().
+    protected AlgebricksPartitionConstraint partitionConstraint;
+
+    // The ASTERIX type of the records this adapter produces.
+    protected IAType atype;
+
+    // Hyracks task context, set by the concrete adapter's initialize().
+    protected IHyracksTaskContext ctx;
+
+    // Parser that translates the external representation into ADM records.
+    protected IDataParser dataParser;
+
+    // Maps an ATypeTag to the hyracks value-parser factory used for delimited
+    // fields of that type (see the static initializer below).
+    protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
+
+    // Maps a record format name ("delimited-text"/"adm") to the default parser class.
+    protected static final HashMap<String, String> formatToParserMap = new HashMap<String, String>();
+
+    // Same mapping, but for managed (feed) parsers that support suspend/resume/stop.
+    protected static final HashMap<String, String> formatToManagedParserMap = new HashMap<String, String>();
+
+    protected AdapterDataFlowType dataFlowType;
+
+    protected AdapterType adapterType;
+
+    static {
+        typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+
+        formatToParserMap.put("delimited-text", "edu.uci.ics.asterix.external.data.parser.DelimitedDataStreamParser");
+        formatToParserMap.put("adm", "edu.uci.ics.asterix.external.data.parser.ADMStreamParser");
+
+        formatToManagedParserMap.put("delimited-text",
+                "edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser");
+        formatToManagedParserMap.put("adm", "edu.uci.ics.asterix.external.data.parser.ManagedAdmStreamParser");
+
+    }
+
+    // Well-known configuration property names shared by all adapters.
+    public static final String KEY_FORMAT = "format";
+    public static final String KEY_PARSER = "parser";
+
+    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+    public static final String FORMAT_ADM = "adm";
+
+    // Lifecycle hooks every concrete adapter must implement.
+    abstract public void initialize(IHyracksTaskContext ctx) throws Exception;
+
+    abstract public void configure(Map<String, String> arguments, IAType atype) throws Exception;
+
+    abstract public AdapterDataFlowType getAdapterDataFlowType();
+
+    abstract public AdapterType getAdapterType();
+
+    public AlgebricksPartitionConstraint getPartitionConstraint() {
+        return partitionConstraint;
+    }
+
+    public void setAdapterProperty(String property, String value) {
+        configuration.put(property, value);
+    }
+
+    public IDataParser getParser() {
+        return dataParser;
+    }
+
+    public void setParser(IDataParser dataParser) {
+        this.dataParser = dataParser;
+    }
+
+    public String getAdapterProperty(String attribute) {
+        return configuration.get(attribute);
+    }
+
+    public Map<String, String> getConfiguration() {
+        return configuration;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
new file mode 100644
index 0000000..0673a24
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -0,0 +1,120 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
+import edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser;
+import edu.uci.ics.asterix.feed.intake.FeedStream;
+import edu.uci.ics.asterix.feed.intake.RSSFeedClient;
+import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+
+/**
+ * Feed adapter for CNN RSS feeds. The "topic" configuration property is a
+ * comma-separated list of topic names; each topic maps to a fixed CNN RSS URL.
+ */
+public class CNNFeedAdapter extends RSSFeedAdapter implements IDatasourceAdapter, IMutableFeedAdapter {
+
+    // One feed URL per partition, resolved from the configured topics.
+    private List<String> feedURLs = new ArrayList<String>();
+    private String idPrefix = "";
+
+    public static final String KEY_RSS_URL = "topic";
+    public static final String KEY_INTERVAL = "interval";
+
+    // Topic name -> RSS feed URL; populated once at class-load time.
+    private static Map<String, String> topicFeeds = new HashMap<String, String>();
+
+    public static final String TOP_STORIES = "topstories";
+    public static final String WORLD = "world";
+    public static final String US = "us";
+    public static final String SPORTS = "sports";
+    public static final String BUSINESS = "business";
+    public static final String POLITICS = "politics";
+    public static final String CRIME = "crime";
+    public static final String TECHNOLOGY = "technology";
+    public static final String HEALTH = "health";
+    // NOTE(review): constant name and value are misspelled; kept as-is because
+    // the value is part of the external configuration contract.
+    public static final String ENTERNTAINMENT = "entertainemnt";
+    public static final String TRAVEL = "travel";
+    public static final String LIVING = "living";
+    public static final String VIDEO = "video";
+    public static final String STUDENT = "student";
+    public static final String POPULAR = "popular";
+    public static final String RECENT = "recent";
+
+    // Populate the static topic map in a static initializer rather than from an
+    // instance method, so it is not re-filled on every configure() call.
+    static {
+        topicFeeds.put(TOP_STORIES, "http://rss.cnn.com/rss/cnn_topstories.rss");
+        topicFeeds.put(WORLD, "http://rss.cnn.com/rss/cnn_world.rss");
+        topicFeeds.put(US, "http://rss.cnn.com/rss/cnn_us.rss");
+        topicFeeds.put(SPORTS, "http://rss.cnn.com/rss/si_topstories.rss");
+        topicFeeds.put(BUSINESS, "http://rss.cnn.com/rss/money_latest.rss");
+        topicFeeds.put(POLITICS, "http://rss.cnn.com/rss/cnn_allpolitics.rss");
+        topicFeeds.put(CRIME, "http://rss.cnn.com/rss/cnn_crime.rss");
+        topicFeeds.put(TECHNOLOGY, "http://rss.cnn.com/rss/cnn_tech.rss");
+        topicFeeds.put(HEALTH, "http://rss.cnn.com/rss/cnn_health.rss");
+        topicFeeds.put(ENTERNTAINMENT, "http://rss.cnn.com/rss/cnn_showbiz.rss");
+        topicFeeds.put(LIVING, "http://rss.cnn.com/rss/cnn_living.rss");
+        topicFeeds.put(VIDEO, "http://rss.cnn.com/rss/cnn_freevideo.rss");
+        topicFeeds.put(TRAVEL, "http://rss.cnn.com/rss/cnn_travel.rss");
+        topicFeeds.put(STUDENT, "http://rss.cnn.com/rss/cnn_studentnews.rss");
+        topicFeeds.put(POPULAR, "http://rss.cnn.com/rss/cnn_mostpopular.rss");
+        topicFeeds.put(RECENT, "http://rss.cnn.com/rss/cnn_latest.rss");
+    }
+
+    /**
+     * Lazily builds the managed delimited-data parser for the given partition,
+     * wiring it to an RSS feed client for that partition's feed URL.
+     */
+    @Override
+    public IDataParser getDataParser(int partition) throws Exception {
+        if (dataParser == null) {
+            dataParser = new ManagedDelimitedDataStreamParser();
+            dataParser.configure(configuration);
+            dataParser.initialize((ARecordType) atype, ctx);
+            RSSFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), idPrefix);
+            FeedStream feedStream = new FeedStream(feedClient, ctx);
+            ((IDataStreamParser) dataParser).setInputStream(feedStream);
+        }
+        return dataParser;
+    }
+
+    /**
+     * Validates and stores the adapter configuration.
+     *
+     * @throws IllegalArgumentException if no "topic" property is supplied or a
+     *             configured topic name is unknown
+     */
+    @Override
+    public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+        configuration = arguments;
+        this.atype = atype;
+        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        if (rssURLProperty == null) {
+            throw new IllegalArgumentException("no rss url provided");
+        }
+        initializeFeedURLs(rssURLProperty);
+        configurePartitionConstraints();
+    }
+
+    // Resolves each comma-separated topic name to its feed URL.
+    private void initializeFeedURLs(String rssURLProperty) {
+        feedURLs.clear();
+        for (String topic : rssURLProperty.split(",")) {
+            String feedURL = topicFeeds.get(topic);
+            if (feedURL == null) {
+                throw new IllegalArgumentException(" unknown topic :" + topic
+                        + " please choose from the following " + getValidTopics());
+            }
+            feedURLs.add(feedURL);
+        }
+    }
+
+    // Space-separated list of the recognized topic names, for error messages.
+    private static String getValidTopics() {
+        StringBuilder builder = new StringBuilder();
+        for (String key : topicFeeds.keySet()) {
+            builder.append(key);
+            builder.append(" ");
+        }
+        return builder.toString();
+    }
+
+}
+
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
new file mode 100644
index 0000000..205c855
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.data.parser.DelimitedDataStreamParser;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+/**
+ * Adapter for reading external data stored in HDFS. Supports the Hadoop text
+ * and sequence-file input formats; record content may be delimited text or ADM.
+ */
+public class HDFSAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter {
+
+    private String hdfsUrl;
+    private List<String> hdfsPaths;
+    private String inputFormatClassName;
+    private InputSplit[] inputSplits;
+    private JobConf conf;
+    // The task context is inherited from AbstractDatasourceAdapter (ctx); a
+    // private field of the same name here would shadow it.
+    private Reporter reporter;
+    // NOTE(review): isDelimited and delimiter are never assigned in this class,
+    // so createTupleParserFactory always takes the ADM branch unless they are
+    // set elsewhere -- confirm intended usage.
+    private boolean isDelimited;
+    private Character delimiter;
+    private static final Map<String, String> formatClassNames = new HashMap<String, String>();
+
+    public static final String KEY_HDFS_URL = "hdfs";
+    public static final String KEY_HDFS_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+    static {
+        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+    }
+
+    public String getHdfsUrl() {
+        return hdfsUrl;
+    }
+
+    public void setHdfsUrl(String hdfsUrl) {
+        this.hdfsUrl = hdfsUrl;
+    }
+
+    public List<String> getHdfsPaths() {
+        return hdfsPaths;
+    }
+
+    public void setHdfsPaths(List<String> hdfsPaths) {
+        this.hdfsPaths = hdfsPaths;
+    }
+
+    /**
+     * Stores the configuration, selects the record parser, builds the Hadoop
+     * job configuration and derives the partition constraint from the splits.
+     */
+    @Override
+    public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+        configuration = arguments;
+        configureFormat();
+        configureJobConf();
+        configurePartitionConstraint();
+        this.atype = atype;
+    }
+
+    // Validates the input-format name and instantiates the record parser:
+    // either the explicitly configured class or the default for the format.
+    private void configureFormat() throws Exception {
+        String format = configuration.get(KEY_INPUT_FORMAT);
+        inputFormatClassName = formatClassNames.get(format);
+        if (inputFormatClassName == null) {
+            throw new Exception("format " + format + " not supported");
+        }
+
+        String parserClass = configuration.get(KEY_PARSER);
+        if (parserClass == null) {
+            if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+                parserClass = formatToParserMap.get(FORMAT_DELIMITED_TEXT);
+            } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+                parserClass = formatToParserMap.get(FORMAT_ADM);
+            }
+        }
+
+        dataParser = (IDataParser) Class.forName(parserClass).newInstance();
+        dataParser.configure(configuration);
+    }
+
+    // One partition per input split; also records the configured HDFS paths.
+    private void configurePartitionConstraint() throws Exception {
+        InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, 0);
+        partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
+        hdfsPaths = new ArrayList<String>();
+        for (String hdfsPath : configuration.get(KEY_HDFS_PATH).split(",")) {
+            hdfsPaths.add(hdfsPath);
+        }
+    }
+
+    // Builds a tuple parser factory matching the record format (delimited or ADM).
+    private ITupleParserFactory createTupleParserFactory(ARecordType recType) {
+        if (isDelimited) {
+            int n = recType.getFieldTypes().length;
+            IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
+            for (int i = 0; i < n; i++) {
+                ATypeTag tag = recType.getFieldTypes()[i].getTypeTag();
+                IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
+                if (vpf == null) {
+                    throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
+                }
+                fieldParserFactories[i] = vpf;
+            }
+            return new NtDelimitedDataTupleParserFactory(recType, fieldParserFactories, delimiter);
+        } else {
+            return new AdmSchemafullRecordParserFactory(recType);
+        }
+    }
+
+    // Builds the Hadoop JobConf pointing at the configured HDFS URL, input
+    // paths and input format.
+    private JobConf configureJobConf() throws Exception {
+        hdfsUrl = configuration.get(KEY_HDFS_URL);
+        conf = new JobConf();
+        conf.set("fs.default.name", hdfsUrl);
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+        conf.set("mapred.input.dir", configuration.get(KEY_HDFS_PATH));
+        conf.set("mapred.input.format.class", formatClassNames.get(configuration.get(KEY_INPUT_FORMAT)));
+        return conf;
+    }
+
+    public AdapterDataFlowType getAdapterDataFlowType() {
+        return AdapterDataFlowType.PULL;
+    }
+
+    public AdapterType getAdapterType() {
+        return AdapterType.READ_WRITE;
+    }
+
+    /** Computes the input splits and prepares the parser and a no-op reporter. */
+    @Override
+    public void initialize(IHyracksTaskContext ctx) throws Exception {
+        this.ctx = ctx;
+        inputSplits = conf.getInputFormat().getSplits(conf, 0);
+        dataParser.initialize((ARecordType) atype, ctx);
+        // Hadoop's RecordReader API requires a Reporter; none of its callbacks
+        // are needed here, so all methods are no-ops.
+        reporter = new Reporter() {
+
+            @Override
+            public Counter getCounter(Enum<?> arg0) {
+                return null;
+            }
+
+            @Override
+            public Counter getCounter(String arg0, String arg1) {
+                return null;
+            }
+
+            @Override
+            public InputSplit getInputSplit() throws UnsupportedOperationException {
+                return null;
+            }
+
+            @Override
+            public void incrCounter(Enum<?> arg0, long arg1) {
+            }
+
+            @Override
+            public void incrCounter(String arg0, String arg1, long arg2) {
+            }
+
+            @Override
+            public void setStatus(String arg0) {
+            }
+
+            @Override
+            public void progress() {
+            }
+        };
+    }
+
+    /**
+     * Opens the input split for the given partition and attaches it to the
+     * stream parser; sequence files are adapted to a newline-separated byte
+     * stream via SequenceToTextStream.
+     */
+    @Override
+    public IDataParser getDataParser(int partition) throws Exception {
+        FileSystem fs = FileSystem.get(conf);
+        InputStream inputStream;
+        if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+            SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+            RecordReader reader = format.getRecordReader(inputSplits[partition], conf, reporter);
+            inputStream = new SequenceToTextStream(reader, ctx);
+        } else {
+            try {
+                inputStream = fs.open(((org.apache.hadoop.mapred.FileSplit) inputSplits[partition]).getPath());
+            } catch (FileNotFoundException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        if (dataParser instanceof IDataStreamParser) {
+            ((IDataStreamParser) dataParser).setInputStream(inputStream);
+        } else {
+            throw new IllegalArgumentException(" parser not compatible");
+        }
+
+        return dataParser;
+    }
+
+}
+
+/**
+ * Adapts a Hadoop sequence-file RecordReader with Text values into a byte
+ * stream: each record's bytes are emitted followed by a '\n' separator.
+ */
+class SequenceToTextStream extends InputStream {
+
+    private ByteBuffer buffer;
+    private int capacity;
+    private RecordReader reader;
+    private final Object key;
+    private final Text value;
+
+    public SequenceToTextStream(RecordReader reader, IHyracksTaskContext ctx) throws Exception {
+        capacity = ctx.getFrameSize();
+        buffer = ByteBuffer.allocate(capacity);
+        this.reader = reader;
+        key = reader.createKey();
+        try {
+            value = (Text) reader.createValue();
+        } catch (ClassCastException cce) {
+            throw new Exception("context is not of type org.apache.hadoop.io.Text"
+                    + " type not supported in sequence file format", cce);
+        }
+        initialize();
+    }
+
+    // Loads the first record, or marks the buffer empty if the file has none.
+    private void initialize() throws Exception {
+        if (reader.next(key, value)) {
+            fillBuffer();
+        } else {
+            buffer.limit(0);
+        }
+    }
+
+    // Copies the current record plus a newline separator into the buffer and
+    // flips it for reading.
+    // NOTE(review): a record longer than the frame size would overflow the
+    // buffer -- confirm records are bounded by the frame size.
+    private void fillBuffer() {
+        buffer.clear();
+        // Text.getBytes() returns the backing array, which may be longer than
+        // the record; only the first getLength() bytes are valid.
+        buffer.put(value.getBytes(), 0, value.getLength());
+        buffer.put((byte) '\n');
+        buffer.flip();
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buffer.hasRemaining()) {
+            if (!reader.next(key, value)) {
+                return -1;
+            }
+            fillBuffer();
+        }
+        // Mask to satisfy the InputStream contract: 0..255, or -1 at end of stream.
+        return buffer.get() & 0xFF;
+    }
+
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
new file mode 100644
index 0000000..44dab4c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Adapter that reads data from a Hive table. Hive tables are laid out as
+ * files under an HDFS warehouse directory, so this adapter resolves the
+ * table's path and delegates all data access to a wrapped {@link HDFSAdapter}.
+ */
+public class HiveAdapter extends AbstractDatasourceAdapter implements
+        IDatasourceReadAdapter {
+
+    public static final String HIVE_DATABASE = "database";
+    public static final String HIVE_TABLE = "table";
+    public static final String HIVE_HOME = "hive-home";
+    public static final String HIVE_METASTORE_URI = "metastore-uri";
+    public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
+    public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
+
+    // All reads are delegated to this adapter once configured.
+    private HDFSAdapter hdfsAdapter;
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.READ;
+    }
+
+    @Override
+    public AdapterDataFlowType getAdapterDataFlowType() {
+        return AdapterDataFlowType.PULL;
+    }
+
+    @Override
+    public void configure(Map<String, String> arguments, IAType atype)
+            throws Exception {
+        configuration = arguments;
+        this.atype = atype;
+        configureHadoopAdapter();
+    }
+
+    /**
+     * Translates the Hive properties into an HDFS path and configures the
+     * wrapped HDFSAdapter. Only delimited-text content stored in text or
+     * sequence files is supported.
+     */
+    private void configureHadoopAdapter() throws Exception {
+        String database = configuration.get(HIVE_DATABASE);
+        String tablePath = null;
+        if (database == null) {
+            // Default database: tables live directly under the warehouse dir.
+            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/"
+                    + configuration.get(HIVE_TABLE);
+        } else {
+            // Named database: tables live under "<warehouse>/<database>.db/".
+            // Bug fix: the path was previously built from the still-null
+            // tablePath variable, yielding ".../null.db/<table>".
+            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + database
+                    + ".db" + "/" + configuration.get(HIVE_TABLE);
+        }
+        configuration.put(HDFSAdapter.KEY_HDFS_PATH, tablePath);
+        if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
+            throw new IllegalArgumentException("format "
+                    + configuration.get(KEY_FORMAT) + " is not supported");
+        }
+
+        if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(
+                HDFSAdapter.INPUT_FORMAT_TEXT) || configuration.get(
+                HDFSAdapter.KEY_INPUT_FORMAT).equals(
+                HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
+            throw new IllegalArgumentException("file input format "
+                    + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
+                    + " is not supported");
+        }
+
+        hdfsAdapter = new HDFSAdapter();
+        hdfsAdapter.configure(configuration, atype);
+    }
+
+    @Override
+    public void initialize(IHyracksTaskContext ctx) throws Exception {
+        hdfsAdapter.initialize(ctx);
+    }
+
+    @Override
+    public IDataParser getDataParser(int partition) throws Exception {
+        return hdfsAdapter.getDataParser(partition);
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
new file mode 100644
index 0000000..2449db0
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+
+/**
+ * Adapter for reading external data stored on the local file systems of node
+ * controllers. Each configured split of the form "node://absolute-path" is
+ * read on the node that hosts it, which also fixes the partition constraint.
+ */
+public class NCFileSystemAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter {
+
+    private static final long serialVersionUID = -4154256369973615710L;
+    protected FileSplit[] fileSplits;
+
+    // Configuration keys/values understood by this adapter. Made static: the
+    // nested class holds only constants and needs no enclosing-instance link.
+    // NOTE(review): KEY_FORMAT/FORMAT_DELIMITED_TEXT appear to duplicate
+    // constants inherited from AbstractDatasourceAdapter (used unqualified
+    // in configureFormat); consider consolidating.
+    public static class Constants {
+        public static final String KEY_SPLITS = "path";
+        public static final String KEY_FORMAT = "format";
+        public static final String KEY_PARSER = "parser";
+        public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+        public static final String FORMAT_ADM = "adm";
+    }
+
+    @Override
+    public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+        this.configuration = arguments;
+        String[] splits = arguments.get(Constants.KEY_SPLITS).split(",");
+        configureFileSplits(splits);
+        configurePartitionConstraint();
+        configureFormat();
+        if (atype == null) {
+            configureInputType();
+        } else {
+            setInputAType(atype);
+        }
+    }
+
+    public IAType getAType() {
+        return atype;
+    }
+
+    public void setInputAType(IAType atype) {
+        this.atype = atype;
+    }
+
+    @Override
+    public void initialize(IHyracksTaskContext ctx) throws Exception {
+        this.ctx = ctx;
+        dataParser.initialize((ARecordType) atype, ctx);
+    }
+
+    @Override
+    public AdapterDataFlowType getAdapterDataFlowType() {
+        return AdapterDataFlowType.PULL;
+    }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.READ;
+    }
+
+    /**
+     * Opens the file backing the given partition's split and hands the stream
+     * to the configured parser.
+     *
+     * @throws HyracksDataException if the split's file does not exist
+     */
+    @Override
+    public IDataParser getDataParser(int partition) throws Exception {
+        FileSplit split = fileSplits[partition];
+        File inputFile = split.getLocalFile().getFile();
+        InputStream in;
+        try {
+            in = new FileInputStream(inputFile);
+        } catch (FileNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+        if (dataParser instanceof IDataStreamParser) {
+            ((IDataStreamParser) dataParser).setInputStream(in);
+        } else {
+            throw new IllegalArgumentException(" parser not compatible");
+        }
+        return dataParser;
+    }
+
+    // Parses "<node>://<path>" entries into FileSplits. Entries are trimmed
+    // so whitespace around the comma separators does not corrupt node names.
+    private void configureFileSplits(String[] splits) {
+        fileSplits = new FileSplit[splits.length];
+        int count = 0;
+        for (String splitPath : splits) {
+            String trimmed = splitPath.trim();
+            String nodeName = trimmed.split(":")[0];
+            String nodeLocalPath = trimmed.split("://")[1];
+            fileSplits[count++] = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+        }
+    }
+
+    // Resolves the parser class: an explicit "parser" property wins, otherwise
+    // the class is derived from the declared format.
+    protected void configureFormat() throws Exception {
+        String parserClass = configuration.get(Constants.KEY_PARSER);
+        if (parserClass == null) {
+            if (Constants.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+                parserClass = formatToParserMap.get(FORMAT_DELIMITED_TEXT);
+            } else if (Constants.FORMAT_ADM.equalsIgnoreCase(configuration.get(Constants.KEY_FORMAT))) {
+                parserClass = formatToParserMap.get(Constants.FORMAT_ADM);
+            } else {
+                throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
+            }
+        }
+        dataParser = (IDataParser) Class.forName(parserClass).newInstance();
+        dataParser.configure(configuration);
+    }
+
+    // Type inference from raw data is not implemented; a type must be given.
+    private void configureInputType() {
+        throw new UnsupportedOperationException(" Cannot resolve input type, operation not supported");
+    }
+
+    // Pins each partition to the node that owns its split.
+    private void configurePartitionConstraint() {
+        String[] locs = new String[fileSplits.length];
+        for (int i = 0; i < fileSplits.length; i++) {
+            locs[i] = fileSplits[i].getNodeName();
+        }
+        partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locs);
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
new file mode 100644
index 0000000..76d53ab
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import twitter4j.Query;
+import twitter4j.QueryResult;
+import twitter4j.Tweet;
+import twitter4j.Twitter;
+import twitter4j.TwitterFactory;
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.data.parser.AbstractStreamDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
+import edu.uci.ics.asterix.external.data.parser.IManagedDataParser;
+import edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser;
+import edu.uci.ics.asterix.feed.intake.IFeedClient;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Pull-based adapter that periodically queries the Twitter search API and
+ * feeds the results, as '|'-delimited text records, into the parser pipeline.
+ * Always runs as a single partition.
+ */
+public class PullBasedTwitterAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter,
+        IManagedFeedAdapter, IMutableFeedAdapter {
+
+    private IDataStreamParser parser;
+    private int parallelism = 1;
+    // Control flags polled by TweetClient between fetches.
+    // NOTE(review): read from the fetching thread without synchronization;
+    // confirm visibility is acceptable or mark these volatile.
+    private boolean stopRequested = false;
+    private boolean alterRequested = false;
+    private Map<String, String> alteredParams = new HashMap<String, String>();
+
+    public static final String QUERY = "query";
+    public static final String INTERVAL = "interval";
+
+    @Override
+    public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+        configuration = arguments;
+        this.atype = atype;
+        partitionConstraint = new AlgebricksCountPartitionConstraint(1);
+    }
+
+    @Override
+    public AdapterDataFlowType getAdapterDataFlowType() {
+        // Reference the enum constant via its type, not an instance field.
+        return AdapterDataFlowType.PULL;
+    }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.READ;
+    }
+
+    @Override
+    public void initialize(IHyracksTaskContext ctx) throws Exception {
+        this.ctx = ctx;
+    }
+
+    @Override
+    public void beforeSuspend() throws Exception {
+        // No action required before suspension.
+    }
+
+    @Override
+    public void beforeResume() throws Exception {
+        // No action required before resumption.
+    }
+
+    @Override
+    public void beforeStop() throws Exception {
+        // No action required before stopping.
+    }
+
+    /**
+     * Lazily builds the delimited-text parser, wiring it to a TweetStream
+     * backed by a TweetClient that polls the Twitter search API.
+     */
+    @Override
+    public IDataParser getDataParser(int partition) throws Exception {
+        if (parser == null) {
+            parser = new ManagedDelimitedDataStreamParser();
+            ((IManagedDataParser) parser).setAdapter(this);
+            configuration.put(AbstractStreamDataParser.KEY_DELIMITER, "|");
+            parser.configure(configuration);
+            parser.initialize((ARecordType) atype, ctx);
+            TweetClient tweetClient = new TweetClient(ctx.getJobletContext().getApplicationContext().getNodeId(), this);
+            TweetStream tweetStream = new TweetStream(tweetClient, ctx);
+            parser.setInputStream(tweetStream);
+        }
+        return parser;
+    }
+
+    @Override
+    public void stop() throws Exception {
+        stopRequested = true;
+    }
+
+    public boolean isStopRequested() {
+        return stopRequested;
+    }
+
+    @Override
+    public void alter(Map<String, String> properties) throws Exception {
+        alterRequested = true;
+        this.alteredParams = properties;
+    }
+
+    public boolean isAlterRequested() {
+        return alterRequested;
+    }
+
+    public Map<String, String> getAlteredParams() {
+        return alteredParams;
+    }
+
+    // Called by the client once it has picked up the altered parameters.
+    public void postAlteration() {
+        alteredParams = null;
+        alterRequested = false;
+    }
+}
+
+/**
+ * InputStream over batches of tweets fetched by a TweetClient; each tweet is
+ * exposed as one newline-terminated line, buffered a frame at a time.
+ */
+class TweetStream extends InputStream {
+
+    private ByteBuffer buffer;
+    private int capacity;
+    private TweetClient tweetClient;
+    private List<String> tweets = new ArrayList<String>();
+
+    public TweetStream(TweetClient tweetClient, IHyracksTaskContext ctx) throws Exception {
+        capacity = ctx.getFrameSize();
+        buffer = ByteBuffer.allocate(capacity);
+        this.tweetClient = tweetClient;
+        initialize();
+    }
+
+    // Buffers the first batch so the stream is readable upon construction; an
+    // empty feed leaves the buffer at limit 0 so read() reports EOF.
+    private void initialize() throws Exception {
+        if (!tweetClient.next(tweets)) {
+            buffer.limit(0);
+        } else {
+            fillBuffer();
+        }
+    }
+
+    // Writes the current batch into the buffer, one tweet per line.
+    // NOTE(review): a batch larger than the frame size would overflow the
+    // buffer; confirm the client bounds its batch sizes accordingly.
+    private void fillBuffer() {
+        buffer.position(0);
+        buffer.limit(capacity);
+        for (String tweet : tweets) {
+            buffer.put(tweet.getBytes());
+            buffer.put((byte) '\n');
+        }
+        buffer.flip();
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buffer.hasRemaining()) {
+            if (!tweetClient.next(tweets)) {
+                return -1;
+            }
+            fillBuffer();
+        }
+        // Bug fix: mask to honor the InputStream contract (0-255); sign
+        // extension made any byte >= 0x80 look negative and 0xFF read as EOF.
+        return buffer.get() & 0xFF;
+    }
+}
+
+/**
+ * Feed client that polls the Twitter search API at a fixed interval and
+ * converts each tweet into a '|'-delimited feed tuple.
+ */
+class TweetClient implements IFeedClient {
+
+    private String query;
+    private int timeInterval = 5; // polling interval in seconds
+    private Character delimiter = '|';
+    private long id = 0; // monotonically increasing per-client tuple id
+    private String id_prefix;
+
+    private final PullBasedTwitterAdapter adapter;
+
+    public TweetClient(String id_prefix, PullBasedTwitterAdapter adapter) {
+        this.id_prefix = id_prefix;
+        this.adapter = adapter;
+        initialize(adapter.getConfiguration());
+    }
+
+    private void initialize(Map<String, String> params) {
+        this.query = params.get(PullBasedTwitterAdapter.QUERY);
+        if (params.get(PullBasedTwitterAdapter.INTERVAL) != null) {
+            this.timeInterval = Integer.parseInt(params.get(PullBasedTwitterAdapter.INTERVAL));
+        }
+    }
+
+    /**
+     * Fetches the next batch of tweets into the supplied list. Returns false
+     * when the adapter has been asked to stop or when a fetch fails.
+     */
+    @Override
+    public boolean next(List<String> tweets) {
+        try {
+            if (adapter.isStopRequested()) {
+                return false;
+            }
+            if (adapter.isAlterRequested()) {
+                initialize(adapter.getAlteredParams());
+                adapter.postAlteration();
+            }
+            // Throttle between polls. (Fixed: Thread.sleep is static; invoking
+            // it through currentThread() was misleading.)
+            Thread.sleep(1000 * timeInterval);
+            tweets.clear();
+            Twitter twitter = new TwitterFactory().getInstance();
+            QueryResult result = twitter.search(new Query(query));
+            List<Tweet> sourceTweets = result.getTweets();
+            for (Tweet tweet : sourceTweets) {
+                String tweetContent = formFeedTuple(tweet);
+                tweets.add(tweetContent);
+                System.out.println(tweetContent); // TODO(review): debug output; route through a logger or remove.
+            }
+
+        } catch (Exception e) {
+            // Best-effort feed: report the failure and signal end-of-data
+            // rather than propagating and failing the job.
+            e.printStackTrace();
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Builds a '|'-delimited tuple: id, from-user id, location (currently
+     * hard-coded to "Orange County"), text with newlines stripped, and the
+     * creation timestamp.
+     */
+    public String formFeedTuple(Object tweetObject) {
+        Tweet tweet = (Tweet) tweetObject;
+        StringBuilder builder = new StringBuilder();
+        builder.append(id_prefix + ":" + id);
+        builder.append(delimiter);
+        builder.append(tweet.getFromUserId());
+        builder.append(delimiter);
+        builder.append("Orange County");
+        builder.append(delimiter);
+        builder.append(escapeChars(tweet));
+        builder.append(delimiter);
+        builder.append(tweet.getCreatedAt().toString());
+        id++;
+        return builder.toString();
+    }
+
+    // Replaces embedded newlines so the tuple stays on a single line.
+    private String escapeChars(Tweet tweet) {
+        if (tweet.getText().contains("\n")) {
+            return tweet.getText().replace("\n", " ");
+        }
+        return tweet.getText();
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
new file mode 100644
index 0000000..0cf8e95
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
+import edu.uci.ics.asterix.external.data.parser.IManagedDataParser;
+import edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser;
+import edu.uci.ics.asterix.feed.intake.FeedStream;
+import edu.uci.ics.asterix.feed.intake.IFeedClient;
+import edu.uci.ics.asterix.feed.intake.RSSFeedClient;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Pull-based adapter that ingests one or more RSS feeds. Each configured feed
+ * URL is served by its own partition via an RSSFeedClient.
+ */
+public class RSSFeedAdapter extends AbstractDatasourceAdapter implements IDatasourceAdapter, IManagedFeedAdapter,
+        IMutableFeedAdapter {
+
+    private List<String> feedURLs = new ArrayList<String>();
+    private boolean isStopRequested = false;
+    private boolean isAlterRequested = false;
+    private Map<String, String> alteredParams = new HashMap<String, String>();
+    // Node id, used by clients to build unique tuple ids.
+    private String id_prefix = "";
+
+    public static final String KEY_RSS_URL = "url";
+    public static final String KEY_INTERVAL = "interval";
+
+    public boolean isStopRequested() {
+        return isStopRequested;
+    }
+
+    public void setStopRequested(boolean isStopRequested) {
+        this.isStopRequested = isStopRequested;
+    }
+
+    /**
+     * Lazily builds the delimited-text parser, wiring it to a FeedStream
+     * backed by an RSSFeedClient for this partition's URL.
+     */
+    @Override
+    public IDataParser getDataParser(int partition) throws Exception {
+        if (dataParser == null) {
+            dataParser = new ManagedDelimitedDataStreamParser();
+            ((IManagedDataParser) dataParser).setAdapter(this);
+            dataParser.initialize((ARecordType) atype, ctx);
+            IFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
+            FeedStream feedStream = new FeedStream(feedClient, ctx);
+            ((IDataStreamParser) dataParser).setInputStream(feedStream);
+        }
+        return dataParser;
+    }
+
+    @Override
+    public void alter(Map<String, String> properties) throws Exception {
+        isAlterRequested = true;
+        this.alteredParams = properties;
+        reconfigure(properties);
+    }
+
+    // Called once the altered parameters have been consumed.
+    public void postAlteration() {
+        alteredParams = null;
+        isAlterRequested = false;
+    }
+
+    @Override
+    public void beforeSuspend() throws Exception {
+        // No action required before suspension.
+    }
+
+    @Override
+    public void beforeResume() throws Exception {
+        // No action required before resumption.
+    }
+
+    @Override
+    public void beforeStop() throws Exception {
+        // No action required before stopping.
+    }
+
+    @Override
+    public void stop() throws Exception {
+        isStopRequested = true;
+    }
+
+    @Override
+    public AdapterDataFlowType getAdapterDataFlowType() {
+        return AdapterDataFlowType.PULL;
+    }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.READ;
+    }
+
+    @Override
+    public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+        configuration = arguments;
+        this.atype = atype;
+        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        if (rssURLProperty == null) {
+            throw new IllegalArgumentException("no rss url provided");
+        }
+        initializeFeedURLs(rssURLProperty);
+        configurePartitionConstraints();
+
+    }
+
+    private void initializeFeedURLs(String rssURLProperty) {
+        feedURLs.clear();
+        String[] feedURLProperty = rssURLProperty.split(",");
+        for (String feedURL : feedURLProperty) {
+            feedURLs.add(feedURL);
+        }
+    }
+
+    protected void reconfigure(Map<String, String> arguments) {
+        // Bug fix: read the new URL list from the incoming arguments rather
+        // than the stale configuration map, which silently ignored the update.
+        String rssURLProperty = arguments.get(KEY_RSS_URL);
+        if (rssURLProperty != null) {
+            initializeFeedURLs(rssURLProperty);
+        }
+    }
+
+    // One partition per configured feed URL.
+    protected void configurePartitionConstraints() {
+        partitionConstraint = new AlgebricksCountPartitionConstraint(feedURLs.size());
+    }
+
+    @Override
+    public void initialize(IHyracksTaskContext ctx) throws Exception {
+        this.ctx = ctx;
+        id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
+    }
+
+    public boolean isAlterRequested() {
+        return isAlterRequested;
+    }
+
+    public Map<String, String> getAlteredParams() {
+        return alteredParams;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/AlterFeedMessage.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/AlterFeedMessage.java
new file mode 100644
index 0000000..d34af73
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/AlterFeedMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.comm;
+
+import java.util.Map;
+
+/**
+ * Feed control message requesting reconfiguration of a running feed; carries
+ * the map of altered configuration parameters.
+ */
+public class AlterFeedMessage extends FeedMessage {
+
+    private final Map<String, String> alteredConfParams;
+
+    /** Creates a synchronous ALTER message. */
+    public AlterFeedMessage(Map<String, String> alteredConfParams) {
+        this(MessageResponseMode.SYNCHRONOUS, alteredConfParams);
+    }
+
+    /** Creates an ALTER message with the given response mode. */
+    public AlterFeedMessage(MessageResponseMode mode, Map<String, String> alteredConfParams) {
+        super(MessageType.ALTER);
+        messageResponseMode = mode;
+        this.alteredConfParams = alteredConfParams;
+    }
+
+    @Override
+    public MessageResponseMode getMessageResponseMode() {
+        return messageResponseMode;
+    }
+
+    @Override
+    public MessageType getMessageType() {
+        return MessageType.ALTER;
+    }
+
+    /** Returns the configuration parameters to apply. */
+    public Map<String, String> getAlteredConfParams() {
+        return alteredConfParams;
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/FeedMessage.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/FeedMessage.java
new file mode 100644
index 0000000..1f1a020
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/FeedMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.comm;
+
+/**
+ * Base implementation of {@link IFeedMessage}: a simple holder for the
+ * message type and the response mode, which defaults to synchronous.
+ */
+public class FeedMessage implements IFeedMessage {
+
+    protected MessageResponseMode messageResponseMode = MessageResponseMode.SYNCHRONOUS;
+    protected MessageType messageType;
+
+    public FeedMessage(MessageType messageType) {
+        this.messageType = messageType;
+    }
+
+    @Override
+    public MessageResponseMode getMessageResponseMode() {
+        return messageResponseMode;
+    }
+
+    public void setMessageResponseMode(MessageResponseMode messageResponseMode) {
+        this.messageResponseMode = messageResponseMode;
+    }
+
+    @Override
+    public MessageType getMessageType() {
+        return messageType;
+    }
+
+    public void setMessageType(MessageType messageType) {
+        this.messageType = messageType;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/IFeedMessage.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/IFeedMessage.java
new file mode 100644
index 0000000..dfb4f91
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/IFeedMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.comm;
+
+import java.io.Serializable;
+
+/**
+ * A serializable control message sent to a running feed.
+ */
+public interface IFeedMessage extends Serializable {
+
+    // How the sender expects the message to be processed/acknowledged.
+    public enum MessageResponseMode {
+        SYNCHRONOUS,
+        ASYNCHRONOUS,
+    }
+
+    // The control action carried by the message.
+    public enum MessageType {
+        STOP,
+        SUSPEND,
+        RESUME,
+        ALTER,
+    }
+
+    public MessageResponseMode getMessageResponseMode();
+
+    public MessageType getMessageType();
+    
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/AbstractFeedStream.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/AbstractFeedStream.java
new file mode 100644
index 0000000..3d3856d
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/AbstractFeedStream.java
@@ -0,0 +1,62 @@
+package edu.uci.ics.asterix.feed.intake;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Base InputStream over an {@link IFeedClient}: pulls batches of feed records
+ * and exposes them as newline-separated bytes, one frame-sized buffer at a time.
+ */
+public abstract class AbstractFeedStream extends InputStream {
+
+    private ByteBuffer buffer;
+    private int capacity;
+    private IFeedClient feedClient;
+    // Reusable batch container handed to the feed client on each fetch.
+    // Bug fix: this was never initialized, so feedClient.next(feedObjects)
+    // received null and clients mutating the list threw NullPointerException.
+    private List<String> feedObjects = new java.util.ArrayList<String>();
+
+    public AbstractFeedStream(IFeedClient feedClient, IHyracksTaskContext ctx)
+            throws Exception {
+        capacity = ctx.getFrameSize();
+        buffer = ByteBuffer.allocate(capacity);
+        this.feedClient = feedClient;
+        initialize();
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buffer.hasRemaining()) {
+            if (!feedClient.next(feedObjects)) {
+                return -1;
+            }
+            fillBuffer();
+        }
+        // Bug fix: mask to honor the InputStream contract (0-255); sign
+        // extension made any byte >= 0x80 negative and 0xFF read as EOF (-1).
+        return buffer.get() & 0xFF;
+    }
+
+    // Buffers the first batch so the stream is readable upon construction; an
+    // empty feed leaves the buffer at limit 0 so read() reports EOF.
+    private void initialize() throws Exception {
+        if (!feedClient.next(feedObjects)) {
+            buffer.limit(0);
+        } else {
+            fillBuffer();
+        }
+    }
+
+    // Writes the current batch into the buffer, one record per line.
+    // NOTE(review): a batch larger than the frame size would overflow the
+    // buffer; confirm producers bound their batch sizes accordingly.
+    private void fillBuffer() {
+        buffer.position(0);
+        buffer.limit(capacity);
+        for (String feed : feedObjects) {
+            buffer.put(feed.getBytes());
+            buffer.put((byte) '\n');
+        }
+        buffer.flip();
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/FeedStream.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/FeedStream.java
new file mode 100644
index 0000000..9d75182
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/FeedStream.java
@@ -0,0 +1,13 @@
+package edu.uci.ics.asterix.feed.intake;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+
+public class FeedStream extends AbstractFeedStream {
+
+	public FeedStream(IFeedClient feedClient, IHyracksTaskContext ctx)
+			throws Exception {
+		super(feedClient, ctx);
+	}
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IFeedClient.java
new file mode 100644
index 0000000..b67561d
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IFeedClient.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.asterix.feed.intake;
+
+import java.util.List;
+
/**
 * A pull-based source of feed records.
 */
public interface IFeedClient {

	/**
	 * Fetches the next batch of records into the supplied list.
	 *
	 * @param list
	 *            receptacle for the fetched records; implementations may
	 *            clear it before adding the new batch
	 * @return true if the feed may produce more data; false at end of feed
	 */
	public boolean next(List<String> list);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/RSSFeedClient.java
new file mode 100644
index 0000000..2e77499
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/RSSFeedClient.java
@@ -0,0 +1,158 @@
+package edu.uci.ics.asterix.feed.intake;
+
+import java.net.URL;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+import com.sun.syndication.feed.synd.SyndFeed;
+import com.sun.syndication.fetcher.FeedFetcher;
+import com.sun.syndication.fetcher.FetcherEvent;
+import com.sun.syndication.fetcher.FetcherListener;
+import com.sun.syndication.fetcher.impl.FeedFetcherCache;
+import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
+import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;
+
+import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
+
+@SuppressWarnings("rawtypes")
+public class RSSFeedClient  implements IFeedClient {
+
+	private final String feedURL;
+	private int timeInterval = 1;
+	private Character delimiter = '|';
+	private long id = 0;
+	private String id_prefix;
+	private boolean feedModified = false;
+	private RSSFeedAdapter adapter;
+
+	public boolean isFeedModified() {
+		return feedModified;
+	}
+
+	public void setFeedModified(boolean feedModified) {
+		this.feedModified = feedModified;
+	}
+
+	public RSSFeedClient(RSSFeedAdapter adapter, String feedURL,
+			String id_prefix) {
+		this.adapter = adapter;
+		this.feedURL = feedURL;
+		this.id_prefix = id_prefix;
+	}
+
+	private void initialize(Map<String, String> params) {
+		if (params.get(adapter.KEY_INTERVAL) != null) {
+			this.timeInterval = Integer.parseInt(params
+					.get(adapter.KEY_INTERVAL));
+		}
+	}
+
+	@Override
+	public boolean next(List<String> feeds) {
+		try {
+			if (adapter.isStopRequested()) {
+				return false;
+			}
+			if (adapter.isAlterRequested()) {
+				initialize(adapter.getAlteredParams());
+				adapter.postAlteration();
+			}
+			Thread.sleep(timeInterval * 1000);
+			feeds.clear();
+			try {
+				getFeed(feeds);
+			} catch (Exception te) {
+				te.printStackTrace();
+				System.out.println("Failed to get feed: " + te.getMessage());
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+		return true;
+	}
+
+	public String formFeedTuple(Object entry) {
+		StringBuilder builder = new StringBuilder();
+		builder.append(id_prefix + ":" + id);
+		builder.append(delimiter);
+		builder.append(((SyndEntryImpl) entry).getTitle());
+		builder.append(delimiter);
+		builder.append(((SyndEntryImpl) entry).getLink());
+		id++;
+		return new String(builder);
+	}
+
+	private void getFeed(List<String> feeds) {
+		try {
+			URL feedUrl = new URL(feedURL);
+			FeedFetcherCache feedInfoCache = HashMapFeedInfoCache.getInstance();
+			FeedFetcher fetcher = new HttpURLFeedFetcher(feedInfoCache);
+
+			FetcherEventListenerImpl listener = new FetcherEventListenerImpl(
+					this);
+			fetcher.addFetcherEventListener(listener);
+			System.err.println("Retrieving feed " + feedUrl);
+			// Retrieve the feed.
+			// We will get a Feed Polled Event and then a
+			// Feed Retrieved event (assuming the feed is valid)
+			SyndFeed feed = fetcher.retrieveFeed(feedUrl);
+			if (feedModified) {
+				System.err.println(feedUrl + " retrieved");
+				System.err.println(feedUrl + " has a title: " + feed.getTitle()
+						+ " and contains " + feed.getEntries().size()
+						+ " entries.");
+
+				List fetchedFeeds = feed.getEntries();
+				Iterator feedIterator = fetchedFeeds.iterator();
+				while (feedIterator.hasNext()) {
+					SyndEntryImpl feedEntry = (SyndEntryImpl) feedIterator
+							.next();
+					String feedContent = formFeedTuple(feedEntry);
+					feeds.add(escapeChars(feedContent));
+					System.out.println(feedContent);
+				}
+			}
+		} catch (Exception ex) {
+			System.out.println("ERROR: " + ex.getMessage());
+			ex.printStackTrace();
+		}
+	}
+
+	private String escapeChars(String content) {
+		if (content.contains("\n")) {
+			return content.replace("\n", " ");
+		}
+		return content;
+	}
+
+}
+
+class FetcherEventListenerImpl implements FetcherListener {
+
+	private final IFeedClient feedClient;
+
+	public FetcherEventListenerImpl(IFeedClient feedClient) {
+		this.feedClient = feedClient;
+	}
+
+	/**
+	 * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
+	 */
+	public void fetcherEvent(FetcherEvent event) {
+		String eventType = event.getEventType();
+		if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
+			System.err.println("\tEVENT: Feed Polled. URL = "
+					+ event.getUrlString());
+		} else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
+			System.err.println("\tEVENT: Feed Retrieved. URL = "
+					+ event.getUrlString());
+			((RSSFeedClient) feedClient).setFeedModified(true);
+		} else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
+			System.err.println("\tEVENT: Feed Unchanged. URL = "
+					+ event.getUrlString());
+			((RSSFeedClient) feedClient).setFeedModified(true);
+		}
+	}
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java
new file mode 100644
index 0000000..1eb1079
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.managed.adapter;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+
/**
 * A feed adapter whose lifecycle (suspend/resume/stop) is managed through
 * externally delivered control messages.
 */
public interface IManagedFeedAdapter extends IDatasourceReadAdapter {

	public enum OperationState {
		// SUSPENDED state signifies that ingestion through the adapter is
		// temporarily paused (see beforeSuspend/beforeResume below).
		SUSPENDED,
		// ACTIVE state signifies that the feed dataset is connected to the
		// external world using an adapter that may put data into the dataset.
		ACTIVE,
		// STOPPED state signifies that ingestion has been terminated.
		// INACTIVE state signifies that the feed dataset is not
		// connected with the external world through the feed
		// adapter.
		STOPPED, INACTIVE
	}
	
	/** Invoked before the feed is suspended. */
	public void beforeSuspend() throws Exception;

	/** Invoked before a suspended feed is resumed. */
	public void beforeResume() throws Exception;

	/** Invoked before the feed is stopped. */
	public void beforeStop() throws Exception;

	/** Stops the ingestion of data through this adapter. */
	public void stop() throws Exception;

}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IMutableFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IMutableFeedAdapter.java
new file mode 100644
index 0000000..290c7d6
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IMutableFeedAdapter.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.managed.adapter;
+
+import java.util.Map;
+
/**
 * A managed feed adapter whose configuration may be altered at runtime.
 */
public interface IMutableFeedAdapter extends IManagedFeedAdapter {

    /**
     * Reconfigures the adapter with the given properties while it is running.
     *
     * @param properties
     *            the altered configuration parameters
     * @throws Exception
     *             if the new configuration cannot be applied
     */
    public void alter(Map<String,String> properties) throws Exception;
    
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedId.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedId.java
new file mode 100644
index 0000000..19076c4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedId.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package edu.uci.ics.asterix.feed.mgmt;
+
+import java.io.Serializable;
+
/**
 * Immutable identifier for a feed: the (dataverse, dataset) pair the feed
 * populates. Overrides equals/hashCode so it can serve as a map key (see
 * FeedManager's queue registry).
 */
public class FeedId implements Serializable {

    // Pin the serialized form explicitly; the class is sent with job specs.
    private static final long serialVersionUID = 1L;

    private final String dataverse;
    private final String dataset;

    public FeedId(String dataverse, String dataset) {
        this.dataset = dataset;
        this.dataverse = dataverse;
    }

    public String getDataverse() {
        return dataverse;
    }

    public String getDataset() {
        return dataset;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FeedId)) {
            return false; // also covers o == null
        }
        FeedId other = (FeedId) o;
        return other.getDataset().equals(dataset) && other.getDataverse().equals(dataverse);
    }

    @Override
    public int hashCode() {
        // Same formula as before: equal ids hash identically.
        return dataverse.hashCode() + dataset.hashCode();
    }

}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedManager.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedManager.java
new file mode 100644
index 0000000..39c0442
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedManager.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.mgmt;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
+
+public class FeedManager implements IFeedManager {
+
+    private Map<FeedId, Set<LinkedBlockingQueue<IFeedMessage>>> outGoingMsgQueueMap = new HashMap<FeedId, Set<LinkedBlockingQueue<IFeedMessage>>>();
+    private LinkedBlockingQueue<IFeedMessage> incomingMsgQueue = new LinkedBlockingQueue<IFeedMessage>();
+
+    
+    @Override
+    public boolean deliverMessage(FeedId feedId, IFeedMessage feedMessage) throws Exception {
+        Set<LinkedBlockingQueue<IFeedMessage>> operatorQueues = outGoingMsgQueueMap.get(feedId);
+        if (operatorQueues == null) {
+            throw new IllegalArgumentException(" unknown feed id " + feedId.getDataverse() + ":" + feedId.getDataset());
+        }
+
+        for (LinkedBlockingQueue<IFeedMessage> queue : operatorQueues) {
+            queue.put(feedMessage);
+        }
+        return false;
+    }
+
+    @Override
+    public void registerFeedOperatorMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue) {
+        Set<LinkedBlockingQueue<IFeedMessage>> feedQueues = outGoingMsgQueueMap.get(feedId);
+        if (feedQueues == null) {
+            feedQueues = new HashSet<LinkedBlockingQueue<IFeedMessage>>();
+        }
+        feedQueues.add(queue);
+        outGoingMsgQueueMap.put(feedId, feedQueues);
+        
+    }
+
+    @Override
+    public void unregisterFeedOperatorMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue) {
+        Set<LinkedBlockingQueue<IFeedMessage>> feedQueues = outGoingMsgQueueMap.get(feedId);
+        if (feedQueues == null || !feedQueues.contains(queue)) {
+            throw new IllegalArgumentException(" unable to de-register feed message queue");
+        }
+        feedQueues.remove(queue);
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedSystemProvider.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedSystemProvider.java
new file mode 100644
index 0000000..82fb67d
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedSystemProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.mgmt;
+
+
+/**
+ * Provider for all the sub-systems (transaction/lock/log/recovery) managers.
+ * Users of transaction sub-systems must obtain them from the provider.
+ */
+public class FeedSystemProvider {
+    private static final IFeedManager feedManager = new FeedManager();
+    
+    public static IFeedManager getFeedManager()  {
+      return feedManager;
+    }
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/IFeedManager.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/IFeedManager.java
new file mode 100644
index 0000000..a8c1303
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/IFeedManager.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.mgmt;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
+
/**
 * Manages the message queues through which control messages reach the
 * operators of running feeds.
 */
public interface IFeedManager {

    /** Registers an operator's message queue for the given feed. */
    public void registerFeedOperatorMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue);

    /** Removes a previously registered message queue for the given feed. */
    public void unregisterFeedOperatorMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue);

    /**
     * Delivers a message to the queues registered for the given feed.
     * NOTE(review): the sole implementation in this change set always
     * returns false — confirm the intended meaning of the return value.
     */
    public boolean deliverMessage(FeedId feedId, IFeedMessage feedMessage) throws Exception;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorDescriptor.java
new file mode 100644
index 0000000..93b8fc5
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.operator;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.feed.mgmt.FeedId;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final String adapter;
+    private final Map<String, String> adapterConfiguration;
+    private final IAType atype;
+    private final FeedId feedId;
+
+    private transient IDatasourceReadAdapter datasourceReadAdapter;
+
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
+            Map<String, String> arguments, ARecordType atype, RecordDescriptor rDesc) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapter = adapter;
+        this.adapterConfiguration = arguments;
+        this.atype = atype;
+        this.feedId = feedId;
+    }
+
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+
+        try {
+            datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(adapter).newInstance();
+            datasourceReadAdapter.configure(adapterConfiguration, atype);
+            datasourceReadAdapter.initialize(ctx);
+
+        } catch (Exception e) {
+            throw new HyracksDataException("initialization of adapter failed", e);
+        }
+        return new FeedIntakeOperatorNodePushable(feedId, datasourceReadAdapter, partition);
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorNodePushable.java
new file mode 100644
index 0000000..4603208
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorNodePushable.java
@@ -0,0 +1,107 @@
+package edu.uci.ics.asterix.feed.operator;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.data.parser.IManagedDataParser;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
+import edu.uci.ics.asterix.feed.mgmt.FeedId;
+import edu.uci.ics.asterix.feed.mgmt.FeedSystemProvider;
+import edu.uci.ics.asterix.feed.mgmt.IFeedManager;
+import edu.uci.ics.asterix.feed.comm.AlterFeedMessage;
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
/**
 * Runtime for feed intake. On open() it starts an inbox-monitor thread that
 * applies control messages (suspend/resume/stop/alter) to the adapter,
 * registers the inbox with the node-wide feed manager, and then drives the
 * adapter's data parser, which pushes frames to the downstream writer.
 */
public class FeedIntakeOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {

    private final IDatasourceReadAdapter adapter;
    private final int partition;
    private final IFeedManager feedManager;
    private final FeedId feedId;
    // Queue through which the feed manager delivers control messages.
    private final LinkedBlockingQueue<IFeedMessage> inbox;
    private FeedInboxMonitor feedInboxMonitor;

    public FeedIntakeOperatorNodePushable(FeedId feedId, IDatasourceReadAdapter adapter, int partition) {
        this.adapter = adapter;
        this.partition = partition;
        this.feedManager = (IFeedManager) FeedSystemProvider.getFeedManager();
        this.feedId = feedId;
        inbox = new LinkedBlockingQueue<IFeedMessage>();
    }

    @Override
    public void open() throws HyracksDataException {
        // NOTE(review): the monitor thread and the registered inbox are never
        // stopped/unregistered anywhere in this class — confirm lifecycle.
        feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
        feedInboxMonitor.start();
        feedManager.registerFeedOperatorMsgQueue(feedId, inbox);
        writer.open();
        try {
            // Blocks until the feed ends; the parser pushes frames to writer.
            adapter.getDataParser(partition).parse(writer);
        } catch (Exception e) {
            throw new HyracksDataException("exception during reading from external data source", e);
        } finally {
            // NOTE(review): writer is closed here AND in close()/fail(),
            // so it may be closed twice — confirm the writer tolerates this.
            writer.close();
        }
    }

    @Override
    public void fail() throws HyracksDataException {
        // NOTE(review): closes rather than propagating failure downstream
        // (e.g. writer.fail()) — confirm against the IFrameWriter contract.
        writer.close();
    }

    @Override
    public void close() throws HyracksDataException {
        writer.close();
    }

    @Override
    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        // TODO Auto-generated method stub

    }
}
+
+class FeedInboxMonitor extends Thread {
+
+    private LinkedBlockingQueue<IFeedMessage> inbox;
+    private final IManagedFeedAdapter adapter;
+    private final int partition;
+
+    public FeedInboxMonitor(IManagedFeedAdapter adapter, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
+        this.inbox = inbox;
+        this.adapter = adapter;
+        this.partition = partition;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                IFeedMessage feedMessage = inbox.take();
+                switch (feedMessage.getMessageType()) {
+                    case SUSPEND:
+                        ((IManagedDataParser) adapter.getDataParser(partition)).getManagedTupleParser().suspend();
+                        break;
+                    case RESUME:
+                        ((IManagedDataParser) adapter.getDataParser(partition)).getManagedTupleParser().resume();
+                        break;
+                    case STOP:
+                        ((IManagedDataParser) adapter.getDataParser(partition)).getManagedTupleParser().stop();
+                        break;
+                    case ALTER:
+                        ((IMutableFeedAdapter) adapter).alter(((AlterFeedMessage) feedMessage).getAlteredConfParams());
+                        break;
+                }
+            } catch (InterruptedException ie) {
+                break;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorDescriptor.java
new file mode 100644
index 0000000..c66c49e
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorDescriptor.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.operator;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
+import edu.uci.ics.asterix.feed.mgmt.FeedId;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class FeedMessageOperatorDescriptor extends
+		AbstractSingleActivityOperatorDescriptor {
+
+	private final FeedId feedId;
+	private final List<IFeedMessage> feedMessages;
+	private final boolean sendToAll = true;
+
+	public FeedMessageOperatorDescriptor(JobSpecification spec,
+			String dataverse, String dataset, List<IFeedMessage> feedMessages) {
+		super(spec, 0, 1);
+		this.feedId = new FeedId(dataverse, dataset);
+		this.feedMessages = feedMessages;
+	}
+
+	@Override
+	public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+			IRecordDescriptorProvider recordDescProvider, int partition,
+			int nPartitions) throws HyracksDataException {
+		return new FeedMessageOperatorNodePushable(ctx, feedId, feedMessages,
+				sendToAll, partition, nPartitions);
+	}
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorNodePushable.java
new file mode 100644
index 0000000..37e3ba3
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorNodePushable.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
+import edu.uci.ics.asterix.feed.mgmt.FeedId;
+import edu.uci.ics.asterix.feed.mgmt.FeedSystemProvider;
+import edu.uci.ics.asterix.feed.mgmt.IFeedManager;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class FeedMessageOperatorNodePushable extends
+		AbstractUnaryOutputSourceOperatorNodePushable {
+
+	private final FeedId feedId;
+	private final List<IFeedMessage> feedMessages;
+	private IFeedManager feedManager;
+
+	public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx,
+			FeedId feedId, List<IFeedMessage> feedMessages, boolean applyToAll,
+			int partition, int nPartitions) {
+		this.feedId = feedId;
+		if (applyToAll) {
+			this.feedMessages = feedMessages;
+		} else {
+			this.feedMessages = new ArrayList<IFeedMessage>();
+			feedMessages.add(feedMessages.get(partition));
+		}
+		feedManager = (IFeedManager) FeedSystemProvider.getFeedManager();
+	}
+
+	@Override
+	public void initialize() throws HyracksDataException {
+		try {
+			writer.open();
+			for (IFeedMessage feedMessage : feedMessages) {
+				feedManager.deliverMessage(feedId, feedMessage);
+			}
+		} catch (Exception e) {
+			throw new HyracksDataException(e);
+		} finally {
+			writer.close();
+		}
+	}
+
+}