Introduces Feeds 2.0

commit c3f577861fc705d848c1641605689cadd6973bae
Merge: ebc4cae fc0c2c0
Author: ramangrover29 <ramangrover29@gmail.com>
Date:   Fri Jun 26 13:04:05 2015 -0700

    Merge branch 'raman/feeds_2_release' of https://code.google.com/p/asterixdb-sandbox into raman/feeds_2_release

    Conflicts:
    	asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
    	asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java

commit ebc4cae21a7302869f953df1ebda601e798d12d2
Author: ramangrover29 <ramangrover29@gmail.com>
Date:   Sat Jun 20 17:14:45 2015 -0700

    Introduces Feeds 2.0

    Some of the prominent chnages introduced are as follows
     a) Support for building a cascade network of feeds (via secondary feeds feature)
     b) Feed Management Console for tracking active feeds and associated metrics
     c) Support for elastic runtime for data ingestion
     d) Improved fault-tolerance with support for logging of failed records

    Documentation has been added at asterix-doc/src/site/markdown/feeds/

commit fc0c2c0549a6ee8b202e57607d2e110478cd57bb
Author: ramangrover29 <ramangrover29@gmail.com>
Date:   Sat Jun 20 17:14:45 2015 -0700

    Introduces Feeds 2.0

    Some of the prominent chnages introduced are as follows
     a) Support for building a cascade network of feeds (via secondary feeds feature)
     b) Feed Management Console for tracking active feeds and associated metrics
     c) Support for elastic runtime for data ingestion
     d) Improved fault-tolerance with support for logging of failed records

    Documentation has been added at asterix-doc/src/site/markdown/feeds/

Change-Id: I498f01c591a229aaf51cec43ab20f3e5c4f072f4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/297
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 0b045f1..7d64b61 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -136,9 +136,14 @@
 		<dependency>
 			<groupId>org.twitter4j</groupId>
 			<artifactId>twitter4j-core</artifactId>
-			<version>2.2.3</version>
+			<version>[3.0,)</version>
 		</dependency>
 		<dependency>
+            <groupId>org.twitter4j</groupId>
+            <artifactId>twitter4j-stream</artifactId>
+            <version>4.0.2</version>
+        </dependency>
+		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-client</artifactId>
 			<type>jar</type>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index fa66715..e4f118f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -19,12 +19,12 @@
 import java.util.List;
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
 import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -32,7 +32,7 @@
 /**
  * A factory class for creating the @see {CNNFeedAdapter}.
  */
-public class CNNFeedAdapterFactory implements ITypedAdapterFactory {
+public class CNNFeedAdapterFactory implements IFeedAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     private Map<String, String> configuration;
@@ -40,6 +40,7 @@
     private List<String> feedURLs = new ArrayList<String>();
     private static Map<String, String> topicFeeds = new HashMap<String, String>();
     private ARecordType recordType;
+    private FeedPolicyAccessor policyAccessor;
 
     public static final String KEY_RSS_URL = "topic";
     public static final String KEY_INTERVAL = "interval";
@@ -91,22 +92,14 @@
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.TYPED;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         this.configuration = configuration;
         String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
             throw new IllegalArgumentException("no rss url provided");
         }
         initializeFeedURLs(rssURLProperty);
-        recordType = new ARecordType("FeedRecordType", new String[] { "id", "title", "description", "link" },
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
-                false);
-
+        this.recordType = outputType;
     }
 
     private void initializeFeedURLs(String rssURLProperty) {
@@ -147,4 +140,14 @@
         return recordType;
     }
 
+    @Override
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
+
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        return null;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index dbf3fd7..ee481de 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -31,15 +31,17 @@
 import org.apache.hadoop.mapred.JobConf;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.indexing.dataflow.HDFSObjectTupleParserFactory;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.ICCContext;
@@ -53,7 +55,7 @@
 /**
  * A factory class for creating an instance of HDFSAdapter
  */
-public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
+public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
@@ -80,6 +82,7 @@
     private boolean configured = false;
     public static Scheduler hdfsScheduler;
     private static boolean initialized = false;
+    protected List<ExternalFile> files;
 
     private static Scheduler initializeHDFSScheduler() {
         ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
@@ -137,11 +140,6 @@
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.GENERIC;
-    }
-
-    @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
         if (!configured) {
             throw new IllegalStateException("Adapter factory has not been configured yet");
@@ -203,25 +201,41 @@
         return new AlgebricksAbsolutePartitionConstraint(cluster);
     }
 
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return (ARecordType) atype;
+    }
+
+    @Override
+    public InputDataFormat getInputDataFormat() {
+        return InputDataFormat.UNKNOWN;
+    }
+
     /*
      * This method is overridden to do the following:
      * if data is text data (adm or delimited text), it will use a text tuple parser,
      * otherwise it will use hdfs record object parser
      */
     protected void configureFormat(IAType sourceDatatype) throws Exception {
-        String specifiedFormat = (String) configuration.get(KEY_FORMAT);
-        if (specifiedFormat == null) {
-            throw new IllegalArgumentException(" Unspecified data format");
-        } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
-            parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, false);
-        } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
-            parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, false);
-        } else if (FORMAT_BINARY.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
-            parserFactory = new HDFSObjectTupleParserFactory((ARecordType) atype, this, configuration);
-        } else {
-            throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
-        }
-    }
+         String specifiedFormat = (String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT);
+         if (specifiedFormat == null) {
+                 throw new IllegalArgumentException(" Unspecified data format");
+         } 
+         
+         if(AsterixTupleParserFactory.FORMAT_BINARY.equalsIgnoreCase(specifiedFormat)){
+             parserFactory = new HDFSObjectTupleParserFactory((ARecordType) atype, this, configuration);
+         } else {
+             InputDataFormat inputFormat = InputDataFormat.UNKNOWN;
+             if (AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
+                 inputFormat = InputDataFormat.DELIMITED;
+             }   else if (AsterixTupleParserFactory.FORMAT_ADM.equalsIgnoreCase(specifiedFormat)) {
+                 inputFormat = InputDataFormat.ADM;
+             }    
+             parserFactory = new AsterixTupleParserFactory(configuration, (ARecordType) sourceDatatype
+                     , inputFormat);
+         }  
+        
+     }
 
     /**
      * Instead of creating the split using the input format, we do it manually
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
index 2a4836c..37b8050 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
@@ -16,22 +16,24 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSIndexingAdapter;
 import edu.uci.ics.asterix.external.indexing.dataflow.HDFSIndexingParserFactory;
 import edu.uci.ics.asterix.external.indexing.dataflow.IndexingScheduler;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.AUnionType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -39,7 +41,12 @@
 import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
 
@@ -83,11 +90,6 @@
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.GENERIC;
-    }
-
-    @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
         if (!configured) {
             throw new IllegalStateException("Adapter factory has not been configured yet");
@@ -104,7 +106,8 @@
         ((HDFSIndexingParserFactory) parserFactory).setArguments(configuration);
         HDFSIndexingAdapter hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits,
                 conf, clusterLocations, files, parserFactory, ctx, nodeName,
-                (String) configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT), (String) configuration.get(KEY_FORMAT));
+                (String) configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT),
+                (String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT));
         return hdfsIndexingAdapter;
     }
 
@@ -131,12 +134,13 @@
 
     protected void configureFormat(IAType sourceDatatype) throws Exception {
 
-        char delimiter = StreamBasedAdapterFactory.getDelimiter(configuration);
-        char quote = StreamBasedAdapterFactory.getQuote(configuration, delimiter);
+        char delimiter = AsterixTupleParserFactory.getDelimiter(configuration);
+        char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter);
 
         parserFactory = new HDFSIndexingParserFactory((ARecordType) atype,
-                configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT), configuration.get(KEY_FORMAT), delimiter,
-                quote, configuration.get(HDFSAdapterFactory.KEY_PARSER));
+                configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT),
+                configuration.get(AsterixTupleParserFactory.KEY_FORMAT), delimiter, quote,
+                configuration.get(HDFSAdapterFactory.KEY_PARSER));
     }
 
     /**
@@ -165,7 +169,7 @@
             if (tag == null) {
                 throw new NotImplementedException("Failed to get the type information for field " + i + ".");
             }
-            IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
+            IValueParserFactory vpf = valueParserFactoryMap.get(tag);
             if (vpf == null) {
                 throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
             }
@@ -190,4 +194,16 @@
         cluster = locs.toArray(cluster);
         return new AlgebricksAbsolutePartitionConstraint(cluster);
     }
+
+    private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
+
+    private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
+        Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
+        m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+        m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+        m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+        m.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+        m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+        return m;
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index ab59241..c5ebe67 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -17,20 +17,22 @@
 import java.util.List;
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * A factory class for creating an instance of HiveAdapter
  */
-public class HiveAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
+public class HiveAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String HIVE_DATABASE = "database";
@@ -61,11 +63,7 @@
         return "hive";
     }
 
-    @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.GENERIC;
-    }
-
+  
     @Override
     public SupportedOperation getSupportedOperations() {
         return SupportedOperation.READ;
@@ -76,6 +74,7 @@
         if (!configured) {
             populateConfiguration(configuration);
             hdfsAdapterFactory.configure(configuration, outputType);
+            this.atype = (IAType) outputType;
         }
     }
 
@@ -90,8 +89,8 @@
                     + configuration.get(HIVE_TABLE);
         }
         configuration.put(HDFSAdapterFactory.KEY_PATH, tablePath);
-        if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
-            throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
+        if (!configuration.get(AsterixTupleParserFactory.KEY_FORMAT).equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
+            throw new IllegalArgumentException("format" + configuration.get(AsterixTupleParserFactory.KEY_FORMAT) + " is not supported");
         }
 
         if (!(configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT) || configuration
@@ -106,9 +105,19 @@
         return hdfsAdapterFactory.getPartitionConstraint();
     }
 
+    
     @Override
+    public ARecordType getAdapterOutputType() {
+        return (ARecordType) atype;
+    }
+
+    @Override
+    public InputDataFormat getInputDataFormat() {
+        return InputDataFormat.UNKNOWN;
+    }
+
     public void setFiles(List<ExternalFile> files) {
-        this.files = files;
+        hdfsAdapterFactory.setFiles(files);
     }
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 110a965..446199e 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -20,15 +20,17 @@
 import java.util.logging.Level;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter;
 import edu.uci.ics.asterix.external.util.DNSResolverFactory;
 import edu.uci.ics.asterix.external.util.INodeResolver;
 import edu.uci.ics.asterix.external.util.INodeResolverFactory;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -41,7 +43,7 @@
  * NCFileSystemAdapter reads external data residing on the local file system of
  * an NC.
  */
-public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
+public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
@@ -50,6 +52,8 @@
 
     private IAType sourceDatatype;
     private FileSplit[] fileSplits;
+    private ARecordType outputType;
+
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
@@ -62,10 +66,6 @@
         return NC_FILE_SYSTEM_ADAPTER_NAME;
     }
 
-    @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.GENERIC;
-    }
 
     @Override
     public SupportedOperation getSupportedOperations() {
@@ -75,7 +75,8 @@
     @Override
     public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         this.configuration = configuration;
-        String[] splits = ((String) configuration.get(KEY_PATH)).split(",");
+        this.outputType = outputType;
+        String[] splits = ((String) configuration.get(AsterixTupleParserFactory.KEY_PATH)).split(",");
         IAType sourceDatatype = (IAType) outputType;
         configureFileSplits(splits);
         configureFormat(sourceDatatype);
@@ -127,7 +128,7 @@
 
     private static INodeResolver initializeNodeResolver() {
         INodeResolver nodeResolver = null;
-        String configuredNodeResolverFactory = System.getProperty(NODE_RESOLVER_FACTORY_PROPERTY);
+        String configuredNodeResolverFactory = System.getProperty(AsterixTupleParserFactory.NODE_RESOLVER_FACTORY_PROPERTY);
         if (configuredNodeResolverFactory != null) {
             try {
                 nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
@@ -145,8 +146,17 @@
         }
         return nodeResolver;
     }
-
+    
     @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+    
+    @Override
+    public InputDataFormat getInputDataFormat() {
+        return InputDataFormat.UNKNOWN;
+    }
+
     public void setFiles(List<ExternalFile> files) throws AlgebricksException {
         throw new AlgebricksException("can't set files for this Adapter");
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
index b4dbe13..298a443 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
@@ -3,12 +3,14 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
 import edu.uci.ics.asterix.external.dataset.adapter.PullBasedAzureTwitterAdapter;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -16,7 +18,7 @@
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
-public class PullBasedAzureTwitterAdapterFactory implements ITypedAdapterFactory {
+public class PullBasedAzureTwitterAdapterFactory implements IFeedAdapterFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -27,13 +29,14 @@
     private static final String ACCOUNT_NAME_KEY = "account-name";
     private static final String ACCOUNT_KEY_KEY = "account-key";
 
-    private ARecordType recordType;
+    private ARecordType outputType;
     private Map<String, String> configuration;
     private String tableName;
     private String azureAccountName;
     private String azureAccountKey;
     private String[] locations;
     private String[] partitions;
+    private FeedPolicyAccessor ingestionPolicy;
 
     @Override
     public SupportedOperation getSupportedOperations() {
@@ -46,11 +49,6 @@
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.TYPED;
-    }
-
-    @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
         String locationsStr = configuration.get(INGESTOR_LOCATIONS_KEY);
         if (locationsStr == null) {
@@ -63,17 +61,18 @@
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
         return new PullBasedAzureTwitterAdapter(azureAccountName, azureAccountKey, tableName, partitions,
-                configuration, ctx, recordType);
+                configuration, ctx, outputType);
     }
 
     @Override
     public ARecordType getAdapterOutputType() {
-        return recordType;
+        return outputType;
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         this.configuration = configuration;
+        this.outputType = outputType;
 
         tableName = configuration.get(TABLE_NAME_KEY);
         if (tableName == null) {
@@ -125,7 +124,7 @@
             if (type.getTypeTag() != ATypeTag.RECORD) {
                 throw new IllegalStateException();
             }
-            recordType = (ARecordType) t.getDatatype();
+            outputType = (ARecordType) t.getDatatype();
             MetadataManager.INSTANCE.commitTransaction(ctx);
         } catch (Exception e) {
             if (ctx != null) {
@@ -136,4 +135,21 @@
             MetadataManager.INSTANCE.releaseReadLatch();
         }
     }
+
+    @Override
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
+
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        return null;
+    }
+
+    public FeedPolicyAccessor getIngestionPolicy() {
+        return ingestionPolicy;
+    }
+    
+    
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index 6058bd2..c43ca05 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -15,13 +15,18 @@
 package edu.uci.ics.asterix.external.adapter.factory;
 
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
 import edu.uci.ics.asterix.external.dataset.adapter.PullBasedTwitterAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -31,29 +36,22 @@
  * This adapter provides the functionality of fetching tweets from Twitter service
  * via pull-based Twitter API.
  */
-public class PullBasedTwitterAdapterFactory implements ITypedAdapterFactory {
+public class PullBasedTwitterAdapterFactory implements IFeedAdapterFactory {
     private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(PullBasedTwitterAdapterFactory.class.getName());
+
     public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
 
-    private Map<String, String> configuration;
-    private static ARecordType recordType = initOutputType();
+    private static final String DEFAULT_INTERVAL = "10"; // 10 seconds
+    private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage 
 
-    private static ARecordType initOutputType() {
-        ARecordType recordType = null;
-        String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
-        IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                BuiltinType.ASTRING };
-        try {
-            recordType = new ARecordType("TweetType", fieldNames, fieldTypes, false);
-        } catch (Exception e) {
-            throw new IllegalStateException("Unable to create adapter output type");
-        }
-        return recordType;
-    }
+    private ARecordType outputType;
+
+    private Map<String, String> configuration;
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        return new PullBasedTwitterAdapter(configuration, recordType, ctx);
+        return new PullBasedTwitterAdapter(configuration, outputType, ctx);
     }
 
     @Override
@@ -62,28 +60,57 @@
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.TYPED;
-    }
-
-    @Override
     public SupportedOperation getSupportedOperations() {
         return SupportedOperation.READ;
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+        this.outputType = outputType;
         this.configuration = configuration;
+        TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+
+        if (configuration.get(SearchAPIConstants.QUERY) == null) {
+            throw new AsterixException("parameter " + SearchAPIConstants.QUERY
+                    + " not specified as part of adaptor configuration");
+        }
+
+        String interval = configuration.get(SearchAPIConstants.INTERVAL);
+        if (interval != null) {
+            try {
+                Integer.parseInt(interval);
+            } catch (NumberFormatException nfe) {
+                throw new IllegalArgumentException("parameter " + SearchAPIConstants.INTERVAL
+                        + " is defined incorrectly, expecting a number");
+            }
+        } else {
+            configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
+                        + DEFAULT_INTERVAL + ")");
+            }
+        }
+
     }
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(1);
+        return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
+    }
+
+    @Override
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
+
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        return null;
     }
 
     @Override
     public ARecordType getAdapterOutputType() {
-        return recordType;
+        return outputType;
     }
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
new file mode 100644
index 0000000..c947a6a
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
@@ -0,0 +1,69 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.external.dataset.adapter.PushBasedTwitterAdapter;
+import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class PushBasedTwitterAdapterFactory implements IFeedAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String NAME = "push_twitter";
+
+    private ARecordType outputType;
+
+    private Map<String, String> configuration;
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(1);
+    }
+
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        PushBasedTwitterAdapter twitterAdapter = new PushBasedTwitterAdapter(configuration, outputType, ctx);
+        return twitterAdapter;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+        this.outputType = outputType;
+        this.configuration = configuration;
+        TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+    
+
+    @Override
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
+
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        return null;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index 41f1d56..01ae2f2 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -18,12 +18,12 @@
 import java.util.List;
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
 import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -32,7 +32,7 @@
  * Factory class for creating an instance of @see {RSSFeedAdapter}.
  * RSSFeedAdapter provides the functionality of fetching an RSS based feed.
  */
