preliminary changes for dynamic type cast and tests

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_opentype@257 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index aa22be5..2824d27 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -15,6 +15,7 @@
 import edu.uci.ics.asterix.optimizer.rules.IfElseToSwitchCaseFunctionRule;
 import edu.uci.ics.asterix.optimizer.rules.InlineAssignIntoAggregateRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceBTreeIndexSearchRule;
+import edu.uci.ics.asterix.optimizer.rules.IntroduceCastRecordRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceRTreeIndexSearchRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
 import edu.uci.ics.asterix.optimizer.rules.LoadRecordFieldsRule;
@@ -82,6 +83,7 @@
         normalization.add(new ExtractDistinctByExpressionsRule());
         normalization.add(new ExtractOrderExpressionsRule());
         normalization.add(new TopDownTypeInferenceRule());
+        normalization.add(new IntroduceCastRecordRule());
         normalization.add(new ConstantFoldingRule());
         normalization.add(new UnnestToDataScanRule());
         normalization.add(new IfElseToSwitchCaseFunctionRule());
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
index faae5eb..36783ce 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -147,7 +147,8 @@
                 return new Pair<Boolean, ILogicalExpression>(changed, expr);
             }
             // TODO: currently ARecord is always a closed record
-            if (expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
+            if (expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)
+                    || expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.CAST_RECORD)) {
                 return new Pair<Boolean, ILogicalExpression>(false, null);
             }
             if (expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME)) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceCastRecordRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceCastRecordRule.java
new file mode 100644
index 0000000..a63451a
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceCastRecordRule.java
@@ -0,0 +1,116 @@
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class IntroduceCastRecordRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        /**
+         * pattern match: sink insert assign
+         * 
+         * resulting plan: sink-insert-project-assign
+         * 
+         */
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.SINK)
+            return false;
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE)
+            return false;
+        AbstractLogicalOperator op3 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
+        if (op3.getOperatorTag() != LogicalOperatorTag.ASSIGN)
+            return false;
+
+        InsertDeleteOperator insertDeleteOperator = (InsertDeleteOperator) op2;
+        AssignOperator oldAssignOperator = (AssignOperator) op3;
+
+        AqlDataSource dataSource = (AqlDataSource) insertDeleteOperator.getDataSource();
+        IAType[] schemaTypes = (IAType[]) dataSource.getSchemaTypes();
+        ARecordType requiredRecordType = (ARecordType) schemaTypes[schemaTypes.length - 1];
+
+        List<LogicalVariable> usedVariables = new ArrayList<LogicalVariable>();
+        VariableUtilities.getUsedVariables(oldAssignOperator, usedVariables);
+        LogicalVariable inputRecordVar = usedVariables.get(0);
+        IVariableTypeEnvironment env = oldAssignOperator.computeInputTypeEnvironment(context);
+        ARecordType inputRecordType = (ARecordType) env.getVarType(inputRecordVar);
+
+        boolean needCast = needCast(requiredRecordType, inputRecordType);
+        if (needCast) {
+            // insert
+            // project
+            // assign
+            // assign
+            AbstractFunctionCallExpression cast = new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.CAST_RECORD));
+            ARecordType[] types = new ARecordType[2];
+            types[0] = requiredRecordType;
+            types[1] = inputRecordType;
+            cast.getArguments().add(
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(inputRecordVar)));
+            cast.setOpaqueParameters(types);
+            LogicalVariable newAssignVar = context.newVar();
+            AssignOperator newAssignOperator = new AssignOperator(newAssignVar, new MutableObject<ILogicalExpression>(
+                    cast));
+            newAssignOperator.getInputs().add(new MutableObject<ILogicalOperator>(op3));
+
+            List<LogicalVariable> projectVariables = new ArrayList<LogicalVariable>();
+            VariableUtilities.getProducedVariables(oldAssignOperator, projectVariables);
+            projectVariables.add(newAssignVar);
+            ProjectOperator projectOperator = new ProjectOperator(projectVariables);
+            projectOperator.getInputs().add(new MutableObject<ILogicalOperator>(newAssignOperator));
+
+            ILogicalExpression payloadExpr = new VariableReferenceExpression(newAssignVar);
+            MutableObject<ILogicalExpression> payloadRef = new MutableObject<ILogicalExpression>(payloadExpr);
+            InsertDeleteOperator newInserDeleteOperator = new InsertDeleteOperator(
+                    insertDeleteOperator.getDataSource(), payloadRef, insertDeleteOperator.getPrimaryKeyExpressions(),
+                    insertDeleteOperator.getOperation());
+            newInserDeleteOperator.getInputs().add(new MutableObject<ILogicalOperator>(projectOperator));
+            insertDeleteOperator.getInputs().clear();
+            op1.getInputs().get(0).setValue(newInserDeleteOperator);
+            return true;
+        }
+        return false;
+
+    }
+
+    private boolean needCast(ARecordType reqType, ARecordType inputType) {
+        if (!reqType.equals(inputType))
+            return true;
+        else
+            return false;
+    }
+
+}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java
index 958f5f6..4ff2e65 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java
@@ -36,8 +36,8 @@
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
         /**
-         * pattern match: sink/insert/assign
-         * record type is propagated from insert data source to the record-constructor expression
+         * pattern match: sink/insert/assign record type is propagated from
+         * insert data source to the record-constructor expression
          */
         AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
         if (op1.getOperatorTag() != LogicalOperatorTag.SINK)
@@ -93,7 +93,8 @@
                         ILogicalExpression expr = expressionPointers.get(position).getValue();
                         if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
                             ScalarFunctionCallExpression funcExpr = (ScalarFunctionCallExpression) expr;
