Added AsterixListAccessor which provides convenient access to lists (was way overdue). Simplified the scan collection unnesting function based on it.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@597 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AsterixListAccessor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AsterixListAccessor.java
new file mode 100644
index 0000000..fd6b691
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AsterixListAccessor.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.evaluators.common;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+
+/**
+ * Utility class for accessing serialized unordered and ordered lists. 
+ */
+public class AsterixListAccessor {
+
+	protected byte[] listBytes;
+	protected int start;
+	protected ATypeTag listType;
+	protected ATypeTag itemType;
+	protected int size;
+	
+	public ATypeTag getListType() {
+		return listType;
+	}
+
+	public ATypeTag getItemType() {
+		return itemType;
+	}
+
+	public boolean itemsAreSelfDescribing() {
+		return itemType == ATypeTag.ANY;
+	}
+
+	public void reset(byte[] listBytes, int start) throws AsterixException {
+		this.listBytes = listBytes;
+		this.start = start;
+		listType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(listBytes[start]);		
+		if (listType != ATypeTag.UNORDEREDLIST && listType != ATypeTag.ORDEREDLIST) {
+			throw new AsterixException("Unsupported type: " + listType);
+		}
+		itemType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(listBytes[start + 1]);
+		if (listType == ATypeTag.UNORDEREDLIST) {
+			size = AUnorderedListSerializerDeserializer.getNumberOfItems(listBytes, start);
+		} else {
+			size = AOrderedListSerializerDeserializer.getNumberOfItems(listBytes, start);
+		}
+	}
+
+	public int size() {
+		return size;
+	}
+	
+	public int getItemOffset(int itemIndex) throws AsterixException {
+		if (listType == ATypeTag.UNORDEREDLIST) {
+			return AUnorderedListSerializerDeserializer.getItemOffset(listBytes, start, itemIndex);
+		} else {
+			return AOrderedListSerializerDeserializer.getItemOffset(listBytes, start, itemIndex);
+		}
+	}
+	
+	public int getItemLength(int itemOffset) throws AsterixException {
+		ATypeTag itemType = getItemType(itemOffset);
+		return NonTaggedFormatUtil.getFieldValueLength(listBytes, itemOffset, itemType, itemsAreSelfDescribing());
+	}
+	
+	public ATypeTag getItemType(int itemOffset) throws AsterixException {
+		if (itemType == ATypeTag.ANY) {
+			return EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(listBytes[itemOffset]);
+		} else {
+			return itemType;
+		}
+	}
+	
+	public void writeItem(int itemIndex, DataOutput dos) throws AsterixException, IOException {
+		int itemOffset = getItemOffset(itemIndex);
+		int itemLength = getItemLength(itemOffset);
+		if (itemsAreSelfDescribing()) {
+			++itemLength;
+		} else {
+			dos.writeByte(itemType.serialize());
+		}
+		dos.write(listBytes, itemOffset, itemLength);
+	}
+	
+	public byte[] getByteArray() {
+		return listBytes;
+	}
+	
+	public int getStart() {
+		return start;
+	}
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
index 7963b00..a66f775 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.asterix.runtime.unnestingfunctions.std;
 
 import java.io.DataOutput;
@@ -5,16 +20,9 @@
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.runtime.evaluators.common.AsterixListAccessor;
 import edu.uci.ics.asterix.runtime.unnestingfunctions.base.AbstractUnnestingFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -22,7 +30,6 @@
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -51,14 +58,7 @@
     private static class ScanCollectionUnnestingFunctionFactory implements ICopyUnnestingFunctionFactory {
 
         private static final long serialVersionUID = 1L;
-
         private ICopyEvaluatorFactory listEvalFactory;
-        private final static byte SER_ORDEREDLIST_TYPE_TAG = ATypeTag.ORDEREDLIST.serialize();
-        private final static byte SER_UNORDEREDLIST_TYPE_TAG = ATypeTag.UNORDEREDLIST.serialize();
-        private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-        private ATypeTag itemTag;
-        private byte serItemTypeTag;
-        private boolean selfDescList = false;
 
         public ScanCollectionUnnestingFunctionFactory(ICopyEvaluatorFactory arg) {
             this.listEvalFactory = arg;
@@ -71,84 +71,37 @@
 
             return new ICopyUnnestingFunction() {
 
+            	private final AsterixListAccessor listAccessor = new AsterixListAccessor();
                 private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
                 private ICopyEvaluator argEval = listEvalFactory.createEvaluator(inputVal);
-                @SuppressWarnings("unchecked")
-                private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                        .getSerializerDeserializer(BuiltinType.ANULL);
-                private int numItems;
-                private int pos;
-                private int itemOffset;
-                private int itemLength;
-                private byte serListTag;
+                private int itemIndex;
 
                 @Override
                 public void init(IFrameTupleReference tuple) throws AlgebricksException {
                     try {
                         inputVal.reset();
                         argEval.evaluate(tuple);
-                        byte[] serList = inputVal.getByteArray();
-
-                        if (serList[0] == SER_NULL_TYPE_TAG) {
-                            nullSerde.serialize(ANull.NULL, out);
-                            return;
-                        }
-
-                        if (serList[0] != SER_ORDEREDLIST_TYPE_TAG && serList[0] != SER_UNORDEREDLIST_TYPE_TAG) {
-                            throw new AlgebricksException("Scan collection is not defined for values of type"
-                                    + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serList[0]));
-                        }
-
-                        serListTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0])
-                                .serialize();
-                        if (serListTag == SER_ORDEREDLIST_TYPE_TAG)
-                            numItems = AOrderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray());
-                        else
-                            numItems = AUnorderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray());
-
-                        itemTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serList[1]);
-                        if (itemTag == ATypeTag.ANY)
-                            selfDescList = true;
-                        else
-                            serItemTypeTag = serList[1];
-
-                        pos = 0;
-                    } catch (IOException e) {
-                        throw new AlgebricksException(e);
+                        listAccessor.reset(inputVal.getByteArray(), 0);
+                        itemIndex = 0;
+                    } catch (AsterixException e) {
+                    	throw new AlgebricksException(e);
                     }
                 }
 
                 @Override
                 public boolean step() throws AlgebricksException {
-
                     try {
-                        if (pos < numItems) {
-                            byte[] serList = inputVal.getByteArray();
-
-                            try {
-                                if (serListTag == SER_ORDEREDLIST_TYPE_TAG) {
-                                    itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, pos);
-                                } else {
-                                    itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, pos);
-                                }
-                                if (selfDescList)
-                                    itemTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serList[itemOffset]);
-                                itemLength = NonTaggedFormatUtil.getFieldValueLength(serList, itemOffset, itemTag,
-                                        selfDescList);
-                                if (!selfDescList)
-                                    out.writeByte(serItemTypeTag);
-                                out.write(serList, itemOffset, itemLength + (!selfDescList ? 0 : 1));
-                            } catch (AsterixException e) {
-                                throw new AlgebricksException(e);
-                            }
-                            ++pos;
+                        if (itemIndex < listAccessor.size()) {
+                        	listAccessor.writeItem(itemIndex, out);
+                        	++itemIndex;
                             return true;
-                        } else
-                            return false;
-
+                        }                        
                     } catch (IOException e) {
                         throw new AlgebricksException(e);
+                    } catch (AsterixException e) {
+                    	throw new AlgebricksException(e);
                     }
+                    return false;
                 }
 
             };