-public class RSSFeedAdapterFactory implements ITypedAdapterFactory {
+public class RSSFeedAdapterFactory implements IFeedAdapterFactory {
     private static final long serialVersionUID = 1L;
     public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
 
@@ -40,12 +40,13 @@
     public static final String KEY_INTERVAL = "interval";
 
     private Map<String, String> configuration;
-    private ARecordType recordType;
+    private ARecordType outputType;
     private List<String> feedURLs = new ArrayList<String>();
+    private FeedPolicyAccessor ingestionPolicy;
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter(configuration, recordType, ctx);
+        RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter(configuration, outputType, ctx);
         return rssFeedAdapter;
     }
 
@@ -55,28 +56,20 @@
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.TYPED;
-    }
-
-    @Override
     public SupportedOperation getSupportedOperations() {
         return SupportedOperation.READ;
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         this.configuration = configuration;
+        this.outputType = outputType;
         String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
             throw new IllegalArgumentException("no rss url provided");
         }
         initializeFeedURLs(rssURLProperty);
         configurePartitionConstraints();
-        recordType = new ARecordType("FeedRecordType", new String[] { "id", "title", "description", "link" },
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
-                false);
-
     }
 
     @Override
@@ -98,7 +91,21 @@
 
     @Override
     public ARecordType getAdapterOutputType() {
-        return recordType;
+        return outputType;
+    }
+
+    @Override
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
+
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        return null;
+    }
+
+    public FeedPolicyAccessor getIngestionPolicy() {
+        return ingestionPolicy;
     }
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
index 0f340bc..7d45448 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
@@ -14,30 +14,15 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.util.INodeResolver;
-import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.feeds.ConditionalPushTupleParserFactory;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
-import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
 public abstract class StreamBasedAdapterFactory implements IAdapterFactory {
@@ -45,146 +30,18 @@
     private static final long serialVersionUID = 1L;
     protected static final Logger LOGGER = Logger.getLogger(StreamBasedAdapterFactory.class.getName());
 
-    protected Map<String, String> configuration;
     protected static INodeResolver nodeResolver;
 
-    public static final String KEY_FORMAT = "format";
-    public static final String KEY_PARSER_FACTORY = "parser";
-    public static final String KEY_DELIMITER = "delimiter";
-    public static final String KEY_QUOTE = "quote";
-    public static final String KEY_HEADER = "header";
-    public static final String KEY_PATH = "path";
-    public static final String KEY_SOURCE_DATATYPE = "output-type-name";
-    // The length of a delimiter should be 1.
-    public static final String DEFAULT_DELIMITER = ",";
-    // A quote is used to enclose a string if it includes delimiter(s) in it.
-    // The length of a quote should be 1.
-    public static final String DEFAULT_QUOTE = "\"";
-    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
-    public static final String FORMAT_ADM = "adm";
-    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
-    public static final String BATCH_SIZE = "batch-size";
-    public static final String BATCH_INTERVAL = "batch-interval";
-
+    protected Map<String, String> configuration;
     protected ITupleParserFactory parserFactory;
-    protected ITupleParser parser;
+   
 
-    protected List<ExternalFile> files;
-
-    protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
-
-    static {
-        typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
-    }
-
-    protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
-            throws AsterixException {
-        int n = recordType.getFieldTypes().length;
-        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
-        for (int i = 0; i < n; i++) {
-            ATypeTag tag = null;
-            if (recordType.getFieldTypes()[i].getTypeTag() == ATypeTag.UNION) {
-                List<IAType> unionTypes = ((AUnionType) recordType.getFieldTypes()[i]).getUnionList();
-                if (unionTypes.size() != 2 && unionTypes.get(0).getTypeTag() != ATypeTag.NULL) {
-                    throw new NotImplementedException("Non-optional UNION type is not supported.");
-                }
-                tag = unionTypes.get(1).getTypeTag();
-            } else {
-                tag = recordType.getFieldTypes()[i].getTypeTag();
-            }
-            if (tag == null) {
-                throw new NotImplementedException("Failed to get the type information for field " + i + ".");
-            }
-            IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
-            if (vpf == null) {
-                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
-            }
-            fieldParserFactories[i] = vpf;
-        }
-
-        char delimiter = getDelimiter(configuration);
-        char quote = getQuote(configuration, delimiter);
-        boolean hasHeader = getHasHeader(configuration);
-
-        return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, fieldParserFactories, delimiter,
-                quote, hasHeader, configuration) : new NtDelimitedDataTupleParserFactory(recordType,
-                fieldParserFactories, delimiter, quote, hasHeader);
-    }
-
-    protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
-            throws AsterixException {
-        try {
-            return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, configuration)
-                    : new AdmSchemafullRecordParserFactory(recordType);
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-
-    }
+    public abstract InputDataFormat getInputDataFormat();
 
     protected void configureFormat(IAType sourceDatatype) throws Exception {
-        String propValue = (String) configuration.get(BATCH_SIZE);
-        int batchSize = propValue != null ? Integer.parseInt(propValue) : -1;
-        propValue = configuration.get(BATCH_INTERVAL);
-        long batchInterval = propValue != null ? Long.parseLong(propValue) : -1;
-        boolean conditionalPush = batchSize > 0 || batchInterval > 0;
-
-        String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
-        if (parserFactoryClassname == null) {
-            String specifiedFormat = configuration.get(KEY_FORMAT);
-            if (specifiedFormat == null) {
-                throw new IllegalArgumentException(" Unspecified data format");
-            } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
-                parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
-            } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
-                parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
-            } else {
-                throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
-            }
-        } else {
-            parserFactory = (ITupleParserFactory) Class.forName(parserFactoryClassname).newInstance();
-        }
+        parserFactory = new AsterixTupleParserFactory(configuration, (ARecordType) sourceDatatype, getInputDataFormat());
 
     }
 
