external udfs support: taking checkpoint
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization_udfs@1736 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/nontagged/custord/local/all-scan.aql b/asterix-app/src/test/resources/nontagged/custord/local/all-scan.aql
index 1bd4b73..cd51b5e 100644
--- a/asterix-app/src/test/resources/nontagged/custord/local/all-scan.aql
+++ b/asterix-app/src/test/resources/nontagged/custord/local/all-scan.aql
@@ -52,4 +52,4 @@
let $c3 := int32("320")
let $c4 := int64("640")
return {"int8": $c1,"int16": $c2,"int32": $c3, "int8co": $o.int8co, "int64": $c4}
-*/
\ No newline at end of file
+*/
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
index ae30695..ad56981 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
@@ -1,10 +1,31 @@
package edu.uci.ics.asterix.external.library;
+import java.util.ArrayList;
+
import edu.uci.ics.asterix.external.library.java.IJObject;
+import edu.uci.ics.asterix.external.library.java.JObjects.JBoolean;
+import edu.uci.ics.asterix.external.library.java.JObjects.JCircle;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDate;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDateTime;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDouble;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDuration;
+import edu.uci.ics.asterix.external.library.java.JObjects.JFloat;
import edu.uci.ics.asterix.external.library.java.JObjects.JInt;
+import edu.uci.ics.asterix.external.library.java.JObjects.JInterval;
+import edu.uci.ics.asterix.external.library.java.JObjects.JLine;
+import edu.uci.ics.asterix.external.library.java.JObjects.JLong;
+import edu.uci.ics.asterix.external.library.java.JObjects.JOrderedList;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPoint;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPoint3D;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPolygon;
import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JRectangle;
import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JTime;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.om.types.AOrderedListType;
import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.container.IObjectFactory;
@@ -20,6 +41,63 @@
case STRING:
retValue = new JString("");
break;
+ case FLOAT:
+ retValue = new JFloat(0);
+ break;
+ case DOUBLE:
+ retValue = new JDouble(0);
+ break;
+ case BOOLEAN:
+ retValue = new JBoolean(false);
+ break;
+ case CIRCLE:
+ retValue = new JCircle(new JPoint(0, 0), 0);
+ break;
+ case POINT:
+ retValue = new JPoint(0, 0);
+ break;
+ case POINT3D:
+ retValue = new JPoint3D(0, 0, 0);
+ break;
+ case POLYGON:
+ retValue = new JPolygon(new ArrayList<JPoint>());
+ break;
+ case LINE:
+ retValue = new JLine(new JPoint(0, 0), new JPoint(0, 0));
+ break;
+ case RECTANGLE:
+ retValue = new JRectangle(new JPoint(0, 0), new JPoint(1, 1));
+ break;
+ case DATE:
+ retValue = new JDate(0);
+ break;
+ case DATETIME:
+ retValue = new JDateTime(0);
+ break;
+ case DURATION:
+ retValue = new JDuration(0, 0);
+ break;
+ case INTERVAL:
+ retValue = new JInterval(0, 0);
+ break;
+ case TIME:
+ retValue = new JTime(0);
+ break;
+ case INT64:
+ retValue = new JLong(0);
+ break;
+ case ORDEREDLIST:
+ AOrderedListType ot = (AOrderedListType) type;
+ IAType orderedItemType = ot.getItemType();
+ IJObject orderedItemObject = create(orderedItemType);
+ retValue = new JOrderedList(orderedItemObject);
+ break;
+ case UNORDEREDLIST:
+ AUnorderedListType ut = (AUnorderedListType) type;
+ IAType unorderedItemType = ut.getItemType();
+ IJObject unorderedItemObject = create(unorderedItemType);
+ retValue = new JUnorderedList(unorderedItemObject);
+ break;
case RECORD:
IAType[] fieldTypes = ((ARecordType) type).getFieldTypes();
IJObject[] fieldObjects = new IJObject[fieldTypes.length];
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
index 637d4d4..8aed123 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
@@ -14,8 +14,7 @@
*/
package edu.uci.ics.asterix.external.library;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
@@ -88,7 +87,7 @@
if (obj.getType().getTypeTag().equals(ATypeTag.RECORD)) {
ARecordType recType = (ARecordType) obj.getType();
if (recType.isOpen()) {
- writeOpenRecord((JRecord) result);
+ writeOpenRecord((JRecord) result, outputProvider.getDataOutput());
} else {
resultSerde = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(recType);
resultSerde.serialize(obj, outputProvider.getDataOutput());
@@ -100,7 +99,7 @@
reset();
}
- private void writeOpenRecord(JRecord jRecord) throws AsterixException, IOException {
+ private void writeOpenRecord(JRecord jRecord, DataOutput dataOutput) throws AsterixException, IOException {
ARecord aRecord = (ARecord) jRecord.getIAObject();
RecordBuilder recordBuilder = new RecordBuilder();
ARecordType recordType = aRecord.getType();
@@ -114,12 +113,19 @@
for (IJObject field : jRecord.getFields()) {
fieldValue.reset();
switch (field.getTypeTag()) {
- case INT32:
- AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32).serialize(
- field.getIAObject(), fieldValue.getDataOutput());
+ case RECORD:
+ ARecordType recType = (ARecordType) field.getIAObject().getType();
+ if (recType.isOpen()) {
+ fieldValue.getDataOutput().writeByte(recType.getTypeTag().serialize());
+ writeOpenRecord((JRecord) field, fieldValue.getDataOutput());
+ } else {
+ AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(
+ field.getIAObject().getType()).serialize(field.getIAObject(),
+ fieldValue.getDataOutput());
+ }
break;
- case STRING:
- AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING)
+ default:
+ AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(field.getIAObject().getType())
.serialize(field.getIAObject(), fieldValue.getDataOutput());
break;
}
@@ -136,7 +142,7 @@
fieldIndex++;
}
- recordBuilder.write(outputProvider.getDataOutput(), false);
+ recordBuilder.write(dataOutput, false);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
index 56ef4fb..52071af 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
@@ -316,7 +316,6 @@
}
}
IAType[] fieldTypes = recordType.getFieldTypes();
- int fieldValueLength = 0;
ATypeTag fieldValueTypeTag = null;
IAType fieldType = fieldTypes[fieldNumber];
@@ -325,13 +324,13 @@
fieldType = ((AUnionType) fieldTypes[fieldNumber]).getUnionList().get(
NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST);
fieldValueTypeTag = fieldType.getTypeTag();
- fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(recordBits,
- fieldOffsets[fieldNumber], typeTag, false);
+ // fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(recordBits,
+ // fieldOffsets[fieldNumber], typeTag, false);
}
} else {
- typeTag = fieldTypes[fieldNumber].getTypeTag();
+ fieldValueTypeTag = fieldTypes[fieldNumber].getTypeTag();
}
- closedFields[fieldNumber] = getJType(typeTag, fieldType, dis, objectPool);
+ closedFields[fieldNumber] = getJType(fieldValueTypeTag, fieldType, dis, objectPool);
}
}
if (isExpanded) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypes.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypes.java
deleted file mode 100644
index f5f8045..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypes.java
+++ /dev/null
@@ -1,404 +0,0 @@
-package edu.uci.ics.asterix.external.library.java;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
-import edu.uci.ics.asterix.external.library.JTypeObjectFactory;
-import edu.uci.ics.asterix.om.base.AInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableOrderedList;
-import edu.uci.ics.asterix.om.base.AMutableRecord;
-import edu.uci.ics.asterix.om.base.AMutableString;
-import edu.uci.ics.asterix.om.base.ARecord;
-import edu.uci.ics.asterix.om.base.AString;
-import edu.uci.ics.asterix.om.base.IAObject;
-import edu.uci.ics.asterix.om.types.AOrderedListType;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
-import edu.uci.ics.asterix.om.util.container.IObjectPool;
-import edu.uci.ics.asterix.om.util.container.ListObjectPool;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class JTypes {
-
- public static final class JInt implements IJObject {
-
- private AMutableInt32 value = new AMutableInt32(0);
-
- public void setValue(int v) {
- this.value.setValue(v);
- }
-
- public int getValue() {
- return value.getIntegerValue().intValue();
- }
-
- @Override
- public ATypeTag getTypeTag() {
- return value.getType().getTypeTag();
- }
-
- @Override
- public IAObject getIAObject() {
- return value;
- }
- }
-
- public static final class JString implements IJObject {
-
- private AMutableString value = new AMutableString("");
-
- public void setValue(String v) {
- this.value.setValue(v);
- }
-
- public String getValue() {
- return value.getStringValue();
- }
-
- @Override
- public ATypeTag getTypeTag() {
- return value.getType().getTypeTag();
- }
-
- @Override
- public IAObject getIAObject() {
- return value;
- }
- }
-
- public static final class JList implements IJObject {
-
- private AOrderedListType listType;
- private AMutableOrderedList value;
-
- public JList(AOrderedListType listType) {
- this.listType = listType;
- this.value = new AMutableOrderedList(listType);
- }
-
- public void add(IJObject element) {
- value.add(element.getIAObject());
- }
-
- @Override
- public ATypeTag getTypeTag() {
- return value.getType().getTypeTag();
- }
-
- @Override
- public IAObject getIAObject() {
- return value;
- }
- }
-
- public static final class JRecord implements IJObject {
-
- private AMutableRecord value;
- private byte[] recordBytes;
- private ARecordType recordType;
- private List<IJObject> fields;
- private List<String> fieldNames;
- private List<IAType> fieldTypes;
- private final IObjectPool<IJObject, IAType> objectPool = new ListObjectPool<IJObject, IAType>(
- new JTypeObjectFactory());
- private int numFieldsAdded = 0;
- private List<Boolean> openField;
-
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
- private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
-
- public JRecord(ARecordType recordType) {
- this.recordType = recordType;
- this.fields = new ArrayList<IJObject>();
- initFieldInfo();
- }
-
- public JRecord(ARecordType recordType, IJObject[] fields) {
- this.recordType = recordType;
- this.fields = new ArrayList<IJObject>();
- for (IJObject jObject : fields) {
- this.fields.add(jObject);
- }
- initFieldInfo();
- }
-
- private void initFieldInfo() {
- this.openField = new ArrayList<Boolean>();
- fieldNames = new ArrayList<String>();
- for (String name : recordType.getFieldNames()) {
- fieldNames.add(name);
- openField.add(false);
- }
- fieldTypes = new ArrayList<IAType>();
- for (IAType type : recordType.getFieldTypes()) {
- fieldTypes.add(type);
- }
-
- }
-
- private IAObject[] getIAObjectArray(List<IJObject> fields) {
- IAObject[] retValue = new IAObject[fields.size()];
- int index = 0;
- for (IJObject jObject : fields) {
- retValue[index++] = getIAObject(jObject);
- }
- return retValue;
- }
-
- private IAObject getIAObject(IJObject jObject) {
- IAObject retVal = null;
- switch (jObject.getTypeTag()) {
- case INT32:
- case STRING:
- retVal = jObject.getIAObject();
- break;
- case RECORD:
- ARecordType recType = ((JRecord) jObject).getRecordType();
- IAObject[] fields = new IAObject[((JRecord) jObject).getFields().size()];
- int index = 0;
- for (IJObject field : ((JRecord) jObject).getFields()) {
- fields[index++] = getIAObject(field);
- }
-
- retVal = new AMutableRecord(recType, fields);
- }
- return retVal;
- }
-
- public void addField(String fieldName, IJObject fieldValue) {
- int pos = getFieldPosByName(fieldName);
- if (pos >= 0) {
- throw new IllegalArgumentException("field already defined");
- }
- numFieldsAdded++;
- fields.add(fieldValue);
- fieldNames.add(fieldName);
- fieldTypes.add(fieldValue.getIAObject().getType());
- openField.add(true);
- }
-
- public IJObject getValueByName(String fieldName) throws AsterixException, IOException {
- int fieldPos = getFieldPosByName(fieldName);
-
- if (recordBytes == null) {
- IJObject jtype = getJObject(value.getValueByPos(fieldPos));
- fields.set(fieldPos, jtype);
- return jtype;
- }
-
- if (recordBytes[0] == SER_NULL_TYPE_TAG) {
- return null;
- }
-
- if (recordBytes[0] != SER_RECORD_TYPE_TAG) {
- throw new AsterixException("Field accessor is not defined for values of type"
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(recordBytes[0]));
- }
-
- int fieldValueOffset = ARecordSerializerDeserializer.getFieldOffsetById(recordBytes, fieldPos,
- getNullBitMapSize(), recordType.isOpen());
-
- if (fieldValueOffset < 0) {
- return null;
- }
-
- IAType fieldValueType = recordType.getFieldTypes()[fieldPos];
- ATypeTag fieldValueTypeTag = null;
- int fieldValueLength = 0;
- if (fieldValueType.getTypeTag().equals(ATypeTag.UNION)) {
- if (NonTaggedFormatUtil.isOptionalField((AUnionType) fieldValueType)) {
- fieldValueTypeTag = ((AUnionType) fieldValueType).getUnionList()
- .get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST).getTypeTag();
- fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(recordBytes, fieldValueOffset,
- fieldValueTypeTag, false);
- // out.writeByte(fieldValueTypeTag.serialize());
- } else {
- // union .. the general case
- throw new NotImplementedException();
- }
- } else {
- fieldValueTypeTag = fieldValueType.getTypeTag();
- fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(recordBytes, fieldValueOffset,
- fieldValueTypeTag, false);
- // out.writeByte(fieldValueTypeTag.serialize());
- }
-
- IJObject fieldValue = getJType(fieldValueTypeTag, recordBytes, fieldValueOffset, fieldValueLength, fieldPos);
- fields.set(fieldPos, fieldValue);
- return fieldValue;
- }
-
- public void setValue(byte[] recordBytes) {
- this.recordBytes = recordBytes;
- }
-
- public void setValueAtPos(int pos, IJObject jtype) {
- fields.set(pos, jtype);
- }
-
- public void setValue(AMutableRecord mutableRecord) {
- this.value = mutableRecord;
- this.recordType = mutableRecord.getType();
- }
-
- @Override
- public ATypeTag getTypeTag() {
- return recordType.getTypeTag();
- }
-
- public void setField(String fieldName, IJObject fieldValue) {
- int pos = getFieldPosByName(fieldName);
- switch (fields.get(pos).getTypeTag()) {
- case INT32:
- ((JInt) fields.get(pos)).setValue(((AMutableInt32) fieldValue.getIAObject()).getIntegerValue()
- .intValue());
- break;
- case STRING:
- ((JString) fields.get(pos)).setValue(((AMutableString) fieldValue.getIAObject()).getStringValue());
- break;
- case RECORD:
- ((JRecord) fields.get(pos)).setValue(((AMutableRecord) fieldValue));
- break;
- }
- }
-
- private int getFieldPosByName(String fieldName) {
- int index = 0;
- for (String name : fieldNames) {
- if (name.equals(fieldName)) {
- return index;
- }
- index++;
- }
- return -1;
- }
-
- private IJObject getJType(ATypeTag typeTag, byte[] argument, int offset, int len, int fieldIndex)
- throws HyracksDataException {
- IJObject jObject;
- switch (typeTag) {
- case INT32: {
- int v = valueFromBytes(argument, offset, len);
- jObject = objectPool.allocate(BuiltinType.AINT32);
- ((JInt) jObject).setValue(v);
- break;
-
- }
- case STRING: {
- String v = AStringSerializerDeserializer.INSTANCE.deserialize(
- new DataInputStream(new ByteArrayInputStream(argument, offset, len))).getStringValue();
- jObject = objectPool.allocate(BuiltinType.ASTRING);
- ((JString) jObject).setValue(v);
- break;
- }
- case RECORD:
- ARecordType fieldRecordType = (ARecordType) recordType.getFieldTypes()[fieldIndex];
- jObject = objectPool.allocate(fieldRecordType);
- byte[] recBytes = new byte[len];
- System.arraycopy(argument, offset, recBytes, 0, len);
- ((JRecord) jObject).setValue(argument);
- break;
- default:
- throw new IllegalStateException("Argument type: " + typeTag);
- }
- return jObject;
- }
-
- private IJObject getJObject(IAObject iaobject) throws HyracksDataException {
- ATypeTag typeTag = iaobject.getType().getTypeTag();
- IJObject jtype;
- switch (typeTag) {
- case INT32: {
- int v = ((AInt32) iaobject).getIntegerValue().intValue();
- jtype = new JInt();
- ((JInt) jtype).setValue(v);
- break;
- }
- case STRING: {
- jtype = new JString();
- ((JString) jtype).setValue(((AString) iaobject).getStringValue());
- break;
- }
- case RECORD:
- ARecordType fieldRecordType = ((ARecord) iaobject).getType();
- jtype = new JRecord(fieldRecordType);
- ((JRecord) jtype).setValue((AMutableRecord) iaobject);
- break;
- default:
- throw new IllegalStateException("Argument type: " + typeTag);
- }
- return jtype;
- }
-
- private static int valueFromBytes(byte[] bytes, int offset, int length) {
- return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16)
- + ((bytes[offset + 2] & 0xff) << 8) + ((bytes[offset + 3] & 0xff) << 0);
- }
-
- public ARecordType getRecordType() {
- return recordType;
- }
-
- public List<IJObject> getFields() {
- return fields;
- }
-
- @Override
- public IAObject getIAObject() {
- if (value == null || numFieldsAdded > 0) {
- value = new AMutableRecord(recordType, getIAObjectArray(fields));
- }
- return value;
- }
-
- public void close() {
- objectPool.reset();
- if (numFieldsAdded > 0) {
- int totalFields = fieldNames.size();
- for (int i = 0; i < numFieldsAdded; i++) {
- fieldNames.remove(totalFields - 1 - i);
- fieldTypes.remove(totalFields - 1 - i);
- fields.remove(totalFields - 1 - i);
- }
- numFieldsAdded = 0;
- }
- }
-
- private int getNullBitMapSize() {
- int nullBitmapSize = 0;
- if (NonTaggedFormatUtil.hasNullableField(recordType)) {
- nullBitmapSize = (int) Math.ceil(recordType.getFieldNames().length / 8.0);
- } else {
- nullBitmapSize = 0;
- }
- return nullBitmapSize;
- }
-
- public List<Boolean> getOpenField() {
- return openField;
- }
-
- public List<String> getFieldNames() {
- return fieldNames;
- }
-
- public List<IAType> getFieldTypes() {
- return fieldTypes;
- }
-
- }
-}
\ No newline at end of file