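checkpoint: rename FileSystemAdapterFactory to StreamBasedAdapterFactory, add batch-size/batch-interval conditional-push parser selection, add echoDelay test function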
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 1854245..d50ea72 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -40,7 +40,7 @@
  * A factory class for creating an instance of HDFSAdapter
  */
 @SuppressWarnings("deprecation")
-public class HDFSAdapterFactory extends FileSystemAdapterFactory {
+public class HDFSAdapterFactory extends StreamBasedAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 16e6ef7..b29e150 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -38,7 +38,7 @@
  * A factory class for creating an instance of HiveAdapter
  */
 @SuppressWarnings("deprecation")
-public class HiveAdapterFactory extends FileSystemAdapterFactory {
+public class HiveAdapterFactory extends StreamBasedAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 3d8bedf..5f23f96 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -32,7 +32,7 @@
  * NCFileSystemAdapter reads external data residing on the local file system of
  * an NC.
  */
-public class NCFileSystemAdapterFactory extends FileSystemAdapterFactory {
+public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/FileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
similarity index 80%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/FileSystemAdapterFactory.java
rename to asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
index 0fa428d..1287de4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/FileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
@@ -9,6 +9,7 @@
 import edu.uci.ics.asterix.external.util.DNSResolverFactory;
 import edu.uci.ics.asterix.external.util.INodeResolver;
 import edu.uci.ics.asterix.external.util.INodeResolverFactory;
+import edu.uci.ics.asterix.metadata.feeds.ConditionalPushTupleParserFactory;
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
@@ -25,7 +26,10 @@
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public abstract class FileSystemAdapterFactory implements IAdapterFactory {
+public abstract class StreamBasedAdapterFactory implements IAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(StreamBasedAdapterFactory.class.getName());
 
     protected Map<String, Object> configuration;
     protected static INodeResolver nodeResolver;
@@ -36,13 +40,11 @@
     public static final String KEY_DELIMITER = "delimiter";
     public static final String KEY_PATH = "path";
     public static final String KEY_SOURCE_DATATYPE = "source-datatype";
-
     public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
     public static final String FORMAT_ADM = "adm";
-
     public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
-
-    private static final Logger LOGGER = Logger.getLogger(FileSystemAdapterFactory.class.getName());
+    public static final String BATCH_SIZE = "batch-size";
+    public static final String BATCH_INTERVAL = "batch-interval";
 
     protected ITupleParserFactory parserFactory;
     protected ITupleParser parser;
@@ -56,7 +58,8 @@
         typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
     }
 
-    protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType) throws AsterixException {
+    protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
+            throws AsterixException {
         int n = recordType.getFieldTypes().length;
         IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
         for (int i = 0; i < n; i++) {
@@ -73,12 +76,16 @@
         }
 
         Character delimiter = delimiterValue.charAt(0);
-        return new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter);
+
+        return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, fieldParserFactories, delimiter,
+                configuration) : new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter);
     }
 
-    protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType) throws AsterixException {
+    protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
+            throws AsterixException {
         try {
-            return new AdmSchemafullRecordParserFactory(recordType);
+            return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, configuration)
+                    : new AdmSchemafullRecordParserFactory(recordType);
         } catch (Exception e) {
             throw new AsterixException(e);
         }
@@ -86,15 +93,21 @@
     }
 
     protected void configureFormat(IAType sourceDatatype) throws Exception {
+        String propValue = (String) configuration.get(BATCH_SIZE);
+        int batchSize = propValue != null ? Integer.parseInt(propValue) : -1;
+        propValue = (String) configuration.get(BATCH_INTERVAL);
+        long batchInterval = propValue != null ? Long.parseLong(propValue) : -1;
+        boolean conditionalPush = batchSize > 0 || batchInterval > 0;
+
         String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
         if (parserFactoryClassname == null) {
             String specifiedFormat = (String) configuration.get(KEY_FORMAT);
             if (specifiedFormat == null) {
                 throw new IllegalArgumentException(" Unspecified data format");
             } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
-                parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype);
+                parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
             } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
-                parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype);
+                parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
             } else {
                 throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
             }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index e3deba9..e162ffe 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -150,4 +150,8 @@
         return configuration;
     }
 
+    public void setWriter(IFrameWriter writer) {
+        // No-op: the supplied writer is ignored by this implementation.
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
index a11197b..8efe919 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
@@ -99,7 +99,6 @@
 
     }
 
-    @SuppressWarnings("unchecked")
     private void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder)
             throws IOException, AsterixException {
         ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
index e17db67..be4debe 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 
+import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -21,6 +22,7 @@
     protected final ITupleParser tupleParser;
     protected final IAType sourceDatatype;
     protected IHyracksTaskContext ctx;
+    protected AdapterRuntimeManager runtimeManager;
 
     public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx) {
         this.tupleParser = parserFactory.createTupleParser(ctx);
@@ -33,4 +35,5 @@
         InputStream in = getInputStream(partition);
         tupleParser.parse(in, writer);
     }
+
 }
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFactory.java
new file mode 100644
index 0000000..d15d661
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+
+public class EchoDelayFactory implements IFunctionFactory {
+    // Factory that hands out EchoDelayFunction instances.
+    @Override
+    public IExternalScalarFunction getExternalFunction() {
+        return new EchoDelayFunction(); // a new instance is created on every call
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFunction.java
new file mode 100644
index 0000000..5f9be77
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFunction.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import java.util.Random;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+
+public class EchoDelayFunction implements IExternalScalarFunction {
+    // Echoes argument 0 back as the result; no delay is currently applied (see evaluate).
+    private final Random rand = new Random();
+    private long sleepIntervalMin;
+    private long sleepIntervalMax;
+    private int range;
+
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        sleepIntervalMin = 50;
+        sleepIntervalMax = 100;
+        range = (int) (sleepIntervalMax - sleepIntervalMin); // plain narrowing cast, no Long boxing
+    }
+
+    @Override
+    public void deinitialize() {
+    }
+
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+        long sleepInterval = rand.nextInt(range);
+        // NOTE(review): sleepInterval is drawn but never used; the sleep is disabled - confirm intent.
+        functionHelper.setResult(inputRecord);
+    }
+}
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/text_functions.xml
index bee6604..6d00029 100644
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ b/asterix-external-data/src/test/resources/text_functions.xml
@@ -41,5 +41,13 @@
 			<definition>edu.uci.ics.asterix.external.library.AllTypesFactory
 			</definition>
 		</function>
+		<function>
+			<function_type>SCALAR</function_type>
+			<name>echoDelay</name>
+			<arguments>TweetMessageType</arguments>
+			<return_type>TweetMessageType</return_type>
+			<definition>edu.uci.ics.asterix.external.library.EchoDelayFactory
+			</definition>
+		</function>
 	</functions>
 </library>