-    // Get a delimiter from the given configuration
-    public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
-        String delimiterValue = configuration.get(KEY_DELIMITER);
-        if (delimiterValue == null) {
-            delimiterValue = DEFAULT_DELIMITER;
-        } else if (delimiterValue.length() != 1) {
-            throw new AsterixException("'" + delimiterValue
-                    + "' is not a valid delimiter. The length of a delimiter should be 1.");
-        }
-        return delimiterValue.charAt(0);
-    }
-
-    // Get a quote from the given configuration when the delimiter is given
-    // Need to pass delimiter to check whether they share the same character
-    public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
-        String quoteValue = configuration.get(KEY_QUOTE);
-        if (quoteValue == null) {
-            quoteValue = DEFAULT_QUOTE;
-        } else if (quoteValue.length() != 1) {
-            throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
-        }
-
-        // Since delimiter (char type value) can't be null,
-        // we only check whether delimiter and quote use the same character
-        if (quoteValue.charAt(0) == delimiter) {
-            throw new AsterixException("Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter
-                    + "'. ");
-        }
-
-        return quoteValue.charAt(0);
-    }
-
-    // Get the header flag
-    public static boolean getHasHeader(Map<String, String> configuration) {
-        return Boolean.parseBoolean(configuration.get(KEY_HEADER));
-    }
+  
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
new file mode 100644
index 0000000..4712e7c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
+import edu.uci.ics.asterix.external.dataset.adapter.IFeedClient.InflowState;
+import edu.uci.ics.asterix.metadata.feeds.FeedPolicyEnforcer;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * Acts as an abstract class for all pull-based external data adapters. Captures
+ * the common logic for obtaining bytes from an external source and packing them
+ * into frames as tuples.
+ */
+public abstract class ClientBasedFeedAdapter implements IFeedAdapter {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(ClientBasedFeedAdapter.class.getName());
+    private static final int timeout = 5; // seconds
+
+    protected ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
+    protected IFeedClient pullBasedFeedClient;
+    protected ARecordType adapterOutputType;
+    protected boolean continueIngestion = true;
+    protected Map<String, String> configuration;
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private long tupleCount = 0;
+    private final IHyracksTaskContext ctx;
+    private int frameTupleCount = 0;
+
+    protected FeedPolicyEnforcer policyEnforcer;
+
+    public FeedPolicyEnforcer getPolicyEnforcer() {
+        return policyEnforcer;
+    }
+
+    public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
+        this.policyEnforcer = policyEnforcer;
+    }
+
+    public abstract IFeedClient getFeedClient(int partition) throws Exception;
+
+    public abstract ITupleForwardPolicy getTupleParserPolicy();
+
+    public ClientBasedFeedAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) {
+        this.ctx = ctx;
+        this.configuration = configuration;
+    }
+
+    public long getIngestedRecordsCount() {
+        return tupleCount;
+    }
+
+    @Override
+    public void start(int partition, IFrameWriter writer) throws Exception {
+        appender = new FrameTupleAppender();
+        frame = new VSizeFrame(ctx);
+        appender.reset(frame, true);
+        ITupleForwardPolicy policy = getTupleParserPolicy();
+        policy.configure(configuration);
+        pullBasedFeedClient = getFeedClient(partition);
+        InflowState inflowState = null;
+        policy.initialize(ctx, writer);
+        while (continueIngestion) {
+            tupleBuilder.reset();
+            try {
+                inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput(), timeout);
+                switch (inflowState) {
+                    case DATA_AVAILABLE:
+                        tupleBuilder.addFieldEndOffset();
+                        policy.addTuple(tupleBuilder);
+                        frameTupleCount++;
+                        break;
+                    case NO_MORE_DATA:
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Reached end of feed");
+                        }
+                        policy.close();
+                        tupleCount += frameTupleCount;
+                        frameTupleCount = 0;
+                        continueIngestion = false;
+                        break;
+                    case DATA_NOT_AVAILABLE:
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Timed out on obtaining data from pull based adapter. Trying again!");
+                        }
+                        break;
+                }
+
+            } catch (Exception failureException) {
+                try {
+                    failureException.printStackTrace();
+                    boolean continueIngestion = policyEnforcer.continueIngestionPostSoftwareFailure(failureException);
+                    if (continueIngestion) {
+                        tupleBuilder.reset();
+                        continue;
+                    } else {
+                        throw failureException;
+                    }
+                } catch (Exception recoveryException) {
+                    throw new Exception(recoveryException);
+                }
+            }
+        }
+    }
+
+    /**
+     * Discontinue the ingestion of data and end the feed.
+     * 
+     * @throws Exception
+     */
+    public void stop() throws Exception {
+        continueIngestion = false;
+    }
+
+    public Map<String, String> getConfiguration() {
+        return configuration;
+    }
+
+    @Override
+    public boolean handleException(Exception e) {
+        return false;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FeedClient.java
similarity index 82%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
rename to asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FeedClient.java
index 37d93ad..da2cde0 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FeedClient.java
@@ -20,6 +20,7 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.OrderedListBuilder;
 import edu.uci.ics.asterix.builders.RecordBuilder;
 import edu.uci.ics.asterix.builders.UnorderedListBuilder;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -29,6 +30,7 @@
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AMutableDateTime;
 import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableOrderedList;
 import edu.uci.ics.asterix.om.base.AMutablePoint;
 import edu.uci.ics.asterix.om.base.AMutableRecord;
 import edu.uci.ics.asterix.om.base.AMutableString;
@@ -36,18 +38,16 @@
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.base.IACursor;
 import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.AOrderedListType;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.AUnorderedListType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 
-public abstract class PullBasedFeedClient implements IPullBasedFeedClient {
+public abstract class FeedClient implements IFeedClient {
 
-    protected static final Logger LOGGER = Logger.getLogger(PullBasedFeedClient.class.getName());
+    protected static final Logger LOGGER = Logger.getLogger(FeedClient.class.getName());
 
     protected ARecordSerializerDeserializer recordSerDe;
     protected AMutableRecord mutableRecord;
@@ -70,7 +70,7 @@
     protected ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.AINT32);
 
-    public abstract InflowState setNextRecord() throws Exception;
+    public abstract InflowState retrieveNextRecord() throws Exception;
 
     @Override
     public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException {
@@ -79,12 +79,9 @@
             int waitCount = 0;
             boolean continueWait = true;
             while ((state == null || state.equals(InflowState.DATA_NOT_AVAILABLE)) && continueWait) {
-                state = setNextRecord();
+                state = retrieveNextRecord();
                 switch (state) {
                     case DATA_AVAILABLE:
-                        IAType t = mutableRecord.getType();
-                        ATypeTag tag = t.getTypeTag();
-                        dataOutput.writeByte(tag.serialize());
                         recordBuilder.reset(mutableRecord.getType());
                         recordBuilder.init();
                         writeRecord(mutableRecord, dataOutput, recordBuilder);
@@ -94,7 +91,7 @@
                             continueWait = false;
                         } else {
                             if (LOGGER.isLoggable(Level.WARNING)) {
-                                LOGGER.warning("Waiting to obtaing data from pull based adapter");
+                                LOGGER.warning("Waiting to obtain data from pull based adaptor");
                             }
                             Thread.sleep(1000);
                             waitCount++;
@@ -121,30 +118,35 @@
             writeObject(obj, fieldValue.getDataOutput());
             recordBuilder.addField(pos, fieldValue);
         }
-        recordBuilder.write(dataOutput, false);
+        recordBuilder.write(dataOutput, true);
     }
 
     private void writeObject(IAObject obj, DataOutput dataOutput) throws IOException, AsterixException {
         switch (obj.getType().getTypeTag()) {
-            case RECORD:
-                ATypeTag tag = obj.getType().getTypeTag();
-                try {
-                    dataOutput.writeByte(tag.serialize());
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
+            case RECORD: {
                 IARecordBuilder recordBuilder = new RecordBuilder();
                 recordBuilder.reset((ARecordType) obj.getType());
                 recordBuilder.init();
                 writeRecord((AMutableRecord) obj, dataOutput, recordBuilder);
                 break;
-            case UNORDEREDLIST:
-                tag = obj.getType().getTypeTag();
-                try {
-                    dataOutput.writeByte(tag.serialize());
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
+            }
+
+            case ORDEREDLIST: {
+                OrderedListBuilder listBuilder = new OrderedListBuilder();
+                listBuilder.reset((AOrderedListType) ((AMutableOrderedList) obj).getType());
+                IACursor cursor = ((AMutableOrderedList) obj).getCursor();
+                ArrayBackedValueStorage listItemValue = new ArrayBackedValueStorage();
+                while (cursor.next()) {
+                    listItemValue.reset();
+                    IAObject item = cursor.get();
+                    writeObject(item, listItemValue.getDataOutput());
+                    listBuilder.addItem(listItemValue);
                 }
+                listBuilder.write(dataOutput, true);
+                break;
+            }
+
+            case UNORDEREDLIST: {
                 UnorderedListBuilder listBuilder = new UnorderedListBuilder();
                 listBuilder.reset((AUnorderedListType) ((AMutableUnorderedList) obj).getType());
                 IACursor cursor = ((AMutableUnorderedList) obj).getCursor();
@@ -155,8 +157,10 @@
                     writeObject(item, listItemValue.getDataOutput());
                     listBuilder.addItem(listItemValue);
                 }
-                listBuilder.write(dataOutput, false);
+                listBuilder.write(dataOutput, true);
                 break;
+            }
+
             default:
                 AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(obj.getType()).serialize(obj,
                         dataOutput);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index c2a8a95..f9793f6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -17,9 +17,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -34,23 +33,22 @@
 
     public abstract InputStream getInputStream(int partition) throws IOException;
 
-    protected final ITupleParser tupleParser;
+    protected final ITupleParserFactory parserFactory;
+    protected ITupleParser tupleParser;
     protected final IAType sourceDatatype;
     protected IHyracksTaskContext ctx;
 
     public FileSystemBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx)
             throws HyracksDataException {
-        this.tupleParser = parserFactory.createTupleParser(ctx);
+        this.parserFactory = parserFactory;
         this.sourceDatatype = sourceDatatype;
         this.ctx = ctx;
     }
 
     @Override
     public void start(int partition, IFrameWriter writer) throws Exception {
+        tupleParser = parserFactory.createTupleParser(ctx);
         InputStream in = getInputStream(partition);
-        if (tupleParser instanceof AbstractTupleParser) {
-            ((AbstractTupleParser) tupleParser).setFilename(getFilename(partition));
-        }
         tupleParser.parse(in, writer);
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 9f4b97c..8036a7b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
+
 import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
 import edu.uci.ics.asterix.external.indexing.input.GenericFileAwareRecordReader;
 import edu.uci.ics.asterix.external.indexing.input.GenericRecordReader;
@@ -30,6 +31,7 @@
 import edu.uci.ics.asterix.external.indexing.input.TextualFullScanDataReader;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -71,9 +73,9 @@
     @Override
     public InputStream getInputStream(int partition) throws IOException {
         if ((conf.getInputFormat() instanceof TextInputFormat || conf.getInputFormat() instanceof SequenceFileInputFormat)
-                && (HDFSAdapterFactory.FORMAT_ADM.equalsIgnoreCase((String) configuration
-                        .get(HDFSAdapterFactory.KEY_FORMAT)) || HDFSAdapterFactory.FORMAT_DELIMITED_TEXT
-                        .equalsIgnoreCase((String) configuration.get(HDFSAdapterFactory.KEY_FORMAT)))) {
+                && (AsterixTupleParserFactory.FORMAT_ADM.equalsIgnoreCase((String) configuration
+                        .get(AsterixTupleParserFactory.KEY_FORMAT)) || AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT
+                        .equalsIgnoreCase((String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT)))) {
             if (files != null) {
                 return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
             } else {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
index 9af1dd7..f730f28 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.asterix.external.indexing.input.TextualDataReader;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -64,8 +65,8 @@
     public InputStream getInputStream(int partition) throws IOException {
         if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
             return new RCFileDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
-        } else if (format.equals(HDFSAdapterFactory.FORMAT_ADM)
-                || format.equals(HDFSAdapterFactory.FORMAT_DELIMITED_TEXT)) {
+        } else if (format.equals(AsterixTupleParserFactory.FORMAT_ADM)
+                || format.equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
             return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
         } else {
             return new GenericFileAwareRecordReader(inputSplits, readSchedule, nodeName, conf, executed, files);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClient.java
new file mode 100644
index 0000000..02e60b4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClient.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public interface IFeedClient {
+
+    public enum InflowState {
+        NO_MORE_DATA,
+        DATA_AVAILABLE,
+        DATA_NOT_AVAILABLE
+    }
+
+    /**
+     * Writes the next fetched tuple into the provided instance of DatatOutput. Invocation of this method blocks until
+     * a new tuple has been written or the specified time has expired.
+     * 
+     * @param dataOutput
+     *            The receiving channel for the feed client to write ADM records to.
+     * @param timeout
+     *            Threshold time (expressed in seconds) for the next tuple to be obtained from the external source.
+     * @return
+     * @throws AsterixException
+     */
+    public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index 78554f2..b31e824 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -21,7 +21,6 @@
 import java.io.InputStream;
 
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -58,15 +57,7 @@
         }
     }
 
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        InputStream in = getInputStream(partition);
-        if (tupleParser instanceof AbstractTupleParser) {
-            ((AbstractTupleParser) tupleParser).setFilename(getFilename(partition));
-        }
-        tupleParser.parse(in, writer);
-    }
-
+   
     @Override
     public String getFilename(int partition) {
         final FileSplit fileSplit = fileSplits[partition];
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
index c739ca3..63181dc 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
@@ -9,7 +9,7 @@
 import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
@@ -77,4 +77,9 @@
     public DataExchangeMode getDataExchangeMode() {
         return DataExchangeMode.PULL;
     }
+
+    @Override
+    public boolean handleException(Exception e) {
+        return false;
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index 838cfeb..27d7049 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -17,30 +17,33 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * An adapter that provides the functionality of receiving tweets from the
  * Twitter service in the form of ADM formatted records.
  */
-public class PullBasedTwitterAdapter extends PullBasedAdapter implements IFeedAdapter {
+public class PullBasedTwitterAdapter extends ClientBasedFeedAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
 
-    public static final String QUERY = "query";
-    public static final String INTERVAL = "interval";
+    private static final int DEFAULT_BATCH_SIZE = 5;
 
     private ARecordType recordType;
     private PullBasedTwitterFeedClient tweetClient;
 
     @Override
-    public IPullBasedFeedClient getFeedClient(int partition) {
+    public IFeedClient getFeedClient(int partition) {
         return tweetClient;
     }
 
-    public PullBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx) throws AsterixException {
+    public PullBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
+            throws AsterixException {
         super(configuration, ctx);
         tweetClient = new PullBasedTwitterFeedClient(ctx, recordType, this);
     }
@@ -54,4 +57,20 @@
         return DataExchangeMode.PULL;
     }
 
+    @Override
+    public boolean handleException(Exception e) {
+        return true;
+    }
+
+    @Override
+    public ITupleForwardPolicy getTupleParserPolicy() {
+        configuration.put(ITupleForwardPolicy.PARSER_POLICY,
+                ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name());
+        String propValue = configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE);
+        if (propValue == null) {
+            configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, "" + DEFAULT_BATCH_SIZE);
+        }
+        return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2c8d659..2b68c88 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -16,93 +16,86 @@
 
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 import twitter4j.Query;
 import twitter4j.QueryResult;
-import twitter4j.Tweet;
+import twitter4j.Status;
 import twitter4j.Twitter;
 import twitter4j.TwitterException;
-import twitter4j.TwitterFactory;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.om.base.AMutableRecord;
-import edu.uci.ics.asterix.om.base.AMutableString;
-import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.external.util.TweetProcessor;
+import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.external.util.TwitterUtil.SearchAPIConstants;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
- * An implementation of @see {PullBasedFeedClient} for the Twitter service.
- * The feed client fetches data from Twitter service by sending request at
- * regular (configurable) interval.
+ * An implementation of @see {PullBasedFeedClient} for the Twitter service. The
+ * feed client fetches data from Twitter service by sending request at regular
+ * (configurable) interval.
  */
-public class PullBasedTwitterFeedClient extends PullBasedFeedClient {
+public class PullBasedTwitterFeedClient extends FeedClient {
 
     private String keywords;
     private Query query;
     private Twitter twitter;
-    private int requestInterval = 10; // seconds
+    private int requestInterval = 5; // seconds
     private QueryResult result;
 
-    private IAObject[] mutableFields;
-    private String[] tupleFieldValues;
     private ARecordType recordType;
     private int nextTweetIndex = 0;
+    private long lastTweetIdReceived = 0;
+    private TweetProcessor tweetProcessor;
 
     public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PullBasedTwitterAdapter adapter) {
-        twitter = new TwitterFactory().getInstance();
-        mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
-                new AMutableString(null), new AMutableString(null) };
+        this.twitter = TwitterUtil.getTwitterService(adapter.getConfiguration());
         this.recordType = recordType;
-        recordSerDe = new ARecordSerializerDeserializer(recordType);
-        mutableRecord = new AMutableRecord(recordType, mutableFields);
-        tupleFieldValues = new String[recordType.getFieldNames().length];
-        initialize(adapter.getConfiguration());
+        this.tweetProcessor = new TweetProcessor(recordType);
+        this.recordSerDe = new ARecordSerializerDeserializer(recordType);
+        this.mutableRecord = tweetProcessor.getMutableRecord();
+        this.initialize(adapter.getConfiguration());
     }
 
     public ARecordType getRecordType() {
         return recordType;
     }
 
-    public AMutableRecord getMutableRecord() {
-        return mutableRecord;
-    }
-
     @Override
-    public InflowState setNextRecord() throws Exception {
-        Tweet tweet;
+    public InflowState retrieveNextRecord() throws Exception {
+        Status tweet;
         tweet = getNextTweet();
         if (tweet == null) {
             return InflowState.DATA_NOT_AVAILABLE;
         }
-        int numFields = recordType.getFieldNames().length;
-        tupleFieldValues[0] = UUID.randomUUID().toString();
-        tupleFieldValues[1] = tweet.getFromUser();
-        tupleFieldValues[2] = tweet.getLocation() == null ? "" : tweet.getLocation();
-        tupleFieldValues[3] = tweet.getText();
-        tupleFieldValues[4] = tweet.getCreatedAt().toString();
-        for (int i = 0; i < numFields; i++) {
-            ((AMutableString) mutableFields[i]).setValue(tupleFieldValues[i]);
-            mutableRecord.setValueAtPos(i, mutableFields[i]);
-        }
+
+        tweetProcessor.processNextTweet(tweet);
         return InflowState.DATA_AVAILABLE;
     }
 
     private void initialize(Map<String, String> params) {
-        this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
-        this.requestInterval = Integer.parseInt((String) params.get(PullBasedTwitterAdapter.INTERVAL));
+        this.keywords = (String) params.get(SearchAPIConstants.QUERY);
+        this.requestInterval = Integer.parseInt((String) params.get(SearchAPIConstants.INTERVAL));
         this.query = new Query(keywords);
-        query.setRpp(100);
+        this.query.setCount(100);
     }
 
-    private Tweet getNextTweet() throws TwitterException, InterruptedException {
+    private Status getNextTweet() throws TwitterException, InterruptedException {
         if (result == null || nextTweetIndex >= result.getTweets().size()) {
             Thread.sleep(1000 * requestInterval);
+            query.setSinceId(lastTweetIdReceived);
             result = twitter.search(query);
             nextTweetIndex = 0;
         }
-        List<Tweet> tw = result.getTweets();
-        return tw.get(nextTweetIndex++);
+        if (result != null && !result.getTweets().isEmpty()) {
+            List<Status> tw = result.getTweets();
+            Status tweet = tw.get(nextTweetIndex++);
+            if (lastTweetIdReceived < tweet.getId()) {
+                lastTweetIdReceived = tweet.getId();
+            }
+            return tweet;
+        } else {
+            return null;
+        }
     }
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
new file mode 100644
index 0000000..8bc9a37
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
@@ -0,0 +1,52 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class PushBasedTwitterAdapter extends ClientBasedFeedAdapter {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int DEFAULT_BATCH_SIZE = 50;
+
+    private PushBasedTwitterFeedClient tweetClient;
+
+    public PushBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx) throws AsterixException {
+        super(configuration, ctx);
+        this.configuration = configuration;
+        this.tweetClient = new PushBasedTwitterFeedClient(ctx, recordType, this);
+    }
+
+    @Override
+    public DataExchangeMode getDataExchangeMode() {
+        return DataExchangeMode.PUSH;
+    }
+
+    @Override
+    public boolean handleException(Exception e) {
+        return true;
+    }
+
+    @Override
+    public IFeedClient getFeedClient(int partition) throws Exception {
+        return tweetClient;
+    }
+
+    @Override
+    public ITupleForwardPolicy getTupleParserPolicy() {
+        configuration.put(ITupleForwardPolicy.PARSER_POLICY,
+                ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name());
+        String propValue = configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE);
+        if (propValue == null) {
+            configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, "" + DEFAULT_BATCH_SIZE);
+        }
+        return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
new file mode 100644
index 0000000..908fd34
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import twitter4j.FilterQuery;
+import twitter4j.Query;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.external.util.TweetProcessor;
+import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * An implementation of @see {PullBasedFeedClient} for the Twitter service. The
+ * feed client fetches data from Twitter service by sending request at regular
+ * (configurable) interval.
+ */
+public class PushBasedTwitterFeedClient extends FeedClient {
+
+    private String keywords;
+    private Query query;
+
+    private ARecordType recordType;
+    private TweetProcessor tweetProcessor;
+    private LinkedBlockingQueue<Status> inputQ;
+
+    public PushBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PushBasedTwitterAdapter adapter) throws AsterixException {
+        this.recordType = recordType;
+        this.tweetProcessor = new TweetProcessor(recordType);
+        this.recordSerDe = new ARecordSerializerDeserializer(recordType);
+        this.mutableRecord = tweetProcessor.getMutableRecord();
+        this.initialize(adapter.getConfiguration());
+        this.inputQ = new LinkedBlockingQueue<Status>();
+        TwitterStream twitterStream = TwitterUtil.getTwitterStream(adapter.getConfiguration());
+        twitterStream.addListener(new TweetListener(inputQ));
+        FilterQuery query = TwitterUtil.getFilterQuery(adapter.getConfiguration());
+        if (query != null) {
+            twitterStream.filter(query);
+        } else {
+            twitterStream.sample();
+        }
+    }
+
+    public ARecordType getRecordType() {
+        return recordType;
+    }
+
+    private class TweetListener implements StatusListener {
+
+        private LinkedBlockingQueue<Status> inputQ;
+
+        public TweetListener(LinkedBlockingQueue<Status> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onStatus(Status tweet) {
+            inputQ.add(tweet);
+        }
+
+        @Override
+        public void onException(Exception arg0) {
+
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice arg0) {
+        }
+
+        @Override
+        public void onScrubGeo(long arg0, long arg1) {
+        }
+
+        @Override
+        public void onStallWarning(StallWarning arg0) {
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int arg0) {
+        }
+    }
+
+    @Override
+    public InflowState retrieveNextRecord() throws Exception {
+        Status tweet = inputQ.take();
+        tweetProcessor.processNextTweet(tweet);
+        return InflowState.DATA_AVAILABLE;
+    }
+
+    private void initialize(Map<String, String> params) {
+        this.keywords = (String) params.get(SearchAPIConstants.QUERY);
+        this.query = new Query(keywords);
+        this.query.setCount(100);
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 4eea034..bcf809d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -19,14 +19,16 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * RSSFeedAdapter provides the functionality of fetching an RSS based feed.
  */
-public class RSSFeedAdapter extends PullBasedAdapter implements IFeedAdapter {
+public class RSSFeedAdapter extends ClientBasedFeedAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
 
@@ -35,7 +37,7 @@
     private List<String> feedURLs = new ArrayList<String>();
     private String id_prefix = "";
 
-    private IPullBasedFeedClient rssFeedClient;
+    private IFeedClient rssFeedClient;
 
     private ARecordType recordType;
 
@@ -62,7 +64,7 @@
     }
 
     @Override
-    public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
+    public IFeedClient getFeedClient(int partition) throws Exception {
         if (rssFeedClient == null) {
             rssFeedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
         }
@@ -78,4 +80,14 @@
         return DataExchangeMode.PULL;
     }
 
+    @Override
+    public boolean handleException(Exception e) {
+        return false;
+    }
+
+    @Override
+    public ITupleForwardPolicy getTupleParserPolicy() {
+        return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
index 41ed923..e7cbd16 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
@@ -39,7 +39,7 @@
  * fetching from an RSS feed source at regular interval.
  */
 @SuppressWarnings("rawtypes")
-public class RSSFeedClient extends PullBasedFeedClient {
+public class RSSFeedClient extends FeedClient {
 
     private long id = 0;
     private String idPrefix;
@@ -79,7 +79,7 @@
     }
 
     @Override
-    public InflowState setNextRecord() throws Exception {
+    public InflowState retrieveNextRecord() throws Exception {
         SyndEntryImpl feedEntry = getNextRSSFeed();
         if (feedEntry == null) {
             return InflowState.DATA_NOT_AVAILABLE;
@@ -133,9 +133,9 @@
 
 class FetcherEventListenerImpl implements FetcherListener {
 
-    private final IPullBasedFeedClient feedClient;
+    private final IFeedClient feedClient;
 
-    public FetcherEventListenerImpl(IPullBasedFeedClient feedClient) {
+    public FetcherEventListenerImpl(IFeedClient feedClient) {
         this.feedClient = feedClient;
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
index 80965b0..25a0221 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -5,7 +5,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -25,7 +25,7 @@
 
     protected final IAType sourceDatatype;
 
-    public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx)
+    public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
         this.tupleParser = parserFactory.createTupleParser(ctx);
         this.sourceDatatype = sourceDatatype;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
index e2ef9fd..ca15b359 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -84,11 +85,11 @@
          * 2. RC indexing tuple parser
          * 3. textual data tuple parser
          */
-        if (format.equalsIgnoreCase(StreamBasedAdapterFactory.FORMAT_ADM)) {
+        if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_ADM)) {
             // choice 3 with adm data parser
             ADMDataParser dataParser = new ADMDataParser();
             return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
-        } else if (format.equalsIgnoreCase(StreamBasedAdapterFactory.FORMAT_DELIMITED_TEXT)) {
+        } else if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
             // choice 3 with delimited data parser
             DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser(atype,
                 delimiter, quote); 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
index c6176fc..7e0d9f9 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -81,7 +82,7 @@
         // Create the lookup reader and the controlled parser
         if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
             configureRCFile(jobConf, iNullWriterFactory);
-        } else if (configuration.get(HDFSAdapterFactory.KEY_FORMAT).equals(HDFSAdapterFactory.FORMAT_ADM)) {
+        } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT).equals(AsterixTupleParserFactory.FORMAT_ADM)) {
             // create an adm parser
             ADMDataParser dataParser = new ADMDataParser();
             if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
@@ -95,10 +96,10 @@
                 parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
                         inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
             }
-        } else if (configuration.get(HDFSAdapterFactory.KEY_FORMAT).equals(HDFSAdapterFactory.FORMAT_DELIMITED_TEXT)) {
+        } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT).equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
             // create a delimited text parser
-            char delimiter = StreamBasedAdapterFactory.getDelimiter(configuration);
-            char quote = StreamBasedAdapterFactory.getQuote(configuration, delimiter);
+            char delimiter = AsterixTupleParserFactory.getDelimiter(configuration);
+            char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter);
 
             DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser((ARecordType) atype,
                     delimiter, quote);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
index a148e66..57a8e48 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
@@ -90,9 +90,9 @@
                 castBuffer.reset();
                 ATypeHierarchy.convertNumericTypeByteArray(inputVal.getByteArray(), inputVal.getStartOffset(),
                         inputVal.getLength(), targetTypeTag, castBuffer.getDataOutput());
-                functionHelper.setArgument(i, castBuffer.getByteArray());
+                functionHelper.setArgument(i, castBuffer);
             } else {
-                functionHelper.setArgument(i, inputVal.getByteArray());
+                functionHelper.setArgument(i, inputVal);
             }
         }
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
index fc629ea..d989323 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
@@ -14,9 +14,6 @@
  */
 package edu.uci.ics.asterix.external.library;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
@@ -26,19 +23,14 @@
 
 public class ExternalFunctionProvider {
 
-    private static Map<IExternalFunctionInfo, ExternalScalarFunction> functionRepo = new HashMap<IExternalFunctionInfo, ExternalScalarFunction>();
-
     public static IExternalFunction getExternalFunctionEvaluator(IExternalFunctionInfo finfo,
             ICopyEvaluatorFactory args[], IDataOutputProvider outputProvider) throws AlgebricksException {
         switch (finfo.getKind()) {
             case SCALAR:
-                ExternalScalarFunction function = functionRepo.get(finfo);
-                function = new ExternalScalarFunction(finfo, args, outputProvider);
-                // functionRepo.put(finfo, function);
-                return function;
+                return new ExternalScalarFunction(finfo, args, outputProvider);
             case AGGREGATE:
             case UNNEST:
-                throw new IllegalArgumentException(" not supported function kind" + finfo.getKind());
+                throw new IllegalArgumentException(" UDF of kind" + finfo.getKind() + " not supported.");
             default:
                 throw new IllegalArgumentException(" unknown function kind" + finfo.getKind());
         }
@@ -62,9 +54,10 @@
         try {
             setArguments(tuple);
             evaluate(functionHelper);
+            functionHelper.reset();
         } catch (Exception e) {
             e.printStackTrace();
-            throw new AlgebricksException(e);
+            //throw new AlgebricksException(e);
         }
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
index 43eef52..6109588 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
@@ -29,4 +29,6 @@
     public void setResult(IJObject result) throws IOException, AsterixException;
 
     public IJObject getObject(JTypeTag jtypeTag);
+
+    public void reset();
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
index 3c5ddfd..4beb259b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.asterix.external.library;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import edu.uci.ics.asterix.external.library.java.IJObject;
@@ -48,6 +47,11 @@
 
 public class JTypeObjectFactory implements IObjectFactory<IJObject, IAType> {
 
+    public static final JTypeObjectFactory INSTANCE = new JTypeObjectFactory();
+
+    private JTypeObjectFactory() {
+    }
+
     @Override
     public IJObject create(IAType type) {
         IJObject retValue = null;
@@ -77,7 +81,7 @@
                 retValue = new JPoint3D(0, 0, 0);
                 break;
             case POLYGON:
-                retValue = new JPolygon(new ArrayList<JPoint>());
+                retValue = new JPolygon(new JPoint[] {});
                 break;
             case LINE:
                 retValue = new JLine(new JPoint(0, 0), new JPoint(0, 0));
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
index 192cf3e..7f5b3bc 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
@@ -14,59 +14,55 @@
  */
 package edu.uci.ics.asterix.external.library;
 
-import java.io.DataOutput;
 import java.io.IOException;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
-import edu.uci.ics.asterix.builders.RecordBuilder;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.library.java.IJObject;
-import edu.uci.ics.asterix.external.library.java.JObjectUtil;
-import edu.uci.ics.asterix.external.library.java.JObjects.ByteArrayAccessibleDataInputStream;
-import edu.uci.ics.asterix.external.library.java.JObjects.ByteArrayAccessibleInputStream;
-import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjectPointableVisitor;
 import edu.uci.ics.asterix.external.library.java.JTypeTag;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ARecord;
-import edu.uci.ics.asterix.om.base.AString;
-import edu.uci.ics.asterix.om.base.IAObject;
 import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.pointables.AFlatValuePointable;
+import edu.uci.ics.asterix.om.pointables.AListPointable;
+import edu.uci.ics.asterix.om.pointables.ARecordPointable;
+import edu.uci.ics.asterix.om.pointables.PointableAllocator;
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.container.IObjectPool;
 import edu.uci.ics.asterix.om.util.container.ListObjectPool;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
 
 public class JavaFunctionHelper implements IFunctionHelper {
 
     private final IExternalFunctionInfo finfo;
     private final IDataOutputProvider outputProvider;
-    private IJObject[] arguments;
+    private final IJObject[] arguments;
     private IJObject resultHolder;
-    private ISerializerDeserializer resultSerde;
-    private IObjectPool<IJObject, IAType> objectPool = new ListObjectPool<IJObject, IAType>(new JTypeObjectFactory());
-    byte[] buffer = new byte[32768];
-    ByteArrayAccessibleInputStream bis = new ByteArrayAccessibleInputStream(buffer, 0, buffer.length);
-    ByteArrayAccessibleDataInputStream dis = new ByteArrayAccessibleDataInputStream(bis);
+    private final IObjectPool<IJObject, IAType> objectPool = new ListObjectPool<IJObject, IAType>(
+            JTypeObjectFactory.INSTANCE);
+    private final JObjectPointableVisitor pointableVisitor;
+    private final PointableAllocator pointableAllocator;
+    private final Map<Integer, TypeInfo> poolTypeInfo;
 
     public JavaFunctionHelper(IExternalFunctionInfo finfo, IDataOutputProvider outputProvider)
             throws AlgebricksException {
         this.finfo = finfo;
         this.outputProvider = outputProvider;
-        List<IAType> params = finfo.getParamList();
-        arguments = new IJObject[params.size()];
+        this.pointableVisitor = new JObjectPointableVisitor();
+        this.pointableAllocator = new PointableAllocator();
+        this.arguments = new IJObject[finfo.getParamList().size()];
         int index = 0;
-        for (IAType param : params) {
-            this.arguments[index] = objectPool.allocate(param);
-            index++;
+        for (IAType param : finfo.getParamList()) {
+            this.arguments[index++] = objectPool.allocate(param);
         }
-        resultHolder = objectPool.allocate(finfo.getReturnType());
+        this.resultHolder = objectPool.allocate(finfo.getReturnType());
+        this.poolTypeInfo = new HashMap<Integer, TypeInfo>();
+
     }
 
     @Override
@@ -76,110 +72,55 @@
 
     @Override
     public void setResult(IJObject result) throws IOException, AsterixException {
-        IAObject obj = result.getIAObject();
         try {
-            outputProvider.getDataOutput().writeByte(obj.getType().getTypeTag().serialize());
-        } catch (IOException e) {
+            result.serialize(outputProvider.getDataOutput(), true);
+            result.reset();
+        } catch (IOException  | AlgebricksException e) {
             throw new HyracksDataException(e);
         }
-
-        if (obj.getType().getTypeTag().equals(ATypeTag.RECORD)) {
-            ARecordType recType = (ARecordType) obj.getType();
-            if (recType.isOpen()) {
-                writeOpenRecord((JRecord) result, outputProvider.getDataOutput());
-            } else {
-                resultSerde = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(recType);
-                resultSerde.serialize(obj, outputProvider.getDataOutput());
-            }
-        } else {
-            resultSerde = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(obj.getType());
-            resultSerde.serialize(obj, outputProvider.getDataOutput());
-        }
-        reset();
     }
 
-    private void writeOpenRecord(JRecord jRecord, DataOutput dataOutput) throws AsterixException, IOException {
-        ARecord aRecord = (ARecord) jRecord.getIAObject();
-        RecordBuilder recordBuilder = new RecordBuilder();
-        ARecordType recordType = aRecord.getType();
-        recordBuilder.reset(recordType);
-        ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage();
-        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
-        List<Boolean> openField = jRecord.getOpenField();
-
-        int fieldIndex = 0;
-        int closedFieldId = 0;
-        for (IJObject field : jRecord.getFields()) {
-            fieldValue.reset();
-            switch (field.getTypeTag()) {
-                case RECORD:
-                    ARecordType recType = (ARecordType) field.getIAObject().getType();
-                    if (recType.isOpen()) {
-                        fieldValue.getDataOutput().writeByte(recType.getTypeTag().serialize());
-                        writeOpenRecord((JRecord) field, fieldValue.getDataOutput());
-                    } else {
-                        AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(
-                                field.getIAObject().getType()).serialize(field.getIAObject(),
-                                fieldValue.getDataOutput());
-                    }
-                    break;
-                default:
-                    AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(field.getIAObject().getType())
-                            .serialize(field.getIAObject(), fieldValue.getDataOutput());
-                    break;
-            }
-            if (openField.get(fieldIndex)) {
-                String fName = jRecord.getFieldNames().get(fieldIndex);
-                fieldName.reset();
-                AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING).serialize(
-                        new AString(fName), fieldName.getDataOutput());
-                recordBuilder.addField(fieldName, fieldValue);
-            } else {
-                recordBuilder.addField(closedFieldId, fieldValue);
-                closedFieldId++;
-            }
-            fieldIndex++;
-        }
-
-        recordBuilder.write(dataOutput, false);
-
-    }
-
-    private void reset() {
-        for (IJObject jObject : arguments) {
-            switch (jObject.getTypeTag()) {
-                case RECORD:
-                    reset((JRecord) jObject);
-                    break;
-            }
-        }
-        switch (resultHolder.getTypeTag()) {
+    public void setArgument(int index, IValueReference valueReference) throws IOException, AsterixException {
+        IVisitablePointable pointable = null;
+        IJObject jObject = null;
+        IAType type = finfo.getParamList().get(index);
+        switch (type.getTypeTag()) {
             case RECORD:
-                reset((JRecord) resultHolder);
+                pointable = pointableAllocator.allocateRecordValue(type);
+                pointable.set(valueReference);
+                jObject = pointableVisitor.visit((ARecordPointable) pointable, getTypeInfo(index, type));
+                break;
+            case ORDEREDLIST:
+            case UNORDEREDLIST:
+                pointable = pointableAllocator.allocateListValue(type);
+                pointable.set(valueReference);
+                jObject = pointableVisitor.visit((AListPointable) pointable, getTypeInfo(index, type));
+                break;
+            case ANY:
+                throw new IllegalStateException("Cannot handle a function argument of type " + type.getTypeTag());
+            default:
+                pointable = pointableAllocator.allocateFieldValue(type);
+                pointable.set(valueReference);
+                jObject = pointableVisitor.visit((AFlatValuePointable) pointable, getTypeInfo(index, type));
                 break;
         }
+        arguments[index] = jObject;
     }
 
-    private void reset(JRecord jRecord) {
-        List<IJObject> fields = ((JRecord) jRecord).getFields();
-        for (IJObject field : fields) {
-            switch (field.getTypeTag()) {
-                case RECORD:
-                    reset((JRecord) field);
-                    break;
-            }
+    private TypeInfo getTypeInfo(int index, IAType type) {
+        TypeInfo typeInfo = poolTypeInfo.get(index);
+        if (typeInfo == null) {
+            typeInfo = new TypeInfo(objectPool, type, type.getTypeTag());
+            poolTypeInfo.put(index, typeInfo);
         }
-        jRecord.close();
-    }
-
-    public void setArgument(int index, byte[] argument) throws IOException, AsterixException {
-        bis.setContent(argument, 1, argument.length);
-        IAType type = finfo.getParamList().get(index);
-        arguments[index] = JObjectUtil.getJType(type.getTypeTag(), type, dis, objectPool);
+        return typeInfo;
     }
 
     @Override
     public IJObject getResultObject() {
+        if (resultHolder == null) {
+            resultHolder = objectPool.allocate(finfo.getReturnType());
+        }
         return resultHolder;
     }
 
@@ -197,4 +138,10 @@
         return retValue;
     }
 
+    @Override
+    public void reset() {
+        pointableAllocator.reset();
+        objectPool.reset();
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/TypeInfo.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/TypeInfo.java
new file mode 100644
index 0000000..e2c66ca
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/TypeInfo.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.IJObject;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+
+public class TypeInfo {
+
+    private IObjectPool<IJObject, IAType> objectPool;
+    private IAType atype;
+    private ATypeTag typeTag;
+
+    public TypeInfo(IObjectPool<IJObject, IAType> objectPool, IAType atype, ATypeTag typeTag) {
+        this.objectPool = objectPool;
+        this.atype = atype;
+        this.typeTag = typeTag;
+    }
+
+    public void reset(IAType atype, ATypeTag typeTag) {
+        this.atype = atype;
+        this.typeTag = typeTag;
+    }
+
+    public IObjectPool<IJObject, IAType> getObjectPool() {
+        return objectPool;
+    }
+
+    public void setObjectPool(IObjectPool<IJObject, IAType> objectPool) {
+        this.objectPool = objectPool;
+    }
+
+    public IAType getAtype() {
+        return atype;
+    }
+
+    public void setAtype(IAType atype) {
+        this.atype = atype;
+    }
+
+    public ATypeTag getTypeTag() {
+        return typeTag;
+    }
+
+    public void setTypeTag(ATypeTag typeTag) {
+        this.typeTag = typeTag;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJListAccessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJListAccessor.java
new file mode 100644
index 0000000..87db84a
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJListAccessor.java
@@ -0,0 +1,12 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import edu.uci.ics.asterix.om.pointables.AListPointable;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IJListAccessor {
+
+    IJObject access(AListPointable pointable, IObjectPool<IJObject, IAType> objectPool, IAType listType,
+            JObjectPointableVisitor pointableVisitor) throws HyracksDataException;
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
index ff8e563..3567e7f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
@@ -14,12 +14,20 @@
  */
 package edu.uci.ics.asterix.external.library.java;
 
+import java.io.DataOutput;
+
 import edu.uci.ics.asterix.om.base.IAObject;
 import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public interface IJObject {
 
     public ATypeTag getTypeTag();
 
     public IAObject getIAObject();
+
+    public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException;
+
+    public void reset() throws AlgebricksException;
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObjectAccessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObjectAccessor.java
new file mode 100644
index 0000000..6967243
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObjectAccessor.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IJObjectAccessor {
+    IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> obj) throws HyracksDataException;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJRecordAccessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJRecordAccessor.java
new file mode 100644
index 0000000..55ae262
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJRecordAccessor.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.om.pointables.ARecordPointable;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IJRecordAccessor {
+
+    public JRecord access(ARecordPointable pointable, IObjectPool<IJObject, IAType> objectPool, ARecordType recordType,
+            JObjectPointableVisitor pointableVisitor) throws HyracksDataException;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
new file mode 100644
index 0000000..02d11b9
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
@@ -0,0 +1,571 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ACircleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADurationSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ALineSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APoint3DSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APointSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APolygonSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARectangleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ATimeSerializerDeserializer;
+import edu.uci.ics.asterix.external.library.TypeInfo;
+import edu.uci.ics.asterix.external.library.java.JObjects.JBoolean;
+import edu.uci.ics.asterix.external.library.java.JObjects.JByte;
+import edu.uci.ics.asterix.external.library.java.JObjects.JCircle;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDate;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDateTime;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDouble;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDuration;
+import edu.uci.ics.asterix.external.library.java.JObjects.JFloat;
+import edu.uci.ics.asterix.external.library.java.JObjects.JInt;
+import edu.uci.ics.asterix.external.library.java.JObjects.JInterval;
+import edu.uci.ics.asterix.external.library.java.JObjects.JLine;
+import edu.uci.ics.asterix.external.library.java.JObjects.JList;
+import edu.uci.ics.asterix.external.library.java.JObjects.JLong;
+import edu.uci.ics.asterix.external.library.java.JObjects.JOrderedList;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPoint;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPoint3D;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPolygon;
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JRectangle;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JTime;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.util.TweetProcessor;
+import edu.uci.ics.asterix.om.base.ACircle;
+import edu.uci.ics.asterix.om.base.ADuration;
+import edu.uci.ics.asterix.om.base.ALine;
+import edu.uci.ics.asterix.om.base.APoint;
+import edu.uci.ics.asterix.om.base.APoint3D;
+import edu.uci.ics.asterix.om.base.APolygon;
+import edu.uci.ics.asterix.om.base.ARectangle;
+import edu.uci.ics.asterix.om.pointables.AFlatValuePointable;
+import edu.uci.ics.asterix.om.pointables.AListPointable;
+import edu.uci.ics.asterix.om.pointables.ARecordPointable;
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AbstractCollectionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class JObjectAccessors {
+
+    public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag aTypeTag) {
+        IJObjectAccessor accessor = null;
+        switch (aTypeTag) {
+            case BOOLEAN:
+                accessor = new JBooleanAccessor();
+                break;
+            case INT8:
+                accessor = new JInt8Accessor();
+                break;
+            case INT16:
+                accessor = new JInt16Accessor();
+                break;
+            case INT32:
+                accessor = new JInt32Accessor();
+                break;
+            case INT64:
+                accessor = new JInt64Accessor();
+                break;
+            case FLOAT:
+                accessor = new JFloatAccessor();
+                break;
+            case DOUBLE:
+                accessor = new JDoubleAccessor();
+                break;
+            case STRING:
+                accessor = new JStringAccessor();
+                break;
+            case POINT:
+                accessor = new JPointAccessor();
+                break;
+            case POINT3D:
+                accessor = new JPoint3DAccessor();
+                break;
+            case LINE:
+                accessor = new JLineAccessor();
+                break;
+            case DATE:
+                accessor = new JDateAccessor();
+                break;
+            case DATETIME:
+                accessor = new JDateTimeAccessor();
+                break;
+            case DURATION:
+                accessor = new JDurationAccessor();
+                break;
+        }
+        return accessor;
+    }
+
+    public static class JInt8Accessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            byte o = AInt8SerializerDeserializer.getByte(b, s + 1);
+            IJObject jObject = objectPool.allocate(BuiltinType.AINT8);
+            ((JByte) jObject).setValue(o);
+            return null;
+        }
+
+    }
+
+    public static class JInt16Accessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            short i = AInt16SerializerDeserializer.getShort(b, s + 1);
+            IJObject jObject = objectPool.allocate(BuiltinType.AINT16);
+            ((JInt) jObject).setValue(i);
+            return null;
+        }
+    }
+
+    public static class JInt32Accessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            int i = AInt32SerializerDeserializer.getInt(b, s + 1);
+            IJObject jObject = objectPool.allocate(BuiltinType.AINT32);
+            ((JInt) jObject).setValue(i);
+            return jObject;
+        }
+    }
+
+    public static class JInt64Accessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            long v = AInt64SerializerDeserializer.getLong(b, s + 1);
+            IJObject jObject = objectPool.allocate(BuiltinType.AINT64);
+            ((JLong) jObject).setValue(v);
+            return jObject;
+        }
+    }
+
+    public static class JFloatAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            float v = AFloatSerializerDeserializer.getFloat(b, s + 1);
+            IJObject jObject = objectPool.allocate(BuiltinType.AFLOAT);
+            ((JFloat) jObject).setValue(v);
+            return jObject;
+        }
+    }
+
+    public static class JDoubleAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            double v = ADoubleSerializerDeserializer.getDouble(b, s + 1);
+            IJObject jObject = objectPool.allocate(BuiltinType.ADOUBLE);
+            ((JDouble) jObject).setValue(v);
+            return jObject;
+        }
+    }
+
+    public static class JStringAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            int l = pointable.getLength();
+
+            String v = null;
+            v = AStringSerializerDeserializer.INSTANCE.deserialize(
+                    new DataInputStream(new ByteArrayInputStream(b, s+1, l-1))).getStringValue();
+            //v = new String(b, s+1, l, "UTF-8");
+            TweetProcessor.getNormalizedString(v);
+            IJObject jObject = objectPool.allocate(BuiltinType.ASTRING);
+            ((JString) jObject).setValue(TweetProcessor.getNormalizedString(v));
+            return jObject;
+        }
+    }
+
+    public static class JBooleanAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            Boolean v = ABooleanSerializerDeserializer.getBoolean(b, s);
+            IJObject jObject = objectPool.allocate(BuiltinType.ABOOLEAN);
+            ((JBoolean) jObject).setValue(v);
+            return jObject;
+        }
+    }
+
+    public static class JDateAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            int v = ADateSerializerDeserializer.getChronon(b, s);
+            IJObject jObject = objectPool.allocate(BuiltinType.ADATE);
+            ((JDate) jObject).setValue(v);
+            return jObject;
+        }
+    }
+
+    public static class JDateTimeAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            long v = ADateTimeSerializerDeserializer.getChronon(b, s);
+            IJObject jObject = objectPool.allocate(BuiltinType.ADATETIME);
+            ((JDateTime) jObject).setValue(v);
+            return jObject;
+        }
+    }
+
+    public static class JDurationAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            int l = pointable.getLength();
+            ADuration duration = ADurationSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+                    new ByteArrayInputStream(b, s, l)));
+            IJObject jObject = objectPool.allocate(BuiltinType.ADURATION);
+            ((JDuration) jObject).setValue(duration.getMonths(), duration.getMilliseconds());
+            return jObject;
+        }
+    }
+
+    public static class JTimeAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            int v = ATimeSerializerDeserializer.getChronon(b, s);
+            IJObject jObject = objectPool.allocate(BuiltinType.ATIME);
+            ((JTime) jObject).setValue(v);
+            return jObject;
+        }
+    }
+
+    public static class JIntervalAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            long intervalStart = AIntervalSerializerDeserializer.getIntervalStart(b, s);
+            long intervalEnd = AIntervalSerializerDeserializer.getIntervalEnd(b, s);
+            byte intervalType = AIntervalSerializerDeserializer.getIntervalTimeType(b, s);
+            IJObject jObject = objectPool.allocate(BuiltinType.AINTERVAL);
+            try {
+                ((JInterval) jObject).setValue(intervalStart, intervalEnd, intervalType);
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+            return jObject;
+        }
+    }
+
+    // Spatial Types
+
+    public static class JCircleAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            int l = pointable.getLength();
+            ACircle v = ACircleSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+                    new ByteArrayInputStream(b, s, l)));
+            JPoint jpoint = (JPoint) objectPool.allocate(BuiltinType.APOINT);
+            jpoint.setValue(v.getP().getX(), v.getP().getY());
+            IJObject jObject = objectPool.allocate(BuiltinType.ACIRCLE);
+            ((JCircle) jObject).setValue(jpoint, v.getRadius());
+            return jObject;
+        }
+    }
+
+    public static class JPointAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            int l = pointable.getLength();
+            APoint v = APointSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(new ByteArrayInputStream(
+                    b, s, l)));
+            JPoint jObject = (JPoint) objectPool.allocate(BuiltinType.APOINT);
+            jObject.setValue(v.getX(), v.getY());
+            return jObject;
+        }
+    }
+
+    public static class JPoint3DAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            int l = pointable.getLength();
+            APoint3D v = APoint3DSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+                    new ByteArrayInputStream(b, s, l)));
+            JPoint3D jObject = (JPoint3D) objectPool.allocate(BuiltinType.APOINT3D);
+            jObject.setValue(v.getX(), v.getY(), v.getZ());
+            return jObject;
+        }
+    }
+
+    public static class JLineAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            int l = pointable.getLength();
+            ALine v = ALineSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(new ByteArrayInputStream(b,
+                    s, l)));
+            JLine jObject = (JLine) objectPool.allocate(BuiltinType.ALINE);
+            jObject.setValue(v.getP1(), v.getP2());
+            return jObject;
+        }
+    }
+
+    public static class JPolygonAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            int l = pointable.getLength();
+            APolygon v = APolygonSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+                    new ByteArrayInputStream(b, s, l)));
+            JPolygon jObject = (JPolygon) objectPool.allocate(BuiltinType.APOLYGON);
+            jObject.setValue(v.getPoints());
+            return jObject;
+        }
+    }
+
+    public static class JRectangleAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            int l = pointable.getLength();
+            ARectangle v = ARectangleSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+                    new ByteArrayInputStream(b, s, l)));
+            JRectangle jObject = (JRectangle) objectPool.allocate(BuiltinType.ARECTANGLE);
+            jObject.setValue(v.getP1(), v.getP2());
+            return jObject;
+        }
+    }
+
+    public static class JRecordAccessor implements IJRecordAccessor {
+
+        private final TypeInfo typeInfo;
+        private final JRecord jRecord;
+        private final IJObject[] jObjects;
+        private final LinkedHashMap<String, IJObject> openFields;
+
+        public JRecordAccessor(ARecordType recordType, IObjectPool<IJObject, IAType> objectPool) {
+            this.typeInfo = new TypeInfo(objectPool, null, null);
+            this.jObjects = new IJObject[recordType.getFieldNames().length];
+            this.jRecord = new JRecord(recordType, jObjects);
+            this.openFields = new LinkedHashMap<String, IJObject>();
+        }
+
+        @Override
+        public JRecord access(ARecordPointable pointable, IObjectPool<IJObject, IAType> objectPool,
+                ARecordType recordType, JObjectPointableVisitor pointableVisitor) throws HyracksDataException {
+            try {
+                jRecord.reset();
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+            ARecordPointable recordPointable = (ARecordPointable) pointable;
+            List<IVisitablePointable> fieldPointables = recordPointable.getFieldValues();
+            List<IVisitablePointable> fieldTypeTags = recordPointable.getFieldTypeTags();
+            List<IVisitablePointable> fieldNames = recordPointable.getFieldNames();
+            int index = 0;
+            boolean closedPart = true;
+            try {
+                IJObject fieldObject = null;
+                for (IVisitablePointable fieldPointable : fieldPointables) {
+                    closedPart = index < recordType.getFieldTypes().length;
+                    IVisitablePointable tt = fieldTypeTags.get(index);
+                    IAType fieldType = closedPart ? recordType.getFieldTypes()[index] : null;
+                    ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tt.getByteArray()[tt
+                            .getStartOffset()]);
+                    IVisitablePointable fieldName = fieldNames.get(index);
+                    typeInfo.reset(fieldType, typeTag);
+                    switch (typeTag) {
+                        case RECORD:
+                            fieldObject = pointableVisitor.visit((ARecordPointable) fieldPointable, typeInfo);
+                            break;
+                        case ORDEREDLIST:
+                        case UNORDEREDLIST:
+                            if (fieldPointable instanceof AFlatValuePointable) {
+                                // value is null
+                                fieldObject = null;
+                            } else {
+                                fieldObject = pointableVisitor.visit((AListPointable) fieldPointable, typeInfo);
+                            }
+                            break;
+                        case ANY:
+                            break;
+                        default:
+                            fieldObject = pointableVisitor.visit((AFlatValuePointable) fieldPointable, typeInfo);
+                    }
+                    if (closedPart) {
+                        jObjects[index] = fieldObject;
+                    } else {
+                        byte[] b = fieldName.getByteArray();
+                        int s = fieldName.getStartOffset();
+                        int l = fieldName.getLength();
+                        String v = AStringSerializerDeserializer.INSTANCE.deserialize(
+                                new DataInputStream(new ByteArrayInputStream(b, s + 1, l - 1))).getStringValue();
+                        openFields.put(v, fieldObject);
+                    }
+                    index++;
+                    fieldObject = null;
+                }
+
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw new HyracksDataException(e);
+            }
+            return jRecord;
+        }
+
+        public void reset() throws HyracksDataException {
+            try {
+                jRecord.reset();
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+            openFields.clear();
+        }
+
+    }
+
+    public static class JListAccessor implements IJListAccessor {
+
+        private final TypeInfo typeInfo;
+
+        public JListAccessor(IObjectPool<IJObject, IAType> objectPool) {
+            this.typeInfo = new TypeInfo(objectPool, null, null);
+        }
+
+        @Override
+        public IJObject access(AListPointable pointable, IObjectPool<IJObject, IAType> objectPool, IAType listType,
+                JObjectPointableVisitor pointableVisitor) throws HyracksDataException {
+            List<IVisitablePointable> items = pointable.getItems();
+            List<IVisitablePointable> itemTags = pointable.getItemTags();
+            JList list = pointable.ordered() ? new JOrderedList(listType) : new JUnorderedList(listType);
+            IJObject listItem = null;
+            int index = 0;
+            try {
+
+                for (IVisitablePointable itemPointable : items) {
+                    IVisitablePointable itemTagPointable = itemTags.get(index);
+                    ATypeTag itemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(itemTagPointable
+                            .getByteArray()[itemTagPointable.getStartOffset()]);
+                    typeInfo.reset(listType.getType(), listType.getTypeTag());
+                    switch (itemTypeTag) {
+                        case RECORD:
+                            listItem = pointableVisitor.visit((ARecordPointable) itemPointable, typeInfo);
+                            break;
+                        case UNORDEREDLIST:
+                        case ORDEREDLIST:
+                            listItem = pointableVisitor.visit((AListPointable) itemPointable, typeInfo);
+                            break;
+                        case ANY:
+                            throw new IllegalArgumentException("Cannot parse list item of type "
+                                    + listType.getTypeTag());
+                        default:
+                            typeInfo.reset(((AbstractCollectionType) listType).getItemType(),
+                                    ((AbstractCollectionType) listType).getTypeTag());
+                            listItem = pointableVisitor.visit((AFlatValuePointable) itemPointable, typeInfo);
+
+                    }
+                    ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                            .deserialize(itemPointable.getByteArray()[itemPointable.getStartOffset()]);
+
+                    list.add(listItem);
+                }
+            } catch (AsterixException exception) {
+                throw new HyracksDataException(exception);
+            }
+            return list;
+        }
+    }
+
+    public static class JUnorderedListAccessor implements IJObjectAccessor {
+
+        @Override
+        public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+                throws HyracksDataException {
+            return null;
+        }
+
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectPointableVisitor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectPointableVisitor.java
new file mode 100644
index 0000000..0f4ce58
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectPointableVisitor.java
@@ -0,0 +1,75 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.library.TypeInfo;
+import edu.uci.ics.asterix.external.library.java.JObjectAccessors.JListAccessor;
+import edu.uci.ics.asterix.external.library.java.JObjectAccessors.JRecordAccessor;
+import edu.uci.ics.asterix.om.pointables.AFlatValuePointable;
+import edu.uci.ics.asterix.om.pointables.AListPointable;
+import edu.uci.ics.asterix.om.pointables.ARecordPointable;
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
+import edu.uci.ics.asterix.om.pointables.visitor.IVisitablePointableVisitor;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class JObjectPointableVisitor implements IVisitablePointableVisitor<IJObject, TypeInfo> {
+
+    private final Map<ATypeTag, IJObjectAccessor> flatJObjectAccessors = new HashMap<ATypeTag, IJObjectAccessor>();
+    private final Map<IVisitablePointable, IJRecordAccessor> raccessorToJObject = new HashMap<IVisitablePointable, IJRecordAccessor>();
+    private final Map<IVisitablePointable, IJListAccessor> laccessorToPrinter = new HashMap<IVisitablePointable, IJListAccessor>();
+
+    @Override
+    public IJObject visit(AListPointable accessor, TypeInfo arg) throws AsterixException {
+        IJObject result = null;
+        IJListAccessor jListAccessor = laccessorToPrinter.get(accessor);
+        if (jListAccessor == null) {
+            jListAccessor = new JListAccessor(arg.getObjectPool());
+            laccessorToPrinter.put(accessor, jListAccessor);
+        }
+        try {
+            result = jListAccessor.access(accessor, arg.getObjectPool(), arg.getAtype(), this);
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        return result;
+    }
+
+    @Override
+    public IJObject visit(ARecordPointable accessor, TypeInfo arg) throws AsterixException {
+        IJObject result = null;
+        IJRecordAccessor jRecordAccessor = raccessorToJObject.get(accessor);
+        if (jRecordAccessor == null) {
+            jRecordAccessor = new JRecordAccessor(accessor.getInputRecordType(), arg.getObjectPool());
+            raccessorToJObject.put(accessor, jRecordAccessor);
+        }
+        try {
+            result = jRecordAccessor.access(accessor, arg.getObjectPool(), (ARecordType) arg.getAtype(), this);
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        return result;
+    }
+
+    @Override
+    public IJObject visit(AFlatValuePointable accessor, TypeInfo arg) throws AsterixException {
+        ATypeTag typeTag = arg.getTypeTag();
+        IJObject result = null;
+        IJObjectAccessor jObjectAccessor = flatJObjectAccessors.get(typeTag);
+        if (jObjectAccessor == null) {
+            jObjectAccessor = JObjectAccessors.createFlatJObjectAccessor(typeTag);
+            flatJObjectAccessors.put(typeTag, jObjectAccessor);
+        }
+
+        try {
+            result = jObjectAccessor.access(accessor, arg.getObjectPool());
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
+        return result;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
index ff662ae..f5f404a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
@@ -42,6 +42,7 @@
 import edu.uci.ics.asterix.external.library.java.JObjects.JString;
 import edu.uci.ics.asterix.external.library.java.JObjects.JTime;
 import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.om.base.APoint;
 import edu.uci.ics.asterix.om.types.AOrderedListType;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
@@ -52,6 +53,7 @@
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
 import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public class JObjectUtil {
@@ -129,7 +131,11 @@
                 long start = dis.readLong();
                 long end = dis.readLong();
                 byte intervalType = dis.readByte();
-                ((JInterval) jObject).setValue(start, end, intervalType);
+                try {
+                    ((JInterval) jObject).setValue(start, end, intervalType);
+                } catch (AlgebricksException e) {
+                    throw new AsterixException(e);
+                }
                 break;
             }
 
@@ -184,7 +190,7 @@
                     p1.setValue(dis.readDouble(), dis.readDouble());
                     points.add(p1);
                 }
-                ((JPolygon) jObject).setValue(points);
+                ((JPolygon) jObject).setValue(points.toArray(new APoint[]{}));
                 break;
             }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
index 191fda6..61a60b6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
@@ -16,21 +16,56 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
+import edu.uci.ics.asterix.builders.IAsterixListBuilder;
+import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.builders.UnorderedListBuilder;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ACircleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADurationSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ALineSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APoint3DSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APointSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APolygonSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARectangleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ATimeSerializerDeserializer;
 import edu.uci.ics.asterix.om.base.ABoolean;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AFloat;
+import edu.uci.ics.asterix.om.base.AInt16;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AInt8;
 import edu.uci.ics.asterix.om.base.AMutableCircle;
 import edu.uci.ics.asterix.om.base.AMutableDate;
 import edu.uci.ics.asterix.om.base.AMutableDateTime;
 import edu.uci.ics.asterix.om.base.AMutableDouble;
 import edu.uci.ics.asterix.om.base.AMutableDuration;
 import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
 import edu.uci.ics.asterix.om.base.AMutableInt32;
 import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
 import edu.uci.ics.asterix.om.base.AMutableInterval;
 import edu.uci.ics.asterix.om.base.AMutableLine;
 import edu.uci.ics.asterix.om.base.AMutableOrderedList;
@@ -43,15 +78,17 @@
 import edu.uci.ics.asterix.om.base.AMutableTime;
 import edu.uci.ics.asterix.om.base.AMutableUnorderedList;
 import edu.uci.ics.asterix.om.base.APoint;
+import edu.uci.ics.asterix.om.base.ARectangle;
+import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.base.IAObject;
 import edu.uci.ics.asterix.om.types.AOrderedListType;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.AUnorderedListType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public class JObjects {
 
@@ -73,26 +110,84 @@
         public IAObject getIAObject() {
             return value;
         }
+
     }
 
-    public static final class JInt implements IJObject {
+    public static final class JByte extends JObject {
 
-        private AMutableInt32 value;
+        public JByte(byte value) {
+            super(new AMutableInt8(value));
+        }
+
+        public void setValue(byte v) {
+            ((AMutableInt8) value).setValue(v);
+        }
+
+        public byte getValue() {
+            return ((AMutableInt8) value).getByteValue();
+        }
+
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(value.getType().getTypeTag().serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            AInt8SerializerDeserializer.INSTANCE.serialize((AInt8) value, dataOutput);
+        }
+
+        @Override
+        public void reset() {
+            ((AMutableInt8) value).setValue((byte) 0);
+        }
+    }
+
+    public static final class JShort extends JObject {
+
+        private AMutableInt16 value;
+
+        public JShort(short value) {
+            super(new AMutableInt16(value));
+        }
+
+        public void setValue(byte v) {
+            ((AMutableInt16) value).setValue(v);
+        }
+
+        public short getValue() {
+            return ((AMutableInt16) value).getShortValue();
+        }
+
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(value.getType().getTypeTag().serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            AInt16SerializerDeserializer.INSTANCE.serialize((AInt16) value, dataOutput);
+        }
+
+        @Override
+        public void reset() {
+            ((AMutableInt16) value).setValue((short) 0);
+        }
+
+    }
+
+    public static final class JInt extends JObject {
 
         public JInt(int value) {
-            this.value = new AMutableInt32(value);
+            super(new AMutableInt32(value));
         }
 
         public void setValue(int v) {
-            if (value == null) {
-                value = new AMutableInt32(v);
-            } else {
-                ((AMutableInt32) value).setValue(v);
-            }
-        }
-
-        public void setValue(AMutableInt32 v) {
-            value = v;
+            ((AMutableInt32) value).setValue(v);
         }
 
         public int getValue() {
@@ -100,15 +195,21 @@
         }
 
         @Override
-        public ATypeTag getTypeTag() {
-            return BuiltinType.AINT32.getTypeTag();
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(value.getType().getTypeTag().serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            AInt32SerializerDeserializer.INSTANCE.serialize((AInt32) value, dataOutput);
         }
 
         @Override
-        public IAObject getIAObject() {
-            return value;
+        public void reset() {
+            ((AMutableInt32) value).setValue(0);
         }
-
     }
 
     public static final class JBoolean implements IJObject {
@@ -133,6 +234,23 @@
             return value ? ABoolean.TRUE : ABoolean.FALSE;
         }
 
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(ATypeTag.BOOLEAN.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            ABooleanSerializerDeserializer.INSTANCE.serialize((ABoolean) getIAObject(), dataOutput);
+        }
+
+        @Override
+        public void reset() {
+            value = false;
+        }
+
     }
 
     public static final class JLong extends JObject {
@@ -149,6 +267,23 @@
             return ((AMutableInt64) value).getLongValue();
         }
 
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(value.getType().getTypeTag().serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            AInt64SerializerDeserializer.INSTANCE.serialize((AInt64) value, dataOutput);
+        }
+
+        @Override
+        public void reset() {
+            ((AMutableInt64) value).setValue(0);
+        }
+
     }
 
     public static final class JDouble extends JObject {
@@ -165,6 +300,23 @@
             return ((AMutableDouble) value).getDoubleValue();
         }
 
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(value.getType().getTypeTag().serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            ADoubleSerializerDeserializer.INSTANCE.serialize((ADouble) value, dataOutput);
+        }
+
+        @Override
+        public void reset() {
+            ((AMutableDouble) value).setValue(0);
+        }
+
     }
 
     public static final class JString extends JObject {
@@ -181,14 +333,29 @@
             return ((AMutableString) value).getStringValue();
         }
 
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(value.getType().getTypeTag().serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            AStringSerializerDeserializer.INSTANCE.serialize((AString) value, dataOutput);
+        }
+
+        @Override
+        public void reset() {
+            ((AMutableString) value).setValue("");
+        }
+
     }
 
-    public static final class JFloat implements IJObject {
-
-        private AMutableFloat value;
+    public static final class JFloat extends JObject {
 
         public JFloat(float v) {
-            value = new AMutableFloat(v);
+            super(new AMutableFloat(v));
         }
 
         public void setValue(float v) {
@@ -200,13 +367,20 @@
         }
 
         @Override
-        public ATypeTag getTypeTag() {
-            return BuiltinType.AFLOAT.getTypeTag();
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(value.getType().getTypeTag().serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            AFloatSerializerDeserializer.INSTANCE.serialize((AFloat) value, dataOutput);
         }
 
         @Override
-        public IAObject getIAObject() {
-            return value;
+        public void reset() {
+            ((AMutableFloat) value).setValue(0);
         }
 
     }
@@ -237,18 +411,37 @@
         public String toString() {
             return value.toString();
         }
+
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(value.getType().getTypeTag().serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            APointSerializerDeserializer.INSTANCE.serialize((APoint) value, dataOutput);
+        }
+
+        @Override
+        public void reset() {
+            ((AMutablePoint) value).setValue(0, 0);
+        }
     }
 
-    public static final class JRectangle implements IJObject {
-
-        private AMutableRectangle rect;
+    public static final class JRectangle extends JObject {
 
         public JRectangle(JPoint p1, JPoint p2) {
-            rect = new AMutableRectangle((APoint) p1.getValue(), (APoint) p2.getValue());
+            super(new AMutableRectangle((APoint) p1.getIAObject(), (APoint) p2.getIAObject()));
         }
 
         public void setValue(JPoint p1, JPoint p2) {
-            this.rect.setValue((APoint) p1.getValue(), (APoint) p2.getValue());
+            ((AMutableRectangle) value).setValue((APoint) p1.getValue(), (APoint) p2.getValue());
+        }
+
+        public void setValue(APoint p1, APoint p2) {
+            ((AMutableRectangle) value).setValue(p1, p2);
         }
 
         @Override
@@ -257,190 +450,200 @@
         }
 
         @Override
-        public IAObject getIAObject() {
-            return rect;
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(value.getType().getTypeTag().serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            ARectangleSerializerDeserializer.INSTANCE.serialize((ARectangle) value, dataOutput);
         }
 
         @Override
-        public String toString() {
-            return rect.toString();
+        public void reset() {
         }
 
     }
 
-    public static final class JTime implements IJObject {
-
-        private AMutableTime time;
+    public static final class JTime extends JObject {
 
         public JTime(int timeInMillsec) {
-            time = new AMutableTime(timeInMillsec);
+            super(new AMutableTime(timeInMillsec));
         }
 
         public void setValue(int timeInMillsec) {
-            time.setValue(timeInMillsec);
+            ((AMutableTime) value).setValue(timeInMillsec);
         }
 
         @Override
-        public ATypeTag getTypeTag() {
-            return ATypeTag.TIME;
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(ATypeTag.TIME.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            ATimeSerializerDeserializer.INSTANCE.serialize((AMutableTime) value, dataOutput);
         }
 
         @Override
-        public IAObject getIAObject() {
-            return time;
-        }
-
-        @Override
-        public String toString() {
-            return time.toString();
+        public void reset() {
+            ((AMutableTime) value).setValue(0);
         }
 
     }
 
-    public static final class JInterval implements IJObject {
-
-        private AMutableInterval interval;
+    public static final class JInterval extends JObject {
 
         public JInterval(long intervalStart, long intervalEnd) {
-            interval = new AMutableInterval(intervalStart, intervalEnd, (byte) 0);
+            super(new AMutableInterval(intervalStart, intervalEnd, (byte) 0));
         }
 
-        public void setValue(long intervalStart, long intervalEnd, byte typetag) throws AsterixException {
-            try {
-                interval.setValue(intervalStart, intervalEnd, typetag);
-            } catch (AlgebricksException e) {
-                throw new AsterixException(e);
-            }
-        }
-
-        @Override
-        public ATypeTag getTypeTag() {
-            return ATypeTag.INTERVAL;
-        }
-
-        @Override
-        public IAObject getIAObject() {
-            return interval;
-        }
-
-        @Override
-        public String toString() {
-            return interval.toString();
+        public void setValue(long intervalStart, long intervalEnd, byte typetag) throws AlgebricksException {
+            ((AMutableInterval) value).setValue(intervalStart, intervalEnd, typetag);
         }
 
         public long getIntervalStart() {
-            return interval.getIntervalStart();
+            return ((AMutableInterval) value).getIntervalStart();
         }
 
         public long getIntervalEnd() {
-            return interval.getIntervalEnd();
+            return ((AMutableInterval) value).getIntervalEnd();
         }
 
         public short getIntervalType() {
-            return interval.getIntervalType();
+            return ((AMutableInterval) value).getIntervalType();
+        }
+
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(ATypeTag.INTERVAL.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            AIntervalSerializerDeserializer.INSTANCE.serialize(((AMutableInterval) value), dataOutput);
+        }
+
+        @Override
+        public void reset() throws AlgebricksException {
+            ((AMutableInterval) value).setValue(0L, 0L, (byte) 0);
         }
 
     }
 
-    public static final class JDate implements IJObject {
-
-        private AMutableDate date;
+    public static final class JDate extends JObject {
 
         public JDate(int chrononTimeInDays) {
-            date = new AMutableDate(chrononTimeInDays);
+            super(new AMutableDate(chrononTimeInDays));
         }
 
         public void setValue(int chrononTimeInDays) {
-            date.setValue(chrononTimeInDays);
+            ((AMutableDate) value).setValue(chrononTimeInDays);
         }
 
         @Override
-        public ATypeTag getTypeTag() {
-            return ATypeTag.DATE;
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(ATypeTag.DATE.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            ADateSerializerDeserializer.INSTANCE.serialize(((AMutableDate) value), dataOutput);
         }
 
         @Override
-        public IAObject getIAObject() {
-            return date;
+        public void reset() {
+            ((AMutableDate) value).setValue(0);
         }
-
-        @Override
-        public String toString() {
-            return date.toString();
-        }
-
     }
 
-    public static final class JDateTime implements IJObject {
-
-        private AMutableDateTime dateTime;
+    public static final class JDateTime extends JObject {
 
         public JDateTime(long chrononTime) {
-            dateTime = new AMutableDateTime(chrononTime);
+            super(new AMutableDateTime(chrononTime));
         }
 
         public void setValue(long chrononTime) {
-            dateTime.setValue(chrononTime);
+            ((AMutableDateTime) value).setValue(chrononTime);
         }
 
         @Override
-        public ATypeTag getTypeTag() {
-            return ATypeTag.DATETIME;
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(ATypeTag.DATETIME.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            ADateTimeSerializerDeserializer.INSTANCE.serialize(((AMutableDateTime) value), dataOutput);
         }
 
         @Override
-        public IAObject getIAObject() {
-            return dateTime;
-        }
-
-        @Override
-        public String toString() {
-            return dateTime.toString();
+        public void reset() {
+            ((AMutableDateTime) value).setValue(0);
         }
 
     }
 
-    public static final class JDuration implements IJObject {
-
-        private AMutableDuration duration;
+    public static final class JDuration extends JObject {
 
         public JDuration(int months, long milliseconds) {
-            duration = new AMutableDuration(months, milliseconds);
+            super(new AMutableDuration(months, milliseconds));
         }
 
         public void setValue(int months, long milliseconds) {
-            duration.setValue(months, milliseconds);
+            ((AMutableDuration) value).setValue(months, milliseconds);
         }
 
         @Override
-        public ATypeTag getTypeTag() {
-            return ATypeTag.DURATION;
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(ATypeTag.DURATION.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            ADurationSerializerDeserializer.INSTANCE.serialize(((AMutableDuration) value), dataOutput);
         }
 
         @Override
-        public IAObject getIAObject() {
-            return duration;
-        }
-
-        @Override
-        public String toString() {
-            return duration.toString();
+        public void reset() {
+            ((AMutableDuration) value).setValue(0, 0);
         }
 
     }
 
-    public static final class JPolygon implements IJObject {
+    public static final class JPolygon extends JObject {
 
-        private AMutablePolygon polygon;
-        private List<JPoint> points;
-
-        public JPolygon(List<JPoint> points) {
-            this.points = points;
+        public JPolygon(JPoint[] points) {
+            super(new AMutablePolygon(getAPoints(points)));
         }
 
-        public void setValue(List<JPoint> points) {
-            this.points = points;
-            polygon = null;
+        public void setValue(APoint[] points) {
+            ((AMutablePolygon) value).setValue(points);
+        }
+
+        public void setValue(JPoint[] points) {
+            ((AMutablePolygon) value).setValue(getAPoints(points));
+        }
+
+        private static APoint[] getAPoints(JPoint[] jpoints) {
+            APoint[] apoints = new APoint[jpoints.length];
+            int index = 0;
+            for (JPoint jpoint : jpoints) {
+                apoints[index++] = (APoint) jpoint.getIAObject();
+            }
+            return apoints;
         }
 
         @Override
@@ -449,35 +652,32 @@
         }
 
         @Override
-        public IAObject getIAObject() {
-            if (polygon == null) {
-                APoint[] pts = new APoint[points.size()];
-                int index = 0;
-                for (JPoint p : points) {
-                    pts[index++] = (APoint) p.getIAObject();
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(ATypeTag.POLYGON.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
                 }
-                polygon = new AMutablePolygon(pts);
             }
-            return polygon;
+            APolygonSerializerDeserializer.INSTANCE.serialize((AMutablePolygon) value, dataOutput);
         }
 
         @Override
-        public String toString() {
-            return getIAObject().toString();
+        public void reset() {
+            ((AMutablePolygon) value).setValue(null);
         }
 
     }
 
-    public static final class JCircle implements IJObject {
-
-        private AMutableCircle circle;
+    public static final class JCircle extends JObject {
 
         public JCircle(JPoint center, double radius) {
-            circle = new AMutableCircle((APoint) center.getIAObject(), radius);
+            super(new AMutableCircle((APoint) center.getIAObject(), radius));
         }
 
         public void setValue(JPoint center, double radius) {
-            circle.setValue((APoint) center.getIAObject(), radius);
+            ((AMutableCircle) (value)).setValue((APoint) center.getIAObject(), radius);
         }
 
         @Override
@@ -486,56 +686,64 @@
         }
 
         @Override
-        public IAObject getIAObject() {
-            return circle;
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(ATypeTag.CIRCLE.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            ACircleSerializerDeserializer.INSTANCE.serialize(((AMutableCircle) (value)), dataOutput);
         }
 
         @Override
-        public String toString() {
-            return circle.toString();
+        public void reset() {
         }
-
     }
 
-    public static final class JLine implements IJObject {
-
-        private AMutableLine line;
+    public static final class JLine extends JObject {
 
         public JLine(JPoint p1, JPoint p2) {
-            line = new AMutableLine((APoint) p1.getIAObject(), (APoint) p2.getIAObject());
+            super(new AMutableLine((APoint) p1.getIAObject(), (APoint) p2.getIAObject()));
         }
 
         public void setValue(JPoint p1, JPoint p2) {
-            line.setValue((APoint) p1.getIAObject(), (APoint) p2.getIAObject());
+            ((AMutableLine) value).setValue((APoint) p1.getIAObject(), (APoint) p2.getIAObject());
+        }
+
+        public void setValue(APoint p1, APoint p2) {
+            ((AMutableLine) value).setValue(p1, p2);
         }
 
         @Override
-        public ATypeTag getTypeTag() {
-            return ATypeTag.LINE;
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(ATypeTag.LINE.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            ALineSerializerDeserializer.INSTANCE.serialize(((AMutableLine) value), dataOutput);
         }
 
         @Override
-        public IAObject getIAObject() {
-            return line;
-        }
+        public void reset() {
+            // TODO Auto-generated method stub
 
-        @Override
-        public String toString() {
-            return line.toString();
         }
 
     }
 
-    public static final class JPoint3D implements IJObject {
-
-        private AMutablePoint3D value;
+    public static final class JPoint3D extends JObject {
 
         public JPoint3D(double x, double y, double z) {
-            value = new AMutablePoint3D(x, y, z);
+            super(new AMutablePoint3D(x, y, z));
         }
 
         public void setValue(double x, double y, double z) {
-            value.setValue(x, y, z);
+            ((AMutablePoint3D) value).setValue(x, y, z);
         }
 
         public double getXValue() {
@@ -550,40 +758,75 @@
             return ((AMutablePoint3D) value).getZ();
         }
 
-        public IAObject getValue() {
-            return value;
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(ATypeTag.POINT3D.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            APoint3DSerializerDeserializer.INSTANCE.serialize(((AMutablePoint3D) value), dataOutput);
         }
 
         @Override
-        public String toString() {
-            return value.toString();
-        }
+        public void reset() {
+            // TODO Auto-generated method stub
 
-        @Override
-        public ATypeTag getTypeTag() {
-            return ATypeTag.POINT3D;
-        }
-
-        @Override
-        public IAObject getIAObject() {
-            return value;
         }
     }
 
-    public static final class JOrderedList implements IJObject {
+    public static abstract class JList implements IJObject {
+        protected List<IJObject> jObjects;
 
-        private AOrderedListType listType;
-        private List<IJObject> jObjects;
+        public JList() {
+            jObjects = new ArrayList<IJObject>();
+        }
 
-        public JOrderedList(IJObject jObject) {
-            this.listType = new AOrderedListType(jObject.getIAObject().getType(), null);
-            this.jObjects = new ArrayList<IJObject>();
+        public boolean isEmpty() {
+            return jObjects.isEmpty();
         }
 
         public void add(IJObject jObject) {
             jObjects.add(jObject);
         }
 
+        public void addAll(Collection<IJObject> jObjectCollection) {
+            jObjects.addAll(jObjectCollection);
+        }
+
+        public void clear() {
+            jObjects.clear();
+        }
+
+        public IJObject getElement(int index) {
+            return jObjects.get(index);
+        }
+
+        public int size() {
+            return jObjects.size();
+        }
+
+        public Iterator<IJObject> iterator() {
+            return jObjects.iterator();
+        }
+    }
+
+    public static final class JOrderedList extends JList {
+
+        private AOrderedListType listType;
+
+        public JOrderedList(IJObject jObject) {
+            super();
+            this.listType = new AOrderedListType(jObject.getIAObject().getType(), null);
+        }
+
+        public JOrderedList(IAType listItemType) {
+            super();
+            this.listType = new AOrderedListType(listItemType, null);
+        }
+
         @Override
         public ATypeTag getTypeTag() {
             return ATypeTag.ORDEREDLIST;
@@ -602,34 +845,43 @@
             return listType;
         }
 
-        public void addAll(Collection<IJObject> jObjectCollection) {
-            jObjects.addAll(jObjectCollection);
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            IAsterixListBuilder listBuilder = new UnorderedListBuilder();
+            listBuilder.reset(listType);
+            ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+            for (IJObject jObject : jObjects) {
+                fieldValue.reset();
+                jObject.serialize(fieldValue.getDataOutput(), true);
+                listBuilder.addItem(fieldValue);
+            }
+            listBuilder.write(dataOutput, writeTypeTag);
+
         }
 
-        public void clear() {
-            jObjects.clear();
-        }
+        @Override
+        public void reset() {
+            // TODO Auto-generated method stub
 
-        public IJObject getElement(int index) {
-            return jObjects.get(index);
-        }
-
-        public int size() {
-            return jObjects.size();
         }
 
     }
 
-    public static final class JUnorderedList implements IJObject {
+    public static final class JUnorderedList extends JList {
 
         private AUnorderedListType listType;
-        private List<IJObject> jObjects;
 
         public JUnorderedList(IJObject jObject) {
             this.listType = new AUnorderedListType(jObject.getIAObject().getType(), null);
             this.jObjects = new ArrayList<IJObject>();
         }
 
+        public JUnorderedList(IAType listItemType) {
+            super();
+            this.listType = new AUnorderedListType(listItemType, null);
+            this.jObjects = new ArrayList<IJObject>();
+        }
+
         public void add(IJObject jObject) {
             jObjects.add(jObject);
         }
@@ -652,60 +904,43 @@
             return listType;
         }
 
-        public boolean isEmpty() {
-            return jObjects.isEmpty();
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            IAsterixListBuilder listBuilder = new UnorderedListBuilder();
+            listBuilder.reset(listType);
+            ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+            for (IJObject jObject : jObjects) {
+                fieldValue.reset();
+                jObject.serialize(fieldValue.getDataOutput(), true);
+                listBuilder.addItem(fieldValue);
+            }
+            listBuilder.write(dataOutput, writeTypeTag);
         }
 
-        public void addAll(Collection<IJObject> jObjectCollection) {
-            jObjects.addAll(jObjectCollection);
-        }
-
-        public void clear() {
+        @Override
+        public void reset() {
             jObjects.clear();
         }
 
-        public IJObject getElement(int index) {
-            return jObjects.get(index);
-        }
-
-        public int size() {
-            return jObjects.size();
-        }
-
     }
 
     public static final class JRecord implements IJObject {
 
         private AMutableRecord value;
         private ARecordType recordType;
-        private List<IJObject> fields;
-        private List<String> fieldNames;
-        private List<IAType> fieldTypes;
-        private int numFieldsAdded = 0;
-        private List<Boolean> openField;
-
-        public JRecord(ARecordType recordType) {
-            this.recordType = recordType;
-            this.fields = new ArrayList<IJObject>();
-            initFieldInfo();
-        }
+        private IJObject[] fields;
+        private Map<String, IJObject> openFields;
 
         public JRecord(ARecordType recordType, IJObject[] fields) {
             this.recordType = recordType;
-            this.fields = new ArrayList<IJObject>();
-            for (IJObject jObject : fields) {
-                this.fields.add(jObject);
-            }
-            initFieldInfo();
+            this.fields = fields;
+            this.openFields = new LinkedHashMap<String, IJObject>();
         }
 
-        public JRecord(String[] fieldNames, IJObject[] fields) throws AsterixException {
-            this.recordType = getARecordType(fieldNames, fields);
-            this.fields = new ArrayList<IJObject>();
-            for (IJObject jObject : fields) {
-                this.fields.add(jObject);
-            }
-            initFieldInfo();
+        public JRecord(ARecordType recordType, IJObject[] fields, LinkedHashMap<String, IJObject> openFields) {
+            this.recordType = recordType;
+            this.fields = fields;
+            this.openFields = openFields;
         }
 
         private ARecordType getARecordType(String[] fieldNames, IJObject[] fields) throws AsterixException {
@@ -723,74 +958,34 @@
             return recordType;
         }
 
-        private void initFieldInfo() {
-            this.openField = new ArrayList<Boolean>();
-            fieldNames = new ArrayList<String>();
-            for (String name : recordType.getFieldNames()) {
-                fieldNames.add(name);
-                openField.add(false);
-            }
-            fieldTypes = new ArrayList<IAType>();
-            for (IAType type : recordType.getFieldTypes()) {
-                fieldTypes.add(type);
-            }
-
-        }
-
-        private IAObject[] getIAObjectArray(List<IJObject> fields) {
-            IAObject[] retValue = new IAObject[fields.size()];
-            int index = 0;
-            for (IJObject jObject : fields) {
-                retValue[index++] = getIAObject(jObject);
-            }
-            return retValue;
-        }
-
-        private IAObject getIAObject(IJObject jObject) {
-            IAObject retVal = null;
-            switch (jObject.getTypeTag()) {
-                case RECORD:
-                    ARecordType recType = ((JRecord) jObject).getRecordType();
-                    IAObject[] fields = new IAObject[((JRecord) jObject).getFields().size()];
-                    int index = 0;
-                    for (IJObject field : ((JRecord) jObject).getFields()) {
-                        fields[index++] = getIAObject(field);
-                    }
-                    retVal = new AMutableRecord(recType, fields);
-                default:
-                    retVal = jObject.getIAObject();
-                    break;
-            }
-            return retVal;
-        }
-
-        public void addField(String fieldName, IJObject fieldValue) {
+        public void addField(String fieldName, IJObject fieldValue) throws AsterixException {
             int pos = getFieldPosByName(fieldName);
             if (pos >= 0) {
-                throw new IllegalArgumentException("field already defined");
+                throw new AsterixException("field already defined in closed part");
             }
-            numFieldsAdded++;
-            fields.add(fieldValue);
-            fieldNames.add(fieldName);
-            fieldTypes.add(fieldValue.getIAObject().getType());
-            openField.add(true);
+            if (openFields.get(fieldName) != null) {
+                throw new AsterixException("field already defined in open part");
+            }
+            openFields.put(fieldName, fieldValue);
         }
 
         public IJObject getValueByName(String fieldName) throws AsterixException, IOException {
+            // check closed part
             int fieldPos = getFieldPosByName(fieldName);
-            if (fieldPos < 0) {
-                throw new AsterixException("unknown field: " + fieldName);
+            if (fieldPos >= 0) {
+                return fields[fieldPos];
+            } else {
+                // check open part
+                IJObject fieldValue = openFields.get(fieldName);
+                if (fieldValue == null) {
+                    throw new AsterixException("unknown field: " + fieldName);
+                }
+                return fieldValue;
             }
-            return fields.get(fieldPos);
         }
 
-        public void setValueAtPos(int pos, IJObject jtype) {
-            fields.set(pos, jtype);
-        }
-
-        public void setValue(AMutableRecord mutableRecord) {
-            this.value = mutableRecord;
-            this.recordType = mutableRecord.getType();
+        public void setValueAtPos(int pos, IJObject jObject) {
+            fields[pos] = jObject;
         }
 
         @Override
@@ -798,16 +993,22 @@
             return recordType.getTypeTag();
         }
 
-        public void setField(String fieldName, IJObject fieldValue) {
+        public void setField(String fieldName, IJObject fieldValue) throws AsterixException {
             int pos = getFieldPosByName(fieldName);
-            fields.set(pos, fieldValue);
-            if (value != null) {
-                value.setValueAtPos(pos, fieldValue.getIAObject());
+            if (pos >= 0) {
+                fields[pos] = fieldValue;
+            } else {
+                if (openFields.get(fieldName) != null) {
+                    openFields.put(fieldName, fieldValue);
+                } else {
+                    throw new AsterixException("unknown field: " + fieldName);
+                }
             }
         }
 
         private int getFieldPosByName(String fieldName) {
             int index = 0;
+            String[] fieldNames = recordType.getFieldNames();
             for (String name : fieldNames) {
                 if (name.equals(fieldName)) {
                     return index;
@@ -821,40 +1022,75 @@
             return recordType;
         }
 
-        public List<IJObject> getFields() {
+        public IJObject[] getFields() {
             return fields;
         }
 
+        public RecordBuilder getRecordBuilder() {
+            RecordBuilder recordBuilder = new RecordBuilder();
+            recordBuilder.reset(recordType);
+            return recordBuilder;
+        }
+
+        public void serialize(DataOutput output, boolean writeTypeTag) throws HyracksDataException {
+            RecordBuilder recordBuilder = new RecordBuilder();
+            recordBuilder.reset(recordType);
+            int index = 0;
+            ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+            for (IJObject jObject : fields) {
+                fieldValue.reset();
+                jObject.serialize(fieldValue.getDataOutput(), writeTypeTag);
+                recordBuilder.addField(index, fieldValue);
+                index++;
+            }
+
+            try {
+                if (openFields != null && !openFields.isEmpty()) {
+                    ArrayBackedValueStorage openFieldName = new ArrayBackedValueStorage();
+                    ArrayBackedValueStorage openFieldValue = new ArrayBackedValueStorage();
+                    AMutableString nameValue = new AMutableString(""); // get from the pool
+                    for (Entry<String, IJObject> entry : openFields.entrySet()) {
+                        openFieldName.reset();
+                        openFieldValue.reset();
+                        nameValue.setValue(entry.getKey());
+                        openFieldName.getDataOutput().write(ATypeTag.STRING.serialize());
+                        AStringSerializerDeserializer.INSTANCE.serialize(nameValue, openFieldName.getDataOutput());
+                        entry.getValue().serialize(openFieldValue.getDataOutput(), true);
+                        recordBuilder.addField(openFieldName, openFieldValue);
+                    }
+                }
+            } catch (IOException | AsterixException ae) {
+                throw new HyracksDataException(ae);
+            }
+            try {
+                recordBuilder.write(output, writeTypeTag);
+            } catch (IOException | AsterixException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
         @Override
         public IAObject getIAObject() {
-            if (value == null || numFieldsAdded > 0) {
-                value = new AMutableRecord(recordType, getIAObjectArray(fields));
-            }
             return value;
         }
 
-        public void close() {
-            if (numFieldsAdded > 0) {
-                int totalFields = fieldNames.size();
-                for (int i = 0; i < numFieldsAdded; i++) {
-                    fieldNames.remove(totalFields - 1 - i);
-                    fieldTypes.remove(totalFields - 1 - i);
-                    fields.remove(totalFields - 1 - i);
+        public void reset() throws AlgebricksException {
+            if (openFields != null && !openFields.isEmpty()) {
+                openFields.clear();
+            }
+            if (fields != null) {
+                for (IJObject field : fields) {
+                    if (field != null) {
+                        field.reset();
+                    }
                 }
-                numFieldsAdded = 0;
             }
         }
 
-        public List<Boolean> getOpenField() {
-            return openField;
-        }
-
-        public List<String> getFieldNames() {
-            return fieldNames;
-        }
-
-        public List<IAType> getFieldTypes() {
-            return fieldTypes;
+        public void reset(IJObject[] fields, LinkedHashMap<String, IJObject> openFields) throws AlgebricksException {
+            this.reset();
+            this.fields = fields;
+            this.openFields = openFields;
         }
 
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/Datatypes.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/Datatypes.java
new file mode 100644
index 0000000..e126bf6
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/Datatypes.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.asterix.external.util;
+
+public class Datatypes {
+
+    public static final class Tweet {
+        public static final String ID = "id";
+        public static final String USER = "user";
+        public static final String MESSAGE = "message_text";
+        public static final String LATITUDE = "latitude";
+        public static final String LONGITUDE = "longitude";
+        public static final String CREATED_AT = "created_at";
+        public static final String SCREEN_NAME = "screen_name";
+        public static final String COUNTRY = "country";
+    }
+
+    public static final class ProcessedTweet {
+        public static final String USER_NAME = "user_name";
+        public static final String LOCATION = "location";
+        public static final String TOPICS = "topics";
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java
new file mode 100644
index 0000000..092d715
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.asterix.external.util;
+
+import twitter4j.Status;
+import twitter4j.User;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableRecord;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+
+public class TweetProcessor {
+
+    private IAObject[] mutableTweetFields;
+    private IAObject[] mutableUserFields;
+    private AMutableRecord mutableRecord;
+    private AMutableRecord mutableUser;
+
+    public TweetProcessor(ARecordType recordType) {
+        mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0),
+                new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) };
+        mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[1], mutableUserFields);
+
+        mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0),
+                new AMutableDouble(0), new AMutableString(null), new AMutableString(null) };
+        mutableRecord = new AMutableRecord(recordType, mutableTweetFields);
+
+    }
+
+    public AMutableRecord processNextTweet(Status tweet) {
+        User user = tweet.getUser();
+        ((AMutableString) mutableUserFields[0]).setValue(getNormalizedString(user.getScreenName()));
+        ((AMutableString) mutableUserFields[1]).setValue(getNormalizedString(user.getLang()));
+        ((AMutableInt32) mutableUserFields[2]).setValue(user.getFriendsCount());
+        ((AMutableInt32) mutableUserFields[3]).setValue(user.getStatusesCount());
+        ((AMutableString) mutableUserFields[4]).setValue(getNormalizedString(user.getName()));
+        ((AMutableInt32) mutableUserFields[5]).setValue(user.getFollowersCount());
+
+        ((AMutableString) mutableTweetFields[0]).setValue(tweet.getId() + "");
+
+        for (int i = 0; i < 6; i++) {
+            ((AMutableRecord) mutableTweetFields[1]).setValueAtPos(i, mutableUserFields[i]);
+        }
+        if (tweet.getGeoLocation() != null) {
+            ((AMutableDouble) mutableTweetFields[2]).setValue(tweet.getGeoLocation().getLatitude());
+            ((AMutableDouble) mutableTweetFields[3]).setValue(tweet.getGeoLocation().getLongitude());
+        } else {
+            ((AMutableDouble) mutableTweetFields[2]).setValue(0);
+            ((AMutableDouble) mutableTweetFields[3]).setValue(0);
+        }
+        ((AMutableString) mutableTweetFields[4]).setValue(getNormalizedString(tweet.getCreatedAt().toString()));
+        ((AMutableString) mutableTweetFields[5]).setValue(getNormalizedString(tweet.getText()));
+
+        for (int i = 0; i < 6; i++) {
+            mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
+        }
+
+        return mutableRecord;
+
+    }
+
+    public static String getNormalizedString(String originalString) {
+        String asciiText = originalString.replaceAll("[^\\x00-\\x7F]", "").replaceAll("\n", " ");
+        return asciiText.trim();
+
+    }
+
+    public AMutableRecord getMutableRecord() {
+        return mutableRecord;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
new file mode 100644
index 0000000..bd1d75c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
@@ -0,0 +1,143 @@
+package edu.uci.ics.asterix.external.util;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import twitter4j.FilterQuery;
+import twitter4j.Twitter;
+import twitter4j.TwitterFactory;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import twitter4j.conf.ConfigurationBuilder;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public class TwitterUtil {
+
+    public static class ConfigurationConstants {
+        public static final String KEY_LOCATION = "location";
+        public static final String LOCATION_US = "US";
+    }
+
+    public static class GeoConstants {
+        public static Map<String, double[][]> boundingBoxes = initializeBoundingBoxes();
+        public static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } };
+    }
+
+    private static Map<String, double[][]> initializeBoundingBoxes() {
+        Map<String, double[][]> boundingBoxes = new HashMap<String, double[][]>();
+        boundingBoxes.put(ConfigurationConstants.LOCATION_US, new double[][] { { -124.848974, 24.396308 },
+                { -66.885444, 49.384358 } });
+        return boundingBoxes;
+    }
+
+    public static FilterQuery getFilterQuery(Map<String, String> configuration) throws AsterixException {
+        String locationValue = configuration.get(ConfigurationConstants.KEY_LOCATION);
+        double[][] locations = null;
+        if (locationValue != null) {
+            if (locationValue.contains(",")) {
+                String[] coordinatesString = locationValue.trim().split(",");
+                locations = new double[2][2];
+                for (int i = 0; i < 2; i++) {
+                    for (int j = 0; j < 2; j++) {
+                        try {
+                            locations[i][j] = Double.parseDouble(coordinatesString[2 * i + j]);
+                        } catch (NumberFormatException ne) {
+                            throw new AsterixException("Incorrect coordinate value " + coordinatesString[2 * i + j]);
+                        }
+                    }
+                }
+            } else {
+                locations = GeoConstants.boundingBoxes.get(locationValue);
+            }
+            if (locations != null) {
+                FilterQuery filterQuery = new FilterQuery();
+                filterQuery.locations(locations);
+                return filterQuery;
+            }
+        }
+        return null;
+
+    }
+
+    public static Twitter getTwitterService(Map<String, String> configuration) {
+        ConfigurationBuilder cb = getAuthConfiguration(configuration);
+        TwitterFactory tf = new TwitterFactory(cb.build());
+        Twitter twitter = tf.getInstance();
+        return twitter;
+    }
+
+    public static TwitterStream getTwitterStream(Map<String, String> configuration) {
+        ConfigurationBuilder cb = getAuthConfiguration(configuration);
+        TwitterStreamFactory factory = new TwitterStreamFactory(cb.build());
+        return factory.getInstance();
+    }
+
+    private static ConfigurationBuilder getAuthConfiguration(Map<String, String> configuration) {
+        ConfigurationBuilder cb = new ConfigurationBuilder();
+        cb.setDebugEnabled(true);
+        String oAuthConsumerKey = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
+        String oAuthConsumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
+        String oAuthAccessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
+        String oAuthAccessTokenSecret = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
+
+        cb.setOAuthConsumerKey(oAuthConsumerKey);
+        cb.setOAuthConsumerSecret(oAuthConsumerSecret);
+        cb.setOAuthAccessToken(oAuthAccessToken);
+        cb.setOAuthAccessTokenSecret(oAuthAccessTokenSecret);
+        return cb;
+    }
+
+    public static void initializeConfigurationWithAuthInfo(Map<String, String> configuration) throws AsterixException {
+        String authMode = configuration.get(AuthenticationConstants.AUTHENTICATION_MODE);
+        if (authMode == null) {
+            authMode = AuthenticationConstants.AUTHENTICATION_MODE_FILE;
+        }
+        try {
+            switch (authMode) {
+                case AuthenticationConstants.AUTHENTICATION_MODE_FILE:
+                    Properties prop = new Properties();
+                    String authFile = configuration.get(AuthenticationConstants.OAUTH_AUTHENTICATION_FILE);
+                    if (authFile == null) {
+                        authFile = AuthenticationConstants.DEFAULT_AUTH_FILE;
+                    }
+                    InputStream in = TwitterUtil.class.getResourceAsStream(authFile);
+                    prop.load(in);
+                    in.close();
+                    configuration.put(AuthenticationConstants.OAUTH_CONSUMER_KEY,
+                            prop.getProperty(AuthenticationConstants.OAUTH_CONSUMER_KEY));
+                    configuration.put(AuthenticationConstants.OAUTH_CONSUMER_SECRET,
+                            prop.getProperty(AuthenticationConstants.OAUTH_CONSUMER_SECRET));
+                    configuration.put(AuthenticationConstants.OAUTH_ACCESS_TOKEN,
+                            prop.getProperty(AuthenticationConstants.OAUTH_ACCESS_TOKEN));
+                    configuration.put(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET,
+                            prop.getProperty(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET));
+                    break;
+                case AuthenticationConstants.AUTHENTICATION_MODE_EXPLICIT:
+                    break;
+            }
+        } catch (Exception e) {
+            throw new AsterixException("Incorrect configuration! unable to load authentication credentials "
+                    + e.getMessage());
+        }
+    }
+
+    public static final class AuthenticationConstants {
+        public static final String OAUTH_CONSUMER_KEY = "consumer.key";
+        public static final String OAUTH_CONSUMER_SECRET = "consumer.secret";
+        public static final String OAUTH_ACCESS_TOKEN = "access.token";
+        public static final String OAUTH_ACCESS_TOKEN_SECRET = "access.token.secret";
+        public static final String OAUTH_AUTHENTICATION_FILE = "authentication.file";
+        public static final String AUTHENTICATION_MODE = "authentication.mode";
+        public static final String AUTHENTICATION_MODE_FILE = "file";
+        public static final String AUTHENTICATION_MODE_EXPLICIT = "explicit";
+        public static final String DEFAULT_AUTH_FILE = "/feed/twitter/auth.properties"; // default authentication file
+    }
+
+    public static final class SearchAPIConstants {
+        public static final String QUERY = "query";
+        public static final String INTERVAL = "interval";
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
new file mode 100644
index 0000000..90fddcd
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.IExternalScalarFunction;
+import edu.uci.ics.asterix.external.library.IFunctionFactory;
+
+public class AddHashTagsFactory implements IFunctionFactory {
+
+    @Override
+    public IExternalScalarFunction getExternalFunction() {
+        return new AddHashTagsFunction();
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
new file mode 100644
index 0000000..93a87f5
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JDouble;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPoint;
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.library.java.JTypeTag;
+import edu.uci.ics.asterix.external.util.Datatypes;
+
+public class AddHashTagsFunction implements IExternalScalarFunction {
+
+    private JUnorderedList list = null;
+    private JPoint location = null;
+
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        list = new JUnorderedList(functionHelper.getObject(JTypeTag.STRING));
+        location = new JPoint(0, 0);
+    }
+
+    @Override
+    public void deinitialize() {
+    }
+
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        list.clear();
+        JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+        JString text = (JString) inputRecord.getValueByName(Datatypes.Tweet.MESSAGE);
+        JDouble latitude = (JDouble) inputRecord.getValueByName(Datatypes.Tweet.LATITUDE);
+        JDouble longitude = (JDouble) inputRecord.getValueByName(Datatypes.Tweet.LONGITUDE);
+
+        if (latitude != null && longitude != null) {
+            location.setValue(latitude.getValue(), longitude.getValue());
+        }
+        String[] tokens = text.getValue().split(" ");
+        for (String tk : tokens) {
+            if (tk.startsWith("#")) {
+                JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
+                newField.setValue(tk);
+                list.add(newField);
+            }
+        }
+
+        JRecord outputRecord = (JRecord) functionHelper.getResultObject();
+        outputRecord.setField(Datatypes.Tweet.ID, inputRecord.getValueByName(Datatypes.Tweet.ID));
+
+        JRecord userRecord = (JRecord) inputRecord.getValueByName(Datatypes.Tweet.USER);
+        outputRecord.setField(Datatypes.ProcessedTweet.USER_NAME,
+                userRecord.getValueByName(Datatypes.Tweet.SCREEN_NAME));
+
+        outputRecord.setField(Datatypes.ProcessedTweet.LOCATION, location);
+        outputRecord.setField(Datatypes.Tweet.CREATED_AT, inputRecord.getValueByName(Datatypes.Tweet.CREATED_AT));
+        outputRecord.setField(Datatypes.Tweet.MESSAGE, text);
+        outputRecord.setField(Datatypes.ProcessedTweet.TOPICS, list);
+
+        inputRecord.addField(Datatypes.ProcessedTweet.TOPICS, list);
+        functionHelper.setResult(outputRecord);
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
new file mode 100644
index 0000000..a2bec69
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+
+public class AddHashTagsInPlaceFactory implements IFunctionFactory {
+
+    @Override
+    public IExternalScalarFunction getExternalFunction() {
+        return new AddHashTagsInPlaceFunction();
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
new file mode 100644
index 0000000..a3f6f702
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
@@ -0,0 +1,54 @@
+/*  1
+ * Copyright 2009-2013 by The Regents of the University of California   2
+             * Licensed under the Apache License, Version 2.0 (the "License");  3
+ * you may not use this file except in compliance with the License. 4
+ * you may obtain a copy of the License from    5
+ *  6
+ *     http://www.apache.org/licenses/LICENSE-2.0   
+ *  
+ * Unless required by applicable law or agreed to in writing, software 
+ * distributed under the License is distributed on an "AS IS" BASIS,    
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and  
+ * limitations under the License.   
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.library.java.JTypeTag;
+import edu.uci.ics.asterix.external.util.Datatypes;
+
+public class AddHashTagsInPlaceFunction implements IExternalScalarFunction {
+
+    private JUnorderedList list = null;
+
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        list = new JUnorderedList(functionHelper.getObject(JTypeTag.STRING));
+    }
+
+    @Override
+    public void deinitialize() {
+    }
+
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        list.clear();
+        JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+        JString text = (JString) inputRecord.getValueByName(Datatypes.Tweet.MESSAGE);
+
+        String[] tokens = text.getValue().split(" ");
+        for (String tk : tokens) {
+            if (tk.startsWith("#")) {
+                JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
+                newField.setValue(tk);
+                list.add(newField);
+            }
+        }
+        inputRecord.addField(Datatypes.ProcessedTweet.TOPICS, list);
+        functionHelper.setResult(inputRecord);
+    }
+
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
index 58995c2..ec04541 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
@@ -53,9 +53,6 @@
         JRecord result = (JRecord) functionHelper.getResultObject();
         result.setField("id", id);
         result.setField("text", text);
-        JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
-        newField.setValue(text.getValue().substring(random.nextInt(text.getValue().length())));
-        result.addField("substring", newField);
         functionHelper.setResult(result);
     }
 }
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
index 07f1a40..7a2597e 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
@@ -23,8 +23,9 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter.DataExchangeMode;
 import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -43,8 +44,8 @@
     private DummyGenerator generator;
 
     public TestTypedAdapter(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
-            Map<String, String> configuration) throws IOException {
-        super(parserFactory, sourceDatatype, ctx);
+            Map<String, String> configuration, int partition) throws IOException {
+        super(parserFactory, sourceDatatype, ctx, partition);
         pos = new PipedOutputStream();
         pis = new PipedInputStream(pos);
         this.configuration = configuration;
@@ -131,4 +132,9 @@
         generator.stop();
     }
 
+    @Override
+    public boolean handleException(Exception e) {
+        return false;
+    }
+
 }
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
index b042e9c..5416ce2 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -17,19 +17,23 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory.SupportedOperation;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public class TestTypedAdapterFactory implements ITypedAdapterFactory {
+public class TestTypedAdapterFactory implements IFeedAdapterFactory {
 
     /**
      * 
@@ -38,7 +42,7 @@
 
     public static final String NAME = "test_typed_adapter";
 
-    private static ARecordType adapterOutputType = initOutputType();
+    private ARecordType outputType;
 
     public static final String KEY_NUM_OUTPUT_RECORDS = "num_output_records";
 
@@ -50,7 +54,7 @@
     }
 
     private static ARecordType initOutputType() {
-        String[] fieldNames = new String[] { "tweetid", "message-text" };
+        String[] fieldNames = new String[] { "id", "message-text" };
         IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, BuiltinType.ASTRING };
         ARecordType outputType = null;
         try {
@@ -67,29 +71,36 @@
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.TYPED;
-    }
-
-    @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
         return new AlgebricksCountPartitionConstraint(1);
     }
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        ITupleParserFactory tupleParserFactory = new AdmSchemafullRecordParserFactory(adapterOutputType);
-        return new TestTypedAdapter(tupleParserFactory, adapterOutputType, ctx, configuration);
+        ITupleParserFactory tupleParserFactory = new AsterixTupleParserFactory(configuration, outputType,
+                InputDataFormat.ADM);
+        return new TestTypedAdapter(tupleParserFactory, outputType, ctx, configuration, partition);
     }
 
     @Override
     public ARecordType getAdapterOutputType() {
-        return adapterOutputType;
+        return outputType;
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         this.configuration = configuration;
+        this.outputType = outputType;
+    }
+
+    @Override
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
+
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        return null;
     }
 
 }
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/library_descriptor.xml
similarity index 75%
rename from asterix-external-data/src/test/resources/text_functions.xml
rename to asterix-external-data/src/test/resources/library_descriptor.xml
index 8c7a92c..e35288f 100644
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ b/asterix-external-data/src/test/resources/library_descriptor.xml
@@ -11,6 +11,22 @@
 		</libraryFunction>
 		<libraryFunction>
 			<function_type>SCALAR</function_type>
+			<name>addHashTags</name>
+			<arguments>Tweet</arguments>
+			<return_type>ProcessedTweet</return_type>
+			<definition>edu.uci.ics.asterix.external.library.AddHashTagsFactory
+			</definition>
+		</libraryFunction>
+		<libraryFunction>
+			<function_type>SCALAR</function_type>
+			<name>addHashTagsInPlace</name>
+			<arguments>Tweet</arguments>
+			<return_type>ProcessedTweet</return_type>
+			<definition>edu.uci.ics.asterix.external.library.AddHashTagsInPlaceFactory
+			</definition>
+		</libraryFunction>
+		<libraryFunction>
+			<function_type>SCALAR</function_type>
 			<name>mysum</name>
 			<arguments>AINT32,AINT32</arguments>
 			<return_type>AINT32</return_type>
@@ -53,7 +69,8 @@
 	<libraryAdapters>
 		<libraryAdapter>
 			<name>test_typed_adapter</name>
-			<factory_class>edu.uci.ics.asterix.external.library.adapter.TestTypedAdapterFactory</factory_class>
+			<factory_class>edu.uci.ics.asterix.external.library.adapter.TestTypedAdapterFactory
+			</factory_class>
 		</libraryAdapter>
 	</libraryAdapters>
 </externalLibrary>