New feed fixes: Updated the ExternalFunctionProvider to improve handling of "setResult"
Change-Id: I7c026e2f3b927bda2628835c15318d6c96f8b043
Reviewed-on: https://asterix-gerrit.ics.uci.edu/321
Reviewed-by: Ian Maxon <imaxon@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
index 9e13e21..76abeb4 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
@@ -15,10 +15,12 @@
package edu.uci.ics.asterix.external.library;
import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
+import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class ExternalFunctionProvider {
@@ -38,6 +40,8 @@
}
class ExternalScalarFunction extends ExternalFunction implements IExternalScalarFunction, ICopyEvaluator {
+ private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
+ private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
public ExternalScalarFunction(IExternalFunctionInfo finfo, ICopyEvaluatorFactory args[],
IDataOutputProvider outputProvider) throws AlgebricksException {
@@ -57,12 +61,22 @@
functionHelper.reset();
} catch (Exception e) {
e.printStackTrace();
- //throw new AlgebricksException(e);
+ throw new AlgebricksException(e);
}
}
public void evaluate(IFunctionHelper argumentProvider) throws Exception {
((IExternalScalarFunction) externalFunction).evaluate(argumentProvider);
+ /*
+ * If "setResult" was not called, or the result object is null,
+ * write the serialized NULL type tag so that the Hyracks storage manager
+ * knows we want to discard a null object
+ */
+ byte byteOutput = ((ArrayBackedValueStorage) out).getByteArray()[0];
+ if (!argumentProvider.isValidResult() || byteOutput == SER_NULL_TYPE_TAG) {
+ out.getDataOutput().writeByte(SER_NULL_TYPE_TAG);
+ }
}
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
index b2d2061..f16ff4a 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
@@ -14,12 +14,12 @@
*/
package edu.uci.ics.asterix.external.library;
-import java.io.IOException;
-
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.library.java.IJObject;
import edu.uci.ics.asterix.external.library.java.JTypeTag;
+import java.io.IOException;
+
public interface IFunctionHelper {
public IJObject getArgument(int index);
@@ -28,6 +28,8 @@
public void setResult(IJObject result) throws IOException, AsterixException;
+ public boolean isValidResult();
+
public IJObject getObject(JTypeTag jtypeTag);
public void reset();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
index e2e694a..f272008 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
@@ -14,13 +14,10 @@
*/
package edu.uci.ics.asterix.external.library;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.library.java.IJObject;
import edu.uci.ics.asterix.external.library.java.JObjectPointableVisitor;
+import edu.uci.ics.asterix.external.library.java.JObjects.JNull;
import edu.uci.ics.asterix.external.library.java.JTypeTag;
import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
import edu.uci.ics.asterix.om.pointables.AFlatValuePointable;
@@ -33,10 +30,15 @@
import edu.uci.ics.asterix.om.util.container.IObjectPool;
import edu.uci.ics.asterix.om.util.container.ListObjectPool;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
import edu.uci.ics.hyracks.data.std.api.IValueReference;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
public class JavaFunctionHelper implements IFunctionHelper {
private final IExternalFunctionInfo finfo;
@@ -49,6 +51,8 @@
private final PointableAllocator pointableAllocator;
private final Map<Integer, TypeInfo> poolTypeInfo;
+ private boolean isValidResult = false;
+
public JavaFunctionHelper(IExternalFunctionInfo finfo, IDataOutputProvider outputProvider)
throws AlgebricksException {
this.finfo = finfo;
@@ -72,14 +76,31 @@
@Override
public void setResult(IJObject result) throws IOException, AsterixException {
- try {
- result.serialize(outputProvider.getDataOutput(), true);
- result.reset();
- } catch (IOException | AlgebricksException e) {
- throw new HyracksDataException(e);
+ if (result == null) {
+ JNull.INSTANCE.serialize(outputProvider.getDataOutput(), true);
+ isValidResult = false;
+ } else {
+ try {
+ isValidResult = true;
+ result.serialize(outputProvider.getDataOutput(), true);
+ result.reset();
+ } catch (IOException | AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
}
}
+ /**
+ * Gets the value of the result flag
+ *
+ * @return
+ * boolean True if setResult was called and the result was not null
+ */
+ @Override
+ public boolean isValidResult() {
+ return this.isValidResult;
+ }
+
public void setArgument(int index, IValueReference valueReference) throws IOException, AsterixException {
IVisitablePointable pointable = null;
IJObject jObject = null;
@@ -134,6 +155,19 @@
case STRING:
retValue = objectPool.allocate(BuiltinType.ASTRING);
break;
+ case DOUBLE:
+ retValue = objectPool.allocate(BuiltinType.ADOUBLE);
+ break;
+ case NULL:
+ retValue = JNull.INSTANCE;
+ break;
+ default:
+ try {
+ throw new NotImplementedException("Object of type " + jtypeTag.name() + " not supported.");
+ } catch (IllegalStateException e) {
+ e.printStackTrace();
+ }
+ break;
}
return retValue;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
index c2e4cf3..cde4495 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
@@ -62,7 +62,6 @@
import edu.uci.ics.asterix.external.library.java.JObjects.JString;
import edu.uci.ics.asterix.external.library.java.JObjects.JTime;
import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
-import edu.uci.ics.asterix.external.util.TweetProcessor;
import edu.uci.ics.asterix.om.base.ACircle;
import edu.uci.ics.asterix.om.base.ADuration;
import edu.uci.ics.asterix.om.base.ALine;
@@ -233,9 +232,10 @@
v = AStringSerializerDeserializer.INSTANCE.deserialize(
new DataInputStream(new ByteArrayInputStream(b, s+1, l-1))).getStringValue();
//v = new String(b, s+1, l, "UTF-8");
- TweetProcessor.getNormalizedString(v);
+ JObjectUtil.getNormalizedString(v);
+
IJObject jObject = objectPool.allocate(BuiltinType.ASTRING);
- ((JString) jObject).setValue(TweetProcessor.getNormalizedString(v));
+ ((JString) jObject).setValue(JObjectUtil.getNormalizedString(v));
return jObject;
}
}
@@ -555,8 +555,8 @@
throw new IllegalArgumentException("Cannot parse list item of type "
+ listType.getTypeTag());
default:
- typeInfo.reset(((AbstractCollectionType) listType).getItemType(),
- ((AbstractCollectionType) listType).getTypeTag());
+ IAType itemType = ((AbstractCollectionType) listType).getItemType();
+ typeInfo.reset(itemType, itemType.getTypeTag());
listItem = pointableVisitor.visit((AFlatValuePointable) itemPointable, typeInfo);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
index f5f404a..95a9efa 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
@@ -14,10 +14,6 @@
*/
package edu.uci.ics.asterix.external.library.java;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
@@ -56,8 +52,39 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
public class JObjectUtil {
+ /**
+ * Normalizes an input string by replacing line breaks and tabs with spaces.
+ * Also removes NUL and non-ASCII characters, and trims the result.
+ *
+ * @param originalString
+ * The input String
+ * @return
+ * String - the normalized string
+ */
+ public static String getNormalizedString(String originalString) {
+ int len = originalString.length();
+ char asciiBuff[] = new char[len];
+ int j = 0;
+ for (int i = 0; i < len; i++) {
+ char c = originalString.charAt(i);
+ if (c == '\n' || c == '\t' || c == '\r') {
+ asciiBuff[j] = ' ';
+ j++;
+ } else if (c > 0 && c <= 0x7f) {
+ asciiBuff[j] = c;
+ j++;
+ }
+ }
+
+ return new String(asciiBuff).trim();
+ }
+
public static IJObject getJType(ATypeTag typeTag, IAType type, ByteArrayAccessibleDataInputStream dis,
IObjectPool<IJObject, IAType> objectPool) throws IOException, AsterixException {
IJObject jObject;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
index 61a60b6..19d7f01 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
@@ -14,18 +14,6 @@
*/
package edu.uci.ics.asterix.external.library.java;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
import edu.uci.ics.asterix.builders.IAsterixListBuilder;
import edu.uci.ics.asterix.builders.RecordBuilder;
import edu.uci.ics.asterix.builders.UnorderedListBuilder;
@@ -77,6 +65,7 @@
import edu.uci.ics.asterix.om.base.AMutableString;
import edu.uci.ics.asterix.om.base.AMutableTime;
import edu.uci.ics.asterix.om.base.AMutableUnorderedList;
+import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.base.APoint;
import edu.uci.ics.asterix.om.base.ARectangle;
import edu.uci.ics.asterix.om.base.AString;
@@ -90,6 +79,18 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
public class JObjects {
public static abstract class JObject implements IJObject {
@@ -113,6 +114,47 @@
}
+ /*
+ * This class is necessary to be able to serialize null objects
+ * in cases of setting "null" results
+ *
+ *
+ */
+ public static class JNull implements IJObject {
+ public final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
+
+ public final static JNull INSTANCE = new JNull();
+
+ private JNull() {
+ }
+
+ @Override
+ public ATypeTag getTypeTag() {
+ return ATypeTag.NULL;
+ }
+
+ @Override
+ public IAObject getIAObject() {
+ return ANull.NULL;
+ }
+
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(SER_NULL_TYPE_TAG);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ }
+
public static final class JByte extends JObject {
public JByte(byte value) {
@@ -1026,6 +1068,10 @@
return fields;
}
+ public Map<String, IJObject> getOpenFields() {
+ return this.openFields;
+ }
+
public RecordBuilder getRecordBuilder() {
RecordBuilder recordBuilder = new RecordBuilder();
recordBuilder.reset(recordType);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java
index 9cf45e8..2465705 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java
@@ -22,5 +22,6 @@
DOUBLE,
FLOAT,
LIST,
- OBJECT
+ OBJECT,
+ NULL
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java
index b0ff3cb..a1e784b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.asterix.external.util;
+import edu.uci.ics.asterix.external.library.java.JObjectUtil;
import twitter4j.Status;
import twitter4j.User;
import edu.uci.ics.asterix.om.base.AMutableDouble;
@@ -43,11 +44,11 @@
public AMutableRecord processNextTweet(Status tweet) {
User user = tweet.getUser();
- ((AMutableString) mutableUserFields[0]).setValue(getNormalizedString(user.getScreenName()));
- ((AMutableString) mutableUserFields[1]).setValue(getNormalizedString(user.getLang()));
+ ((AMutableString) mutableUserFields[0]).setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
+ ((AMutableString) mutableUserFields[1]).setValue(JObjectUtil.getNormalizedString(user.getLang()));
((AMutableInt32) mutableUserFields[2]).setValue(user.getFriendsCount());
((AMutableInt32) mutableUserFields[3]).setValue(user.getStatusesCount());
- ((AMutableString) mutableUserFields[4]).setValue(getNormalizedString(user.getName()));
+ ((AMutableString) mutableUserFields[4]).setValue(JObjectUtil.getNormalizedString(user.getName()));
((AMutableInt32) mutableUserFields[5]).setValue(user.getFollowersCount());
((AMutableString) mutableTweetFields[0]).setValue(tweet.getId() + "");
@@ -62,8 +63,9 @@
((AMutableDouble) mutableTweetFields[2]).setValue(0);
((AMutableDouble) mutableTweetFields[3]).setValue(0);
}
- ((AMutableString) mutableTweetFields[4]).setValue(getNormalizedString(tweet.getCreatedAt().toString()));
- ((AMutableString) mutableTweetFields[5]).setValue(getNormalizedString(tweet.getText()));
+ ((AMutableString) mutableTweetFields[4]).setValue(JObjectUtil.getNormalizedString(
+ tweet.getCreatedAt().toString()));
+ ((AMutableString) mutableTweetFields[5]).setValue(JObjectUtil.getNormalizedString(tweet.getText()));
for (int i = 0; i < 6; i++) {
mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
@@ -73,12 +75,6 @@
}
- public static String getNormalizedString(String originalString) {
- String asciiText = originalString.replaceAll("[^\\x00-\\x7F]", "").replaceAll("\n", " ");
- return asciiText.trim();
-
- }
-
public AMutableRecord getMutableRecord() {
return mutableRecord;
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index 11016e0..ca6fb4d 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -14,17 +14,6 @@
*/
package edu.uci.ics.asterix.runtime.formats;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.exceptions.AsterixRuntimeException;
import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
@@ -358,6 +347,16 @@
import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class NonTaggedDataFormat implements IDataFormat {