-                            changed = TypeComputerUtilities.setRequiredType(funcExpr, requiredRecordType);
+                            changed = TypeComputerUtilities.setRequiredAndInputTypes(funcExpr, requiredRecordType,
+                                    inputRecordType);
                             List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
                             int openPartStart = requiredRecordType.getFieldTypes().length * 2;
                             for (int j = openPartStart; j < args.size(); j++) {
@@ -104,6 +105,7 @@
                                 }
                             }
                         }
+                        context.computeAndSetTypeEnvironmentForOperator(originalAssign);
                     }
                 }
                 if (currentOperator.getInputs().size() > 0)
diff --git a/asterix-app/src/test/resources/logging.properties b/asterix-app/src/test/resources/logging.properties
index deb88307..4cf5545 100644
--- a/asterix-app/src/test/resources/logging.properties
+++ b/asterix-app/src/test/resources/logging.properties
@@ -61,5 +61,5 @@
 # messages:
 
 #edu.uci.ics.asterix.level = FINE
-#edu.uci.ics.algebricks.level = FINE
-#edu.uci.ics.hyracks.level = INFO
+edu.uci.ics.algebricks.level = FINE
+edu.uci.ics.hyracks.level = INFO
diff --git a/asterix-app/src/test/resources/runtimets/ignore.txt b/asterix-app/src/test/resources/runtimets/ignore.txt
index 0964e88..b60aa97 100644
--- a/asterix-app/src/test/resources/runtimets/ignore.txt
+++ b/asterix-app/src/test/resources/runtimets/ignore.txt
@@ -17,4 +17,4 @@
 open-closed
 dml/insert-into-empty-dataset-with-index_02.aql
 dml/insert-into-empty-dataset-with-index_01.aql
-dml/load-from-hdfs.aql
+dml/load-from-hdfs.aql
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o.aql
new file mode 100644
index 0000000..13bfa77
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o.aql
@@ -0,0 +1,44 @@
+drop dataverse testdv2 if exists;
+create dataverse testdv2;
+use dataverse testdv2;
+
+create type testtype as closed {
+  id: string,
+  name: string,
+  hobby: string
+}
+
+create type testtype2 as open {
+  id: string
+}
+
+create dataset testds(testtype) partitioned by key id;
+
+create dataset testds2(testtype2) partitioned by key id;
+ 
+insert into dataset testds (
+{ "id": "001", "name": "Person Three", "hobby": "music"}
+);
+
+insert into dataset testds (
+{ "id": "002", "name": "Person One", "hobby": "sports"}
+);
+
+insert into dataset testds (
+{ "id": "003", "name": "Person Two", "hobby": "movie"}
+);
+
+insert into dataset testds (
+{ "id": "004", "name": "Person Four", "hobby": "swimming"}
+);
+ 
+insert into dataset testds2 (
+ for $d in dataset("testds") 
+	return $d
+);
+
+write output to nc1:"rttest/dml_opentype-c2o.adm";
+
+for $d in dataset("testds2") 
+order by $d.id
+return $d
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-insert.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-insert.aql
index 0c71f54..19dd1d6 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-insert.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-insert.aql
@@ -1,3 +1,4 @@
+drop dataverse testdv2 if exists;
 create dataverse testdv2;
 use dataverse testdv2;
 
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql
new file mode 100644
index 0000000..2657718
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql
@@ -0,0 +1,45 @@
+drop dataverse testdv2 if exists;
+create dataverse testdv2;
+use dataverse testdv2;
+
+create type testtype as open {
+  id: string,
+  name: string
+}
+
+create type testtype2 as closed {
+  id: string,
+  name: string,
+  hobby: string
+}
+
+create dataset testds(testtype) partitioned by key id;
+
+create dataset testds2(testtype2) partitioned by key id;
+ 
+insert into dataset testds (
+{ "id": "001", "name": "Person Three", "hobby": "music"}
+);
+
+insert into dataset testds (
+{ "id": "002", "name": "Person Three", "hobby": "football"}
+);
+
+insert into dataset testds (
+{ "id": "003", "name": "Person Three", "hobby": "movie"}
+);
+
+insert into dataset testds (
+{ "id": "004", "name": "Person Three", "hobby": "swimming"}
+);
+
+insert into dataset testds2 (
+ for $d in dataset("testds") 
+	return $d
+);
+
+write output to nc1:"rttest/dml_opentype-o2c.adm";
+
+for $d in dataset("testds") 
+order by $d.id
+return $d
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/heterog-list01.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/heterog-list01.aql
index c54d020..d1edc3d 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-closed/heterog-list01.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/heterog-list01.aql
@@ -15,7 +15,7 @@
 descrpt:string
 }
 
-create type TestType as {
+create type TestType as closed {
 id:int32,
 description:string,
 name:string,
@@ -30,3 +30,9 @@
 "name":"Cake",
 "batters":[{"id":345,"descprt":"Regular"},{"id":445,"descprt":"Chocolate"}] }
 );
+
+write output to nc1:"rttest/open-closed_heteror-list01.adm";
+
+for $d in dataset("T1") 
+return $d
+
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-01.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-01.aql
index 0b6d0dd..6373569 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-01.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-01.aql
@@ -18,7 +18,9 @@
 
 create dataset testds(testType) partitioned by key id;
 
+
 insert into dataset testds({"id": 123, "name": "John Doe", "hobbies": {{ "scuba", "music" }} }
 );
 
+write output to nc1:"rttest/open-closed_open-closed-01.adm";
 for $l in dataset("testds") return $l
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-02.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-02.aql
index 6963bee..c037cc8 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-02.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-02.aql
@@ -19,6 +19,7 @@
 
 create dataset testds(testType) partitioned by key id;
 
-insert into dataset testds({"id": 123, "name": "John Doe", "hobbies": {{ "scuba", "music" }} });
+insert into dataset testds({"id": 123);
 
+write output to nc1:"rttest/open-closed_open-closed-02.adm";
 for $l in dataset("testds") return $l
diff --git a/asterix-app/src/test/resources/runtimets/results/dml/opentype-c2o.adm b/asterix-app/src/test/resources/runtimets/results/dml/opentype-c2o.adm
new file mode 100644
index 0000000..834bcef
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/dml/opentype-c2o.adm
@@ -0,0 +1,4 @@
+{ "id": "001", "name": "Person Three", "hobby": "music" }
+{ "id": "002", "name": "Person One", "hobby": "sports" }
+{ "id": "003", "name": "Person Two", "hobby": "movie" }
+{ "id": "004", "name": "Person Four", "hobby": "swimming" }
diff --git a/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c.adm b/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c.adm
new file mode 100644
index 0000000..0c1e6d3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c.adm
@@ -0,0 +1,4 @@
+{ "id": "001", "name": "Person Three", "hobby": "music" }
+{ "id": "002", "name": "Person Three", "hobby": "football" }
+{ "id": "003", "name": "Person Three", "hobby": "movie" }
+{ "id": "004", "name": "Person Three", "hobby": "swimming" }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 5f8334d..9aeac2b 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.asterix.om.typecomputer.impl.ARectangleTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.AStringTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.BinaryBooleanOrNullFunctionTypeComputer;
+import edu.uci.ics.asterix.om.typecomputer.impl.CastRecordResultTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.ClosedRecordConstructorResultType;
 import edu.uci.ics.asterix.om.typecomputer.impl.FieldAccessByIndexResultType;
 import edu.uci.ics.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
@@ -81,7 +82,6 @@
         SI
     }
 
-    private final static Map<AsterixFunction, IFunctionInfo> asterixFunctionToInfo = new HashMap<AsterixFunction, IFunctionInfo>();
     private final static Map<FunctionIdentifier, IFunctionInfo> asterixFunctionIdToInfo = new HashMap<FunctionIdentifier, IFunctionInfo>();
 
     // it is supposed to be an identity mapping
@@ -334,6 +334,8 @@
             true);
     public final static FunctionIdentifier INJECT_FAILURE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "inject-failure", 2, true);
+    public final static FunctionIdentifier CAST_RECORD = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "cast-record", 1, true);
 
     public static final FunctionIdentifier EQ = AlgebricksBuiltinFunctions.EQ;
     public static final FunctionIdentifier LE = AlgebricksBuiltinFunctions.LE;
@@ -351,13 +353,13 @@
         IFunctionInfo finfo = asterixFunctionIdToInfo.get(fid);
         if (finfo == null) {
             finfo = new AsterixFunctionInfo(fid, fid.isBuiltin());
-            if(fid.isBuiltin()){
+            if (fid.isBuiltin()) {
                 asterixFunctionIdToInfo.put(fid, finfo);
             }
         }
         return finfo;
     }
-    
+
     public static AsterixFunctionInfo lookupFunction(FunctionIdentifier fid) {
         return (AsterixFunctionInfo) asterixFunctionIdToInfo.get(fid);
     }
@@ -542,6 +544,7 @@
         add(SWITCH_CASE, NonTaggedSwitchCaseComputer.INSTANCE);
         add(REG_EXP, ABooleanTypeComputer.INSTANCE);
         add(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE);
+        add(CAST_RECORD, CastRecordResultTypeComputer.INSTANCE);
 
         add(TID, AInt32TypeComputer.INSTANCE);
         add(TIME_CONSTRUCTOR, OptionalATimeTypeComputer.INSTANCE);
@@ -676,8 +679,6 @@
         IFunctionInfo finfo = getAsterixFunctionInfo(fi);
         return finfo == null ? null : finfo.getFunctionIdentifier();
     }
-    
-    
 
     public static AggregateFunctionCallExpression makeAggregateFunctionExpression(FunctionIdentifier fi,
             List<Mutable<ILogicalExpression>> args) {
@@ -697,7 +698,7 @@
 
     public static boolean isAggregateFunctionSerializable(FunctionIdentifier fi) {
         IFunctionInfo finfo = getAsterixFunctionInfo(fi);
-        return aggregateToSerializableAggregate.get(finfo) != null ;
+        return aggregateToSerializableAggregate.get(finfo) != null;
     }
 
     public static AggregateFunctionCallExpression makeSerializableAggregateFunctionExpression(FunctionIdentifier fi,
@@ -706,8 +707,9 @@
         IFunctionInfo finfo = getAsterixFunctionInfo(fi);
         IFunctionInfo serializableFinfo = aggregateToSerializableAggregate.get(finfo);
         if (serializableFinfo == null)
-            throw new IllegalStateException("no serializable implementation for aggregate function " + serializableFinfo);
-       
+            throw new IllegalStateException("no serializable implementation for aggregate function "
+                    + serializableFinfo);
+
         IFunctionInfo fiLocal = aggregateToLocalAggregate.get(serializableFinfo);
         IFunctionInfo fiGlobal = aggregateToGlobalAggregate.get(serializableFinfo);
 
@@ -735,7 +737,8 @@
         builtinFunctionsSet.put(functionInfo, functionInfo);
         funTypeComputer.put(functionInfo, typeComputer);
         asterixFunctionIdToInfo.put(fi, functionInfo);
-       // AsterixFunction asterixFunction = new AsterixFunction(fi.getName(), fi.getArity());
+        // AsterixFunction asterixFunction = new AsterixFunction(fi.getName(),
+        // fi.getArity());
     }
 
     private static void addAgg(FunctionIdentifier fi) {
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/base/TypeComputerUtilities.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/base/TypeComputerUtilities.java
index abd4d66..e8786ec 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/base/TypeComputerUtilities.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/base/TypeComputerUtilities.java
@@ -21,13 +21,14 @@
         return openType;
     }
 
-    public static boolean setRequiredType(AbstractFunctionCallExpression expr, ARecordType requiredRecordType) {
+    public static boolean setRequiredAndInputTypes(AbstractFunctionCallExpression expr, ARecordType requiredRecordType,
+            ARecordType inputRecordType) {
         boolean changed = false;
         Object opaqueParameter = expr.getOpaqueParameters();
         if (opaqueParameter == null) {
-            opaqueParameter = requiredRecordType;
-            Object[] opaqueParameters = new Object[1];
-            opaqueParameters[0] = opaqueParameter;
+            Object[] opaqueParameters = new Object[2];
+            opaqueParameters[0] = requiredRecordType;
+            opaqueParameters[1] = inputRecordType;
             expr.setOpaqueParameters(opaqueParameters);
             changed = true;
         }
@@ -43,4 +44,13 @@
             return null;
     }
 
+    public static ARecordType getInputType(AbstractFunctionCallExpression expr) {
+        Object[] type = expr.getOpaqueParameters();
+        if (type != null) {
+            ARecordType recordType = (ARecordType) type[1];
+            return recordType;
+        } else
+            return null;
+    }
+
 }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/CastRecordResultTypeComputer.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/CastRecordResultTypeComputer.java
new file mode 100644
index 0000000..40560fa
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/CastRecordResultTypeComputer.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.om.typecomputer.impl;
+
+import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+
+public class CastRecordResultTypeComputer implements IResultTypeComputer {
+
+    public static final CastRecordResultTypeComputer INSTANCE = new CastRecordResultTypeComputer();
+
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
+            IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
+        ScalarFunctionCallExpression funcExpr = (ScalarFunctionCallExpression) expression;
+        return TypeComputerUtilities.getRequiredType(funcExpr);
+    }
+}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/RecordConstructorResultType.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/RecordConstructorResultType.java
index 8327f30..d20f43b 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/RecordConstructorResultType.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/RecordConstructorResultType.java
@@ -7,6 +7,7 @@
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -32,6 +33,9 @@
     public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
             IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
         AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+        IAType reqType = TypeComputerUtilities.getRequiredType(f);
+        if (reqType != null)
+            return reqType;
         int n = f.getArguments().size() / 2;
         String[] fieldNames = new String[n];
         IAType[] fieldTypes = new IAType[n];
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/CastRecordDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/CastRecordDescriptor.java
new file mode 100644
index 0000000..f7d3887
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/CastRecordDescriptor.java
@@ -0,0 +1,71 @@
+package edu.uci.ics.asterix.runtime.evaluators.functions;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import edu.uci.ics.asterix.runtime.util.ARecordAccessor;
+import edu.uci.ics.asterix.runtime.util.ARecordCaster;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class CastRecordDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    protected static final FunctionIdentifier FID_CAST = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "cast-record", 1, true);
+
+    private static final long serialVersionUID = 1L;
+    private ARecordType reqType;
+    private ARecordType inputType;
+
+    public void reset(ARecordType reqType, ARecordType inputType) {
+        this.reqType = reqType;
+        this.inputType = inputType;
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID_CAST;
+    }
+
+    @Override
+    public IEvaluatorFactory createEvaluatorFactory(final IEvaluatorFactory[] args) {
+        final IEvaluatorFactory recordEvalFactory = args[0];
+
+        return new IEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+                final DataOutput out = output.getDataOutput();
+                final ArrayBackedValueStorage recordBuffer = new ArrayBackedValueStorage();
+                final IEvaluator recEvaluator = recordEvalFactory.createEvaluator(recordBuffer);
+
+                return new IEvaluator() {
+                    final ARecordAccessor recAccessor = new ARecordAccessor(inputType);
+                    final ARecordCaster caster = new ARecordCaster();
+
+                    @Override
+                    public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                        try {
+                            recordBuffer.reset();
+                            recEvaluator.evaluate(tuple);
+                            recAccessor.reset(recordBuffer.getBytes(), recordBuffer.getStartIndex(),
+                                    recordBuffer.getLength());
+                            caster.castRecord(recAccessor, reqType, out);
+                        } catch (IOException ioe) {
+                            throw new AlgebricksException(ioe);
+                        }
+                    }
+                };
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index 467cd03..dd984d9 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.asterix.om.functions.FunctionManagerHolder;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionManager;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
 import edu.uci.ics.asterix.om.types.AOrderedListType;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
@@ -76,6 +77,7 @@
 import edu.uci.ics.asterix.runtime.evaluators.constructors.ATimeConstructorDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.AnyCollectionMemberDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.CastRecordDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.ClosedRecordConstructorDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.ContainsDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.CountHashedGramTokensDescriptor;
@@ -311,6 +313,7 @@
         temp.add(new SwitchCaseDescriptor());
         temp.add(new RegExpDescriptor());
         temp.add(new InjectFailureDescriptor());
+        temp.add(new CastRecordDescriptor());
 
         IFunctionManager mgr = new FunctionManagerImpl();
         for (IFunctionDescriptor fd : temp) {
@@ -362,8 +365,8 @@
                 } catch (HyracksDataException e) {
                     throw new AlgebricksException(e);
                 }
-                IEvaluatorFactory fldIndexEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs.getBytes(), abvs
-                        .getLength()));
+                IEvaluatorFactory fldIndexEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs.getBytes(),
+                        abvs.getLength()));
                 IEvaluatorFactory evalFactory = new FieldAccessByIndexEvalFactory(recordEvalFactory,
                         fldIndexEvalFactory, recType);
                 return evalFactory;
@@ -388,8 +391,8 @@
         } catch (HyracksDataException e) {
             throw new AlgebricksException(e);
         }
-        IEvaluatorFactory dimensionEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs1.getBytes(), abvs1
-                .getLength()));
+        IEvaluatorFactory dimensionEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs1.getBytes(),
+                abvs1.getLength()));
 
         for (int i = 0; i < numOfFields; i++) {
             ArrayBackedValueStorage abvs2 = new ArrayBackedValueStorage();
@@ -400,8 +403,8 @@
             } catch (HyracksDataException e) {
                 throw new AlgebricksException(e);
             }
-            IEvaluatorFactory coordinateEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs2.getBytes(), abvs2
-                    .getLength()));
+            IEvaluatorFactory coordinateEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs2.getBytes(),
+                    abvs2.getLength()));
 
             evalFactories[i] = new CreateMBREvalFactory(evalFactory, dimensionEvalFactory, coordinateEvalFactory);
         }
@@ -427,8 +430,8 @@
                 } catch (HyracksDataException e) {
                     throw new AlgebricksException(e);
                 }
-                IEvaluatorFactory fldIndexEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs.getBytes(), abvs
-                        .getLength()));
+                IEvaluatorFactory fldIndexEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs.getBytes(),
+                        abvs.getLength()));
                 IEvaluatorFactory evalFactory = new FieldAccessByIndexEvalFactory(recordEvalFactory,
                         fldIndexEvalFactory, recType);
                 IFunctionInfo finfoAccess = AsterixBuiltinFunctions
@@ -469,10 +472,15 @@
                 ((ListifyAggregateDescriptor) fd).reset(new AOrderedListType(itemType, null));
             }
         }
+        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.CAST_RECORD)) {
+            ARecordType rt = (ARecordType) TypeComputerUtilities.getRequiredType((AbstractFunctionCallExpression) expr);
+            ARecordType it = (ARecordType) TypeComputerUtilities.getInputType((AbstractFunctionCallExpression) expr);
+            ((CastRecordDescriptor) fd).reset(rt, it);
+        }
         if (fd.getIdentifier().equals(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
             ARecordType rt = (ARecordType) context.getType(expr);
-            ((OpenRecordConstructorDescriptor) fd).reset(rt, computeOpenFields((AbstractFunctionCallExpression) expr,
-                    rt));
+            ((OpenRecordConstructorDescriptor) fd).reset(rt,
+                    computeOpenFields((AbstractFunctionCallExpression) expr, rt));
         }
         if (fd.getIdentifier().equals(AsterixBuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR)) {
             ((ClosedRecordConstructorDescriptor) fd).reset((ARecordType) context.getType(expr));
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordAccessor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordAccessor.java
new file mode 100644
index 0000000..f02ecee
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordAccessor.java
@@ -0,0 +1,270 @@
+package edu.uci.ics.asterix.runtime.util;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.AqlNullWriterFactory;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IValueReference;
+
+public class ARecordAccessor implements IValueReference {
+
+    private List<SimpleValueReference> fieldNames = new ArrayList<SimpleValueReference>();
+    private List<SimpleValueReference> fieldTypeTags = new ArrayList<SimpleValueReference>();
+    private List<SimpleValueReference> fieldValues = new ArrayList<SimpleValueReference>();
+
+    private byte[] typeBuffer = new byte[32768];
+    private ResetableByteArrayOutputStream typeBos = new ResetableByteArrayOutputStream();
+    private DataOutputStream typeDos = new DataOutputStream(typeBos);
+
+    private byte[] dataBuffer = new byte[32768];
+    private ResetableByteArrayOutputStream dataBos = new ResetableByteArrayOutputStream();
+    private DataOutputStream dataDos = new DataOutputStream(dataBos);
+
+    private int closedPartTypeInfoSize = 0;
+    private ARecordType inputRecType;
+
+    private int numberOfSchemaFields;
+    private int offsetArrayOffset;
+    private int[] fieldOffsets;
+    private int fieldCursor = -1;
+    private ATypeTag typeTag = ATypeTag.ANY;
+    private SimpleValueReference nullReference = new SimpleValueReference();
+
+    private byte[] data;
+    private int start;
+    private int len;
+
+    public ARecordAccessor(ARecordType inputType) {
+        this.inputRecType = inputType;
+        IAType[] fieldTypes = inputType.getFieldTypes();
+        String[] fieldNameStrs = inputType.getFieldNames();
+        numberOfSchemaFields = fieldTypes.length;
+
+        // initialize the buffer for closed parts(fieldName bytes+ type bytes) +
+        // constant(null bytes)
+        typeBos.setByteArray(typeBuffer, 0);
+        try {
+            for (int i = 0; i < numberOfSchemaFields; i++) {
+                ATypeTag ftypeTag = fieldTypes[i].getTypeTag();
+
+                // add type tag Reference
+                int tagStart = typeBos.size();
+                typeDos.writeByte(ftypeTag.serialize());
+                int tagEnd = typeBos.size();
+                SimpleValueReference typeTagReference = new SimpleValueReference();
+                typeTagReference.reset(typeBuffer, tagStart, tagEnd - tagStart);
+                fieldTypeTags.add(typeTagReference);
+
+                // add type name Reference (including a astring type tag)
+                int nameStart = typeBos.size();
+                typeDos.writeByte(ATypeTag.STRING.serialize());
+                typeDos.writeUTF(fieldNameStrs[i]);
+                int nameEnd = typeBos.size();
+                SimpleValueReference typeNameReference = new SimpleValueReference();
+                typeNameReference.reset(typeBuffer, nameStart, nameEnd - nameStart);
+                fieldNames.add(typeNameReference);
+            }
+
+            // initialize a constant: null value bytes reference
+            int nullFieldStart = typeBos.size();
+            INullWriter nullWriter = AqlNullWriterFactory.INSTANCE.createNullWriter();
+            nullWriter.writeNull(typeDos);
+            int nullFieldEnd = typeBos.size();
+            nullReference.reset(typeBuffer, nullFieldStart, nullFieldEnd - nullFieldStart);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        closedPartTypeInfoSize = typeBos.size();
+        fieldOffsets = new int[numberOfSchemaFields];
+    }
+
+    private void reset() {
+        typeBos.setByteArray(typeBuffer, closedPartTypeInfoSize);
+        dataBos.setByteArray(dataBuffer, 0);
+        fieldCursor = -1;
+    }
+
+    public void reset(byte[] b, int start, int len) {
+        // clear the previous states
+        reset();
+        this.data = b;
+        this.start = start;
+        this.len = len;
+
+        boolean isExpanded = false;
+        int openPartOffset = 0;
+        int s = start;
+        int recordOffset = s;
+        if (inputRecType == null) {
+            openPartOffset = s + AInt32SerializerDeserializer.getInt(b, s + 6);
+            s += 8;
+            isExpanded = true;
+        } else {
+            if (inputRecType.isOpen()) {
+                isExpanded = b[s + 5] == 1 ? true : false;
+                if (isExpanded) {
+                    openPartOffset = s + AInt32SerializerDeserializer.getInt(b, s + 6);
+                    s += 10;
+                } else
+                    s += 6;
+            } else
+                s += 5;
+        }
+        try {
+            if (numberOfSchemaFields > 0) {
+                s += 4;
+                int nullBitMapOffset = 0;
+                boolean hasNullableFields = NonTaggedFormatUtil.hasNullableField(inputRecType);
+                if (hasNullableFields) {
+                    nullBitMapOffset = s;
+                    offsetArrayOffset = s
+                            + (this.numberOfSchemaFields % 8 == 0 ? numberOfSchemaFields / 8
+                                    : numberOfSchemaFields / 8 + 1);
+                } else {
+                    offsetArrayOffset = s;
+                }
+                for (int i = 0; i < numberOfSchemaFields; i++) {
+                    fieldOffsets[i] = AInt32SerializerDeserializer.getInt(b, offsetArrayOffset) + recordOffset;
+                    offsetArrayOffset += 4;
+                }
+                for (int fieldNumber = 0; fieldNumber < numberOfSchemaFields; fieldNumber++) {
+                    next();
+                    if (hasNullableFields) {
+                        byte b1 = b[nullBitMapOffset + fieldNumber / 8];
+                        int p = 1 << (7 - (fieldNumber % 8));
+                        if ((b1 & p) == 0) {
+                            // set null value (including type tag inside)
+                            nextFieldValue().reset(nullReference);
+                            continue;
+                        }
+                    }
+                    IAType[] fieldTypes = inputRecType.getFieldTypes();
+                    ATypeTag tag = ATypeTag.ANY;
+                    int fieldValueLength = 0;
+
+                    if (fieldTypes[fieldNumber].getTypeTag() == ATypeTag.UNION) {
+                        if (NonTaggedFormatUtil.isOptionalField((AUnionType) fieldTypes[fieldNumber])) {
+                            tag = ((AUnionType) fieldTypes[fieldNumber]).getUnionList()
+                                    .get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST).getTypeTag();
+                            fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffsets[fieldNumber],
+                                    tag, false);
+                        }
+                    } else {
+                        tag = fieldTypes[fieldNumber].getTypeTag();
+                        fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffsets[fieldNumber], tag,
+                                false);
+                    }
+                    // set field value (including the type tag)
+                    int fstart = dataBos.size();
+                    dataDos.writeByte(tag.serialize());
+                    dataDos.write(b, fieldOffsets[fieldNumber], fieldValueLength);
+                    int fend = dataBos.size();
+                    nextFieldValue().reset(dataBuffer, fstart, fend - fstart);
+                }
+            }
+            if (isExpanded) {
+                int numberOfOpenFields = AInt32SerializerDeserializer.getInt(b, openPartOffset);
+                int fieldOffset = openPartOffset + 4 + (8 * numberOfOpenFields);
+                for (int i = 0; i < numberOfOpenFields; i++) {
+                    next();
+                    // set the field name (including a type tag, which is
+                    // astring)
+                    int fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffset, ATypeTag.STRING,
+                            false);
+                    int fnstart = dataBos.size();
+                    dataDos.writeByte(ATypeTag.STRING.serialize());
+                    dataDos.write(b, fieldOffset, fieldValueLength);
+                    int fnend = dataBos.size();
+                    nextFieldName().reset(dataBuffer, fnstart, fnend - fnstart);
+                    fieldOffset += fieldValueLength;
+
+                    // set the field type tag
+                    nextFieldType().reset(b, fieldOffset, 1);
+                    typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b[fieldOffset]);
+
+                    // set the field value (already including type tag)
+                    fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffset, typeTag, true);
+                    nextFieldValue().reset(b, fieldOffset, fieldValueLength);
+                    fieldOffset += fieldValueLength;
+                }
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private void next() {
+        fieldCursor++;
+    }
+
+    private SimpleValueReference nextFieldName() {
+        if (fieldCursor < fieldNames.size()) {
+            return fieldNames.get(fieldCursor);
+        } else {
+            SimpleValueReference fieldNameReference = new SimpleValueReference();
+            fieldNames.add(fieldNameReference);
+            return fieldNameReference;
+        }
+    }
+
+    private SimpleValueReference nextFieldType() {
+        if (fieldCursor < fieldTypeTags.size()) {
+            return fieldTypeTags.get(fieldCursor);
+        } else {
+            SimpleValueReference fieldTypeReference = new SimpleValueReference();
+            fieldTypeTags.add(fieldTypeReference);
+            return fieldTypeReference;
+        }
+    }
+
+    private SimpleValueReference nextFieldValue() {
+        if (fieldCursor < fieldValues.size()) {
+            return fieldValues.get(fieldCursor);
+        } else {
+            SimpleValueReference fieldValueReference = new SimpleValueReference();
+            fieldValues.add(fieldValueReference);
+            return fieldValueReference;
+        }
+    }
+
+    public int getCursor() {
+        return fieldCursor;
+    }
+
+    public List<SimpleValueReference> getFieldNames() {
+        return fieldNames;
+    }
+
+    public List<SimpleValueReference> getFieldTypeTags() {
+        return fieldTypeTags;
+    }
+
+    public List<SimpleValueReference> getFieldValues() {
+        return fieldValues;
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return data;
+    }
+
+    @Override
+    public int getStartIndex() {
+        return start;
+    }
+
+    @Override
+    public int getLength() {
+        return len;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java
new file mode 100644
index 0000000..08d6876
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java
@@ -0,0 +1,188 @@
+package edu.uci.ics.asterix.runtime.util;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.dataflow.data.nontagged.AqlNullWriterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+
+public class ARecordCaster {
+
+    // describe closed fields in the required type
+    private int[] fieldPermutation;
+
+    // describe fields (open or not) in the input records
+    private boolean[] openFields;
+
+    private List<SimpleValueReference> reqFieldNames = new ArrayList<SimpleValueReference>();
+    private List<SimpleValueReference> reqFieldTypeTags = new ArrayList<SimpleValueReference>();
+    private ARecordType cachedReqType = null;
+
+    private byte[] buffer = new byte[32768];
+    private ResetableByteArrayOutputStream bos = new ResetableByteArrayOutputStream();
+    private DataOutputStream dos = new DataOutputStream(bos);
+
+    private RecordBuilder recBuilder = new RecordBuilder();
+    private SimpleValueReference nullReference = new SimpleValueReference();
+
+    public ARecordCaster() {
+        try {
+            bos.setByteArray(buffer, 0);
+            int start = bos.size();
+            INullWriter nullWriter = AqlNullWriterFactory.INSTANCE.createNullWriter();
+            nullWriter.writeNull(dos);
+            int end = bos.size();
+            nullReference.reset(buffer, start, end - start);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void castRecord(ARecordAccessor recordAccessor, ARecordType reqType, DataOutput output) throws IOException {
+        List<SimpleValueReference> fieldNames = recordAccessor.getFieldNames();
+        List<SimpleValueReference> fieldTypeTags = recordAccessor.getFieldTypeTags();
+        List<SimpleValueReference> fieldValues = recordAccessor.getFieldValues();
+
+        if (openFields == null || fieldNames.size() > openFields.length) {
+            openFields = new boolean[fieldNames.size()];
+        }
+        if (cachedReqType == null || !reqType.equals(cachedReqType)) {
+            loadRequiredType(reqType);
+        }
+
+        // clear the previous states
+        reset();
+        matchClosedPart(fieldNames, fieldTypeTags, fieldValues);
+        writeOutput(fieldNames, fieldTypeTags, fieldValues, output);
+    }
+
+    private void reset() {
+        for (int i = 0; i < openFields.length; i++)
+            openFields[i] = true;
+        for (int i = 0; i < fieldPermutation.length; i++)
+            fieldPermutation[i] = -1;
+    }
+
+    private void loadRequiredType(ARecordType reqType) throws IOException {
+        reqFieldNames.clear();
+        reqFieldTypeTags.clear();
+
+        cachedReqType = reqType;
+        int numSchemaFields = reqType.getFieldTypes().length;
+        IAType[] fieldTypes = reqType.getFieldTypes();
+        String[] fieldNames = reqType.getFieldNames();
+        fieldPermutation = new int[numSchemaFields];
+
+        bos.setByteArray(buffer, nullReference.getStartIndex() + nullReference.getLength());
+        for (int i = 0; i < numSchemaFields; i++) {
+            ATypeTag ftypeTag = fieldTypes[i].getTypeTag();
+            String fname = fieldNames[i];
+
+            // add type tag pointable
+            int tagStart = bos.size();
+            dos.writeByte(ftypeTag.serialize());
+            int tagEnd = bos.size();
+            SimpleValueReference typeTagPointable = new SimpleValueReference();
+            typeTagPointable.reset(buffer, tagStart, tagEnd - tagStart);
+            reqFieldTypeTags.add(typeTagPointable);
+
+            // add type name pointable (including a string type tag)
+            int nameStart = bos.size();
+            dos.write(ATypeTag.STRING.serialize());
+            dos.writeUTF(fname);
+            int nameEnd = bos.size();
+            SimpleValueReference typeNamePointable = new SimpleValueReference();
+            typeNamePointable.reset(buffer, nameStart, nameEnd - nameStart);
+            reqFieldNames.add(typeNamePointable);
+        }
+    }
+
+    private void matchClosedPart(List<SimpleValueReference> fieldNames, List<SimpleValueReference> fieldTypeTags,
+            List<SimpleValueReference> fieldValues) {
+        // forward match: match from actual to required
+        boolean matched = false;
+        for (int i = 0; i < fieldNames.size(); i++) {
+            SimpleValueReference fieldName = fieldNames.get(i);
+            SimpleValueReference fieldTypeTag = fieldTypeTags.get(i);
+            matched = false;
+            for (int j = 0; j < reqFieldNames.size(); j++) {
+                SimpleValueReference reqFieldName = reqFieldNames.get(j);
+                SimpleValueReference reqFieldTypeTag = reqFieldTypeTags.get(j);
+                if (fieldName.equals(reqFieldName) && fieldTypeTag.equals(reqFieldTypeTag)) {
+                    fieldPermutation[j] = i;
+                    openFields[i] = false;
+                    matched = true;
+                    break;
+                }
+            }
+            if (matched)
+                continue;
+            // the input has extra fields
+            if (!cachedReqType.isOpen())
+                throw new IllegalStateException("type mismatch: including extra closed fields");
+        }
+
+        // backward match: match from required to actual
+        for (int i = 0; i < reqFieldNames.size(); i++) {
+            SimpleValueReference reqFieldName = reqFieldNames.get(i);
+            SimpleValueReference reqFieldTypeTag = reqFieldTypeTags.get(i);
+            matched = false;
+            for (int j = 0; j < fieldNames.size(); j++) {
+                SimpleValueReference fieldName = fieldNames.get(j);
+                SimpleValueReference fieldTypeTag = fieldTypeTags.get(j);
+                if (fieldName.equals(reqFieldName) && fieldTypeTag.equals(reqFieldTypeTag)) {
+                    matched = true;
+                    break;
+                }
+            }
+            if (matched)
+                continue;
+
+            IAType t = cachedReqType.getFieldTypes()[i];
+            if (t.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) t)) {
+                // add a null field into the end of field name list and type
+                // list
+                fieldNames.add(reqFieldName);
+                fieldTypeTags.add(reqFieldTypeTag);
+                fieldValues.add(nullReference);
+                fieldPermutation[i] = fieldNames.size() - 1;
+            } else {
+                // no matched field in the input for a required closed field
+                throw new IllegalStateException("type mismatch: miss a required closed field");
+            }
+        }
+    }
+
+    private void writeOutput(List<SimpleValueReference> fieldNames, List<SimpleValueReference> fieldTypeTags,
+            List<SimpleValueReference> fieldValues, DataOutput output) throws IOException {
+        // reset the states of the record builder
+        recBuilder.reset(cachedReqType);
+        recBuilder.init();
+
+        // write the closed part
+        for (int i = 0; i < fieldPermutation.length; i++) {
+            int pos = fieldPermutation[i];
+            SimpleValueReference field = fieldValues.get(pos);
+            recBuilder.addField(i, field);
+        }
+
+        // write the open part
+        for (int i = 0; i < openFields.length; i++) {
+            if (openFields[i]) {
+                SimpleValueReference name = fieldNames.get(i);
+                SimpleValueReference field = fieldValues.get(i);
+                recBuilder.addField(name, field);
+            }
+        }
+        recBuilder.write(output, true);
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayInputStream.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayInputStream.java
new file mode 100644
index 0000000..7f8f0e9
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayInputStream.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.util;
+
+import java.io.ByteArrayInputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ResetableByteArrayInputStream extends ByteArrayInputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
+
+    private byte[] data;
+    private int position;
+
+    public ResetableByteArrayInputStream(byte[] data) {
+        super(data);
+    }
+
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    @Override
+    public int read() {
+        int remaining = data.length - position;
+        int value = remaining > 0 ? (data[position++] & 0xff) : -1;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
+        }
+        return value;
+    }
+
+    @Override
+    public int read(byte[] bytes, int offset, int length) {
+        int remaining = data.length - position;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
+                    + length + " position: " + position);
+        }
+        if (remaining == 0) {
+            return -1;
+        }
+        int l = Math.min(length, remaining);
+        System.arraycopy(data, position, bytes, offset, l);
+        position += l;
+        return l;
+    }
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayOutputStream.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayOutputStream.java
new file mode 100644
index 0000000..2958314
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayOutputStream.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.util;
+
+import java.io.ByteArrayOutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ResetableByteArrayOutputStream extends ByteArrayOutputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayOutputStream.class.getName());
+
+    private byte[] data;
+    private int position;
+
+    public ResetableByteArrayOutputStream() {
+    }
+
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    @Override
+    public void write(int b) {
+        int remaining = data.length - position;
+        if (position + 1 > data.length - 1)
+            throw new IndexOutOfBoundsException();
+        data[position] = (byte) b;
+        position++;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(): value: " + b + " remaining: " + remaining + " position: " + position);
+        }
+    }
+
+    @Override
+    public void write(byte[] bytes, int offset, int length) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(bytes[], int, int) offset: " + offset + " length: " + length + " position: "
+                    + position);
+        }
+        if (position + length > data.length - 1)
+            throw new IndexOutOfBoundsException();
+        System.arraycopy(bytes, offset, data, position, length);
+        position += length;
+    }
+    
+    @Override
+    public int size(){
+        return position;
+    }
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/SimpleValueReference.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/SimpleValueReference.java
new file mode 100644
index 0000000..bb54624
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/SimpleValueReference.java
@@ -0,0 +1,55 @@
+package edu.uci.ics.asterix.runtime.util;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IValueReference;
+
+public class SimpleValueReference implements IValueReference {
+
+    private byte[] data;
+    private int start;
+    private int len;
+
+    public void reset(byte[] data, int start, int len) {
+        this.data = data;
+        this.start = start;
+        this.len = len;
+    }
+
+    public void reset(IValueReference ivf) {
+        this.data = ivf.getBytes();
+        this.start = ivf.getStartIndex();
+        this.len = ivf.getLength();
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return data;
+    }
+
+    @Override
+    public int getStartIndex() {
+        return start;
+    }
+
+    @Override
+    public int getLength() {
+        return len;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof IValueReference))
+            return false;
+        IValueReference ivf = (IValueReference) o;
+        byte[] odata = ivf.getBytes();
+        int ostart = ivf.getStartIndex();
+        int olen = ivf.getLength();
+
+        if (len != olen)
+            return false;
+        for (int i = 0; i < len; i++) {
+            if (data[start + i] != odata[ostart + i])
+                return false;
+        }
+        return true;
+    }
+}