New feed fixes: Updated the ExternalFunctionProvider to improve handling "setResult"

Change-Id: I7c026e2f3b927bda2628835c15318d6c96f8b043
Reviewed-on: https://asterix-gerrit.ics.uci.edu/321
Reviewed-by: Ian Maxon <imaxon@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
index 9e13e21..76abeb4 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
@@ -15,10 +15,12 @@
 package edu.uci.ics.asterix.external.library;
 
 import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
+import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class ExternalFunctionProvider {
@@ -38,6 +40,8 @@
 }
 
 class ExternalScalarFunction extends ExternalFunction implements IExternalScalarFunction, ICopyEvaluator {
+    private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
+    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
 
     public ExternalScalarFunction(IExternalFunctionInfo finfo, ICopyEvaluatorFactory args[],
             IDataOutputProvider outputProvider) throws AlgebricksException {
@@ -57,12 +61,22 @@
             functionHelper.reset();
         } catch (Exception e) {
             e.printStackTrace();
-            //throw new AlgebricksException(e);
+            throw new AlgebricksException(e);
         }
     }
 
     public void evaluate(IFunctionHelper argumentProvider) throws Exception {
         ((IExternalScalarFunction) externalFunction).evaluate(argumentProvider);
+        /*
+         * Make sure that if "setResult" is not called,
+         * or the result object is null we let Hyracks storage manager know
+         * we want to discard a null object
+         */
+        byte byteOutput = ((ArrayBackedValueStorage) out).getByteArray()[0];
+        if (!argumentProvider.isValidResult() || byteOutput == SER_NULL_TYPE_TAG) {
+            out.getDataOutput().writeByte(SER_NULL_TYPE_TAG);
+        }
     }
 
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
index b2d2061..f16ff4a 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
@@ -14,12 +14,12 @@
  */
 package edu.uci.ics.asterix.external.library;
 
-import java.io.IOException;
-
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.library.java.IJObject;
 import edu.uci.ics.asterix.external.library.java.JTypeTag;
 
+import java.io.IOException;
+
 public interface IFunctionHelper {
 
     public IJObject getArgument(int index);
@@ -28,6 +28,8 @@
 
     public void setResult(IJObject result) throws IOException, AsterixException;
 
+    public boolean isValidResult();
+
     public IJObject getObject(JTypeTag jtypeTag);
 
     public void reset();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
index e2e694a..f272008 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
@@ -14,13 +14,10 @@
  */
 package edu.uci.ics.asterix.external.library;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.library.java.IJObject;
 import edu.uci.ics.asterix.external.library.java.JObjectPointableVisitor;
+import edu.uci.ics.asterix.external.library.java.JObjects.JNull;
 import edu.uci.ics.asterix.external.library.java.JTypeTag;
 import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
 import edu.uci.ics.asterix.om.pointables.AFlatValuePointable;
@@ -33,10 +30,15 @@
 import edu.uci.ics.asterix.om.util.container.IObjectPool;
 import edu.uci.ics.asterix.om.util.container.ListObjectPool;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 import edu.uci.ics.hyracks.data.std.api.IValueReference;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
 public class JavaFunctionHelper implements IFunctionHelper {
 
     private final IExternalFunctionInfo finfo;
@@ -49,6 +51,8 @@
     private final PointableAllocator pointableAllocator;
     private final Map<Integer, TypeInfo> poolTypeInfo;
 
+    private boolean isValidResult = false;
+
     public JavaFunctionHelper(IExternalFunctionInfo finfo, IDataOutputProvider outputProvider)
             throws AlgebricksException {
         this.finfo = finfo;
@@ -72,14 +76,31 @@
 
     @Override
     public void setResult(IJObject result) throws IOException, AsterixException {
-        try {
-            result.serialize(outputProvider.getDataOutput(), true);
-            result.reset();
-        } catch (IOException  | AlgebricksException e) {
-            throw new HyracksDataException(e);
+        if (result == null) {
+            JNull.INSTANCE.serialize(outputProvider.getDataOutput(), true);
+            isValidResult = false;
+        } else {
+            try {
+                isValidResult = true;
+                result.serialize(outputProvider.getDataOutput(), true);
+                result.reset();
+            } catch (IOException | AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
         }
     }
 
+    /**
+     * Gets the value of the result flag
+     *
+     * @return
+     *    boolean True is the setResult is called and result is not null
+     */
+    @Override
+    public boolean isValidResult() {
+        return this.isValidResult;
+    }
+
     public void setArgument(int index, IValueReference valueReference) throws IOException, AsterixException {
         IVisitablePointable pointable = null;
         IJObject jObject = null;
@@ -134,6 +155,19 @@
             case STRING:
                 retValue = objectPool.allocate(BuiltinType.ASTRING);
                 break;
+            case DOUBLE:
+                retValue = objectPool.allocate(BuiltinType.ADOUBLE);
+                break;
+            case NULL:
+                retValue = JNull.INSTANCE;
+                break;
+            default:
+                try {
+                    throw new NotImplementedException("Object of type " + jtypeTag.name() + " not supported.");
+                } catch (IllegalStateException e) {
+                    e.printStackTrace();
+                }
+                break;
         }
         return retValue;
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
index c2e4cf3..cde4495 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
@@ -62,7 +62,6 @@
 import edu.uci.ics.asterix.external.library.java.JObjects.JString;
 import edu.uci.ics.asterix.external.library.java.JObjects.JTime;
 import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
-import edu.uci.ics.asterix.external.util.TweetProcessor;
 import edu.uci.ics.asterix.om.base.ACircle;
 import edu.uci.ics.asterix.om.base.ADuration;
 import edu.uci.ics.asterix.om.base.ALine;
@@ -233,9 +232,10 @@
             v = AStringSerializerDeserializer.INSTANCE.deserialize(
                     new DataInputStream(new ByteArrayInputStream(b, s+1, l-1))).getStringValue();
             //v = new String(b, s+1, l, "UTF-8");
-            TweetProcessor.getNormalizedString(v);
+            JObjectUtil.getNormalizedString(v);
+
             IJObject jObject = objectPool.allocate(BuiltinType.ASTRING);
-            ((JString) jObject).setValue(TweetProcessor.getNormalizedString(v));
+            ((JString) jObject).setValue(JObjectUtil.getNormalizedString(v));
             return jObject;
         }
     }
@@ -555,8 +555,8 @@
                             throw new IllegalArgumentException("Cannot parse list item of type "
                                     + listType.getTypeTag());
                         default:
-                            typeInfo.reset(((AbstractCollectionType) listType).getItemType(),
-                                    ((AbstractCollectionType) listType).getTypeTag());
+                            IAType itemType = ((AbstractCollectionType) listType).getItemType();
+                            typeInfo.reset(itemType, itemType.getTypeTag());
                             listItem = pointableVisitor.visit((AFlatValuePointable) itemPointable, typeInfo);
 
                     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
index f5f404a..95a9efa 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
@@ -14,10 +14,6 @@
  */
 package edu.uci.ics.asterix.external.library.java;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
@@ -56,8 +52,39 @@
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 public class JObjectUtil {
 
+    /**
+     *  Normalize an input string by removing linebreaks, and replace them with space
+     *  Also remove non-readable special characters
+     *
+     * @param originalString
+     *      The input String
+     * @return
+     *      String - the normalized string
+     */
+    public static String getNormalizedString(String originalString) {
+        int len = originalString.length();
+        char asciiBuff[] = new char[len];
+        int j = 0;
+        for (int i = 0; i < len; i++) {
+            char c = originalString.charAt(i);
+            if (c == '\n' || c == '\t' || c == '\r') {
+                asciiBuff[j] = ' ';
+                j++;
+            } else if (c > 0 && c <= 0x7f) {
+                asciiBuff[j] = c;
+                j++;
+            }
+        }
+
+        return new String(asciiBuff).trim();
+    }
+
     public static IJObject getJType(ATypeTag typeTag, IAType type, ByteArrayAccessibleDataInputStream dis,
             IObjectPool<IJObject, IAType> objectPool) throws IOException, AsterixException {
         IJObject jObject;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
index 61a60b6..19d7f01 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
@@ -14,18 +14,6 @@
  */
 package edu.uci.ics.asterix.external.library.java;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
 import edu.uci.ics.asterix.builders.IAsterixListBuilder;
 import edu.uci.ics.asterix.builders.RecordBuilder;
 import edu.uci.ics.asterix.builders.UnorderedListBuilder;
@@ -77,6 +65,7 @@
 import edu.uci.ics.asterix.om.base.AMutableString;
 import edu.uci.ics.asterix.om.base.AMutableTime;
 import edu.uci.ics.asterix.om.base.AMutableUnorderedList;
+import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.base.APoint;
 import edu.uci.ics.asterix.om.base.ARectangle;
 import edu.uci.ics.asterix.om.base.AString;
@@ -90,6 +79,18 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
 public class JObjects {
 
     public static abstract class JObject implements IJObject {
@@ -113,6 +114,47 @@
 
     }
 
+    /*
+     *  This class is necessary to be able to serialize null objects
+      *  in cases of setting "null" results
+     *
+     *
+     */
+    public static class JNull implements IJObject {
+        public final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
+
+        public final static JNull INSTANCE = new JNull();
+
+        private JNull() {
+        }
+
+        @Override
+        public ATypeTag getTypeTag() {
+            return ATypeTag.NULL;
+        }
+
+        @Override
+        public IAObject getIAObject() {
+            return ANull.NULL;
+        }
+
+        @Override
+        public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+            if (writeTypeTag) {
+                try {
+                    dataOutput.writeByte(SER_NULL_TYPE_TAG);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        }
+
+        @Override
+        public void reset() {
+        }
+
+    }
+
     public static final class JByte extends JObject {
 
         public JByte(byte value) {
@@ -1026,6 +1068,10 @@
             return fields;
         }
 
+        public Map<String, IJObject> getOpenFields() {
+            return this.openFields;
+        }
+
         public RecordBuilder getRecordBuilder() {
             RecordBuilder recordBuilder = new RecordBuilder();
             recordBuilder.reset(recordType);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java
index 9cf45e8..2465705 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java
@@ -22,5 +22,6 @@
     DOUBLE,
     FLOAT,
     LIST,
-    OBJECT
+    OBJECT,
+    NULL
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java
index b0ff3cb..a1e784b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.asterix.external.util;
 
+import edu.uci.ics.asterix.external.library.java.JObjectUtil;
 import twitter4j.Status;
 import twitter4j.User;
 import edu.uci.ics.asterix.om.base.AMutableDouble;
@@ -43,11 +44,11 @@
 
     public AMutableRecord processNextTweet(Status tweet) {
         User user = tweet.getUser();
-        ((AMutableString) mutableUserFields[0]).setValue(getNormalizedString(user.getScreenName()));
-        ((AMutableString) mutableUserFields[1]).setValue(getNormalizedString(user.getLang()));
+        ((AMutableString) mutableUserFields[0]).setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
+        ((AMutableString) mutableUserFields[1]).setValue(JObjectUtil.getNormalizedString(user.getLang()));
         ((AMutableInt32) mutableUserFields[2]).setValue(user.getFriendsCount());
         ((AMutableInt32) mutableUserFields[3]).setValue(user.getStatusesCount());
-        ((AMutableString) mutableUserFields[4]).setValue(getNormalizedString(user.getName()));
+        ((AMutableString) mutableUserFields[4]).setValue(JObjectUtil.getNormalizedString(user.getName()));
         ((AMutableInt32) mutableUserFields[5]).setValue(user.getFollowersCount());
 
         ((AMutableString) mutableTweetFields[0]).setValue(tweet.getId() + "");
@@ -62,8 +63,9 @@
             ((AMutableDouble) mutableTweetFields[2]).setValue(0);
             ((AMutableDouble) mutableTweetFields[3]).setValue(0);
         }
-        ((AMutableString) mutableTweetFields[4]).setValue(getNormalizedString(tweet.getCreatedAt().toString()));
-        ((AMutableString) mutableTweetFields[5]).setValue(getNormalizedString(tweet.getText()));
+        ((AMutableString) mutableTweetFields[4]).setValue(JObjectUtil.getNormalizedString(
+                tweet.getCreatedAt().toString()));
+        ((AMutableString) mutableTweetFields[5]).setValue(JObjectUtil.getNormalizedString(tweet.getText()));
 
         for (int i = 0; i < 6; i++) {
             mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
@@ -73,12 +75,6 @@
 
     }
 
-    public static String getNormalizedString(String originalString) {
-        String asciiText = originalString.replaceAll("[^\\x00-\\x7F]", "").replaceAll("\n", " ");
-        return asciiText.trim();
-
-    }
-
     public AMutableRecord getMutableRecord() {
         return mutableRecord;
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index 11016e0..ca6fb4d 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -14,17 +14,6 @@
  */
 package edu.uci.ics.asterix.runtime.formats;
 
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
 import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.exceptions.AsterixRuntimeException;
 import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
@@ -358,6 +347,16 @@
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class NonTaggedDataFormat implements IDataFormat {