Preliminary changes for dynamic record type casting on insert (cast-record) and related tests
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_opentype@257 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index aa22be5..2824d27 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -15,6 +15,7 @@
import edu.uci.ics.asterix.optimizer.rules.IfElseToSwitchCaseFunctionRule;
import edu.uci.ics.asterix.optimizer.rules.InlineAssignIntoAggregateRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceBTreeIndexSearchRule;
+import edu.uci.ics.asterix.optimizer.rules.IntroduceCastRecordRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceRTreeIndexSearchRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
import edu.uci.ics.asterix.optimizer.rules.LoadRecordFieldsRule;
@@ -82,6 +83,7 @@
normalization.add(new ExtractDistinctByExpressionsRule());
normalization.add(new ExtractOrderExpressionsRule());
normalization.add(new TopDownTypeInferenceRule());
+ normalization.add(new IntroduceCastRecordRule());
normalization.add(new ConstantFoldingRule());
normalization.add(new UnnestToDataScanRule());
normalization.add(new IfElseToSwitchCaseFunctionRule());
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
index faae5eb..36783ce 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -147,7 +147,8 @@
return new Pair<Boolean, ILogicalExpression>(changed, expr);
}
// TODO: currently ARecord is always a closed record
- if (expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
+ if (expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)
+ || expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.CAST_RECORD)) {
return new Pair<Boolean, ILogicalExpression>(false, null);
}
if (expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME)) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceCastRecordRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceCastRecordRule.java
new file mode 100644
index 0000000..a63451a
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceCastRecordRule.java
@@ -0,0 +1,116 @@
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class IntroduceCastRecordRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ /**
+ * pattern match: sink <- insert <- assign (record constructor)
+ *
+ * resulting plan: sink <- insert <- project <- assign (cast-record) <- assign (record constructor)
+ *
+ */
+ AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+ if (op1.getOperatorTag() != LogicalOperatorTag.SINK)
+ return false;
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+ if (op2.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE)
+ return false;
+ AbstractLogicalOperator op3 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
+ if (op3.getOperatorTag() != LogicalOperatorTag.ASSIGN)
+ return false;
+
+ InsertDeleteOperator insertDeleteOperator = (InsertDeleteOperator) op2;
+ AssignOperator oldAssignOperator = (AssignOperator) op3;
+
+ AqlDataSource dataSource = (AqlDataSource) insertDeleteOperator.getDataSource();
+ IAType[] schemaTypes = (IAType[]) dataSource.getSchemaTypes();
+ ARecordType requiredRecordType = (ARecordType) schemaTypes[schemaTypes.length - 1];
+
+ List<LogicalVariable> usedVariables = new ArrayList<LogicalVariable>();
+ VariableUtilities.getUsedVariables(oldAssignOperator, usedVariables);
+ LogicalVariable inputRecordVar = usedVariables.get(0);
+ IVariableTypeEnvironment env = oldAssignOperator.computeInputTypeEnvironment(context);
+ ARecordType inputRecordType = (ARecordType) env.getVarType(inputRecordVar);
+
+ boolean needCast = needCast(requiredRecordType, inputRecordType);
+ if (needCast) {
+ // new operator stack (top-down):
+ // insert (re-targeted at the cast result)
+ // project (old assign's variables plus the new cast variable)
+ // assign (cast-record over the original payload)
+ // assign (the original record constructor)
+ AbstractFunctionCallExpression cast = new ScalarFunctionCallExpression(
+ FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.CAST_RECORD));
+ ARecordType[] types = new ARecordType[2];
+ types[0] = requiredRecordType;
+ types[1] = inputRecordType;
+ cast.getArguments().add(
+ new MutableObject<ILogicalExpression>(new VariableReferenceExpression(inputRecordVar)));
+ cast.setOpaqueParameters(types);
+ LogicalVariable newAssignVar = context.newVar();
+ AssignOperator newAssignOperator = new AssignOperator(newAssignVar, new MutableObject<ILogicalExpression>(
+ cast));
+ newAssignOperator.getInputs().add(new MutableObject<ILogicalOperator>(op3));
+
+ List<LogicalVariable> projectVariables = new ArrayList<LogicalVariable>();
+ VariableUtilities.getProducedVariables(oldAssignOperator, projectVariables);
+ projectVariables.add(newAssignVar);
+ ProjectOperator projectOperator = new ProjectOperator(projectVariables);
+ projectOperator.getInputs().add(new MutableObject<ILogicalOperator>(newAssignOperator));
+
+ ILogicalExpression payloadExpr = new VariableReferenceExpression(newAssignVar);
+ MutableObject<ILogicalExpression> payloadRef = new MutableObject<ILogicalExpression>(payloadExpr);
+ InsertDeleteOperator newInsertDeleteOperator = new InsertDeleteOperator(
+ insertDeleteOperator.getDataSource(), payloadRef, insertDeleteOperator.getPrimaryKeyExpressions(),
+ insertDeleteOperator.getOperation());
+ newInsertDeleteOperator.getInputs().add(new MutableObject<ILogicalOperator>(projectOperator));
+ insertDeleteOperator.getInputs().clear();
+ op1.getInputs().get(0).setValue(newInsertDeleteOperator);
+ return true;
+ }
+ return false;
+
+ }
+
+ private boolean needCast(ARecordType reqType, ARecordType inputType) {
+ return !reqType.equals(inputType);
+ }
+
+}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java
index 958f5f6..4ff2e65 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java
@@ -36,8 +36,8 @@
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
/**
- * pattern match: sink/insert/assign
- * record type is propagated from insert data source to the record-constructor expression
+ * pattern match: sink/insert/assign; the record type is propagated
+ * from the insert data source to the record-constructor expression
*/
AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
if (op1.getOperatorTag() != LogicalOperatorTag.SINK)
@@ -93,7 +93,8 @@
ILogicalExpression expr = expressionPointers.get(position).getValue();
if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
ScalarFunctionCallExpression funcExpr = (ScalarFunctionCallExpression) expr;
- changed = TypeComputerUtilities.setRequiredType(funcExpr, requiredRecordType);
+ changed = TypeComputerUtilities.setRequiredAndInputTypes(funcExpr, requiredRecordType,
+ inputRecordType);
List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
int openPartStart = requiredRecordType.getFieldTypes().length * 2;
for (int j = openPartStart; j < args.size(); j++) {
@@ -104,6 +105,7 @@
}
}
}
+ context.computeAndSetTypeEnvironmentForOperator(originalAssign);
}
}
if (currentOperator.getInputs().size() > 0)
diff --git a/asterix-app/src/test/resources/logging.properties b/asterix-app/src/test/resources/logging.properties
index deb88307..4cf5545 100644
--- a/asterix-app/src/test/resources/logging.properties
+++ b/asterix-app/src/test/resources/logging.properties
@@ -61,5 +61,5 @@
# messages:
#edu.uci.ics.asterix.level = FINE
-#edu.uci.ics.algebricks.level = FINE
-#edu.uci.ics.hyracks.level = INFO
+edu.uci.ics.algebricks.level = FINE
+edu.uci.ics.hyracks.level = INFO
diff --git a/asterix-app/src/test/resources/runtimets/ignore.txt b/asterix-app/src/test/resources/runtimets/ignore.txt
index 0964e88..b60aa97 100644
--- a/asterix-app/src/test/resources/runtimets/ignore.txt
+++ b/asterix-app/src/test/resources/runtimets/ignore.txt
@@ -17,4 +17,4 @@
open-closed
dml/insert-into-empty-dataset-with-index_02.aql
dml/insert-into-empty-dataset-with-index_01.aql
-dml/load-from-hdfs.aql
+dml/load-from-hdfs.aql
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o.aql
new file mode 100644
index 0000000..13bfa77
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o.aql
@@ -0,0 +1,44 @@
+drop dataverse testdv2 if exists;
+create dataverse testdv2;
+use dataverse testdv2;
+
+create type testtype as closed {
+ id: string,
+ name: string,
+ hobby: string
+}
+
+create type testtype2 as open {
+ id: string
+}
+
+create dataset testds(testtype) partitioned by key id;
+
+create dataset testds2(testtype2) partitioned by key id;
+
+insert into dataset testds (
+{ "id": "001", "name": "Person Three", "hobby": "music"}
+);
+
+insert into dataset testds (
+{ "id": "002", "name": "Person One", "hobby": "sports"}
+);
+
+insert into dataset testds (
+{ "id": "003", "name": "Person Two", "hobby": "movie"}
+);
+
+insert into dataset testds (
+{ "id": "004", "name": "Person Four", "hobby": "swimming"}
+);
+
+insert into dataset testds2 (
+ for $d in dataset("testds")
+ return $d
+);
+
+write output to nc1:"rttest/dml_opentype-c2o.adm";
+
+for $d in dataset("testds2")
+order by $d.id
+return $d
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-insert.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-insert.aql
index 0c71f54..19dd1d6 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-insert.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-insert.aql
@@ -1,3 +1,4 @@
+drop dataverse testdv2 if exists;
create dataverse testdv2;
use dataverse testdv2;
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql
new file mode 100644
index 0000000..2657718
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql
@@ -0,0 +1,45 @@
+drop dataverse testdv2 if exists;
+create dataverse testdv2;
+use dataverse testdv2;
+
+create type testtype as open {
+ id: string,
+ name: string
+}
+
+create type testtype2 as closed {
+ id: string,
+ name: string,
+ hobby: string
+}
+
+create dataset testds(testtype) partitioned by key id;
+
+create dataset testds2(testtype2) partitioned by key id;
+
+insert into dataset testds (
+{ "id": "001", "name": "Person Three", "hobby": "music"}
+);
+
+insert into dataset testds (
+{ "id": "002", "name": "Person Three", "hobby": "football"}
+);
+
+insert into dataset testds (
+{ "id": "003", "name": "Person Three", "hobby": "movie"}
+);
+
+insert into dataset testds (
+{ "id": "004", "name": "Person Three", "hobby": "swimming"}
+);
+
+insert into dataset testds2 (
+ for $d in dataset("testds")
+ return $d
+);
+
+write output to nc1:"rttest/dml_opentype-o2c.adm";
+
+for $d in dataset("testds")
+order by $d.id
+return $d
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/heterog-list01.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/heterog-list01.aql
index c54d020..d1edc3d 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-closed/heterog-list01.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/heterog-list01.aql
@@ -15,7 +15,7 @@
descrpt:string
}
-create type TestType as {
+create type TestType as closed {
id:int32,
description:string,
name:string,
@@ -30,3 +30,9 @@
"name":"Cake",
"batters":[{"id":345,"descprt":"Regular"},{"id":445,"descprt":"Chocolate"}] }
);
+
+write output to nc1:"rttest/open-closed_heterog-list01.adm";
+
+for $d in dataset("T1")
+return $d
+
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-01.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-01.aql
index 0b6d0dd..6373569 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-01.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-01.aql
@@ -18,7 +18,9 @@
create dataset testds(testType) partitioned by key id;
+
insert into dataset testds({"id": 123, "name": "John Doe", "hobbies": {{ "scuba", "music" }} }
);
+write output to nc1:"rttest/open-closed_open-closed-01.adm";
for $l in dataset("testds") return $l
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-02.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-02.aql
index 6963bee..c037cc8 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-02.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/open-closed-02.aql
@@ -19,6 +19,7 @@
create dataset testds(testType) partitioned by key id;
-insert into dataset testds({"id": 123, "name": "John Doe", "hobbies": {{ "scuba", "music" }} });
+insert into dataset testds({"id": 123});
+write output to nc1:"rttest/open-closed_open-closed-02.adm";
for $l in dataset("testds") return $l
diff --git a/asterix-app/src/test/resources/runtimets/results/dml/opentype-c2o.adm b/asterix-app/src/test/resources/runtimets/results/dml/opentype-c2o.adm
new file mode 100644
index 0000000..834bcef
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/dml/opentype-c2o.adm
@@ -0,0 +1,4 @@
+{ "id": "001", "name": "Person Three", "hobby": "music" }
+{ "id": "002", "name": "Person One", "hobby": "sports" }
+{ "id": "003", "name": "Person Two", "hobby": "movie" }
+{ "id": "004", "name": "Person Four", "hobby": "swimming" }
diff --git a/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c.adm b/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c.adm
new file mode 100644
index 0000000..0c1e6d3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c.adm
@@ -0,0 +1,4 @@
+{ "id": "001", "name": "Person Three", "hobby": "music" }
+{ "id": "002", "name": "Person Three", "hobby": "football" }
+{ "id": "003", "name": "Person Three", "hobby": "movie" }
+{ "id": "004", "name": "Person Three", "hobby": "swimming" }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 5f8334d..9aeac2b 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -22,6 +22,7 @@
import edu.uci.ics.asterix.om.typecomputer.impl.ARectangleTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.AStringTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.BinaryBooleanOrNullFunctionTypeComputer;
+import edu.uci.ics.asterix.om.typecomputer.impl.CastRecordResultTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.ClosedRecordConstructorResultType;
import edu.uci.ics.asterix.om.typecomputer.impl.FieldAccessByIndexResultType;
import edu.uci.ics.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
@@ -81,7 +82,6 @@
SI
}
- private final static Map<AsterixFunction, IFunctionInfo> asterixFunctionToInfo = new HashMap<AsterixFunction, IFunctionInfo>();
private final static Map<FunctionIdentifier, IFunctionInfo> asterixFunctionIdToInfo = new HashMap<FunctionIdentifier, IFunctionInfo>();
// it is supposed to be an identity mapping
@@ -334,6 +334,8 @@
true);
public final static FunctionIdentifier INJECT_FAILURE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"inject-failure", 2, true);
+ public final static FunctionIdentifier CAST_RECORD = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "cast-record", 1, true);
public static final FunctionIdentifier EQ = AlgebricksBuiltinFunctions.EQ;
public static final FunctionIdentifier LE = AlgebricksBuiltinFunctions.LE;
@@ -351,13 +353,13 @@
IFunctionInfo finfo = asterixFunctionIdToInfo.get(fid);
if (finfo == null) {
finfo = new AsterixFunctionInfo(fid, fid.isBuiltin());
- if(fid.isBuiltin()){
+ if (fid.isBuiltin()) {
asterixFunctionIdToInfo.put(fid, finfo);
}
}
return finfo;
}
-
+
public static AsterixFunctionInfo lookupFunction(FunctionIdentifier fid) {
return (AsterixFunctionInfo) asterixFunctionIdToInfo.get(fid);
}
@@ -542,6 +544,7 @@
add(SWITCH_CASE, NonTaggedSwitchCaseComputer.INSTANCE);
add(REG_EXP, ABooleanTypeComputer.INSTANCE);
add(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE);
+ add(CAST_RECORD, CastRecordResultTypeComputer.INSTANCE);
add(TID, AInt32TypeComputer.INSTANCE);
add(TIME_CONSTRUCTOR, OptionalATimeTypeComputer.INSTANCE);
@@ -676,8 +679,6 @@
IFunctionInfo finfo = getAsterixFunctionInfo(fi);
return finfo == null ? null : finfo.getFunctionIdentifier();
}
-
-
public static AggregateFunctionCallExpression makeAggregateFunctionExpression(FunctionIdentifier fi,
List<Mutable<ILogicalExpression>> args) {
@@ -697,7 +698,7 @@
public static boolean isAggregateFunctionSerializable(FunctionIdentifier fi) {
IFunctionInfo finfo = getAsterixFunctionInfo(fi);
- return aggregateToSerializableAggregate.get(finfo) != null ;
+ return aggregateToSerializableAggregate.get(finfo) != null;
}
public static AggregateFunctionCallExpression makeSerializableAggregateFunctionExpression(FunctionIdentifier fi,
@@ -706,8 +707,9 @@
IFunctionInfo finfo = getAsterixFunctionInfo(fi);
IFunctionInfo serializableFinfo = aggregateToSerializableAggregate.get(finfo);
if (serializableFinfo == null)
- throw new IllegalStateException("no serializable implementation for aggregate function " + serializableFinfo);
-
+ throw new IllegalStateException("no serializable implementation for aggregate function "
+ + serializableFinfo);
+
IFunctionInfo fiLocal = aggregateToLocalAggregate.get(serializableFinfo);
IFunctionInfo fiGlobal = aggregateToGlobalAggregate.get(serializableFinfo);
@@ -735,7 +737,8 @@
builtinFunctionsSet.put(functionInfo, functionInfo);
funTypeComputer.put(functionInfo, typeComputer);
asterixFunctionIdToInfo.put(fi, functionInfo);
- // AsterixFunction asterixFunction = new AsterixFunction(fi.getName(), fi.getArity());
+ // AsterixFunction asterixFunction = new AsterixFunction(fi.getName(),
+ // fi.getArity());
}
private static void addAgg(FunctionIdentifier fi) {
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/base/TypeComputerUtilities.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/base/TypeComputerUtilities.java
index abd4d66..e8786ec 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/base/TypeComputerUtilities.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/base/TypeComputerUtilities.java
@@ -21,13 +21,14 @@
return openType;
}
- public static boolean setRequiredType(AbstractFunctionCallExpression expr, ARecordType requiredRecordType) {
+ public static boolean setRequiredAndInputTypes(AbstractFunctionCallExpression expr, ARecordType requiredRecordType,
+ ARecordType inputRecordType) {
boolean changed = false;
Object opaqueParameter = expr.getOpaqueParameters();
if (opaqueParameter == null) {
- opaqueParameter = requiredRecordType;
- Object[] opaqueParameters = new Object[1];
- opaqueParameters[0] = opaqueParameter;
+ Object[] opaqueParameters = new Object[2];
+ opaqueParameters[0] = requiredRecordType;
+ opaqueParameters[1] = inputRecordType;
expr.setOpaqueParameters(opaqueParameters);
changed = true;
}
@@ -43,4 +44,13 @@
return null;
}
+ public static ARecordType getInputType(AbstractFunctionCallExpression expr) {
+ Object[] opaqueParameters = expr.getOpaqueParameters();
+ if (opaqueParameters == null)
+ return null;
+ return (ARecordType) opaqueParameters[1];
+ }
+
}
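
For reference, a minimal sketch of how the two opaque parameters added above are meant to round-trip: the rewrite rules attach [requiredType, inputType] to the record-constructor / cast-record call, and NonTaggedDataFormat reads them back to configure CastRecordDescriptor. The class below is illustrative only; the concrete record types, the ARecordType(String, String[], IAType[], boolean) constructor, and the use of FunctionUtils outside the optimizer are assumptions, not part of this patch.

    import edu.uci.ics.asterix.aql.util.FunctionUtils;
    import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
    import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
    import edu.uci.ics.asterix.om.types.ARecordType;
    import edu.uci.ics.asterix.om.types.BuiltinType;
    import edu.uci.ics.asterix.om.types.IAType;
    import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;

    public class OpaqueTypeRoundTripSketch {
        public static void main(String[] args) throws Exception {
            // hypothetical record types (assumed ARecordType(String, String[], IAType[], boolean) constructor)
            ARecordType required = new ARecordType("closedType", new String[] { "id", "name", "hobby" },
                    new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, false);
            ARecordType input = new ARecordType("openType", new String[] { "id" },
                    new IAType[] { BuiltinType.ASTRING }, true);

            ScalarFunctionCallExpression cast = new ScalarFunctionCallExpression(
                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.CAST_RECORD));
            // compile time: attach [required, input] as opaque parameters
            TypeComputerUtilities.setRequiredAndInputTypes(cast, required, input);
            // runtime side (see NonTaggedDataFormat below): read them back to configure CastRecordDescriptor
            IAType req = TypeComputerUtilities.getRequiredType(cast);
            ARecordType in = TypeComputerUtilities.getInputType(cast);
            System.out.println(req + " <- " + in);
        }
    }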
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/CastRecordResultTypeComputer.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/CastRecordResultTypeComputer.java
new file mode 100644
index 0000000..40560fa
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/CastRecordResultTypeComputer.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.om.typecomputer.impl;
+
+import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+
+public class CastRecordResultTypeComputer implements IResultTypeComputer {
+
+ public static final CastRecordResultTypeComputer INSTANCE = new CastRecordResultTypeComputer();
+
+ @Override
+ public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
+ IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
+ ScalarFunctionCallExpression funcExpr = (ScalarFunctionCallExpression) expression;
+ return TypeComputerUtilities.getRequiredType(funcExpr);
+ }
+}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/RecordConstructorResultType.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/RecordConstructorResultType.java
index 8327f30..d20f43b 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/RecordConstructorResultType.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/RecordConstructorResultType.java
@@ -7,6 +7,7 @@
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -32,6 +33,9 @@
public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+ IAType reqType = TypeComputerUtilities.getRequiredType(f);
+ if (reqType != null)
+ return reqType;
int n = f.getArguments().size() / 2;
String[] fieldNames = new String[n];
IAType[] fieldTypes = new IAType[n];
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/CastRecordDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/CastRecordDescriptor.java
new file mode 100644
index 0000000..f7d3887
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/CastRecordDescriptor.java
@@ -0,0 +1,71 @@
+package edu.uci.ics.asterix.runtime.evaluators.functions;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import edu.uci.ics.asterix.runtime.util.ARecordAccessor;
+import edu.uci.ics.asterix.runtime.util.ARecordCaster;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class CastRecordDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+ protected static final FunctionIdentifier FID_CAST = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "cast-record", 1, true);
+
+ private static final long serialVersionUID = 1L;
+ private ARecordType reqType;
+ private ARecordType inputType;
+
+ public void reset(ARecordType reqType, ARecordType inputType) {
+ this.reqType = reqType;
+ this.inputType = inputType;
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID_CAST;
+ }
+
+ @Override
+ public IEvaluatorFactory createEvaluatorFactory(final IEvaluatorFactory[] args) {
+ final IEvaluatorFactory recordEvalFactory = args[0];
+
+ return new IEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+ final DataOutput out = output.getDataOutput();
+ final ArrayBackedValueStorage recordBuffer = new ArrayBackedValueStorage();
+ final IEvaluator recEvaluator = recordEvalFactory.createEvaluator(recordBuffer);
+
+ return new IEvaluator() {
+ final ARecordAccessor recAccessor = new ARecordAccessor(inputType);
+ final ARecordCaster caster = new ARecordCaster();
+
+ @Override
+ public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+ try {
+ recordBuffer.reset();
+ recEvaluator.evaluate(tuple);
+ recAccessor.reset(recordBuffer.getBytes(), recordBuffer.getStartIndex(),
+ recordBuffer.getLength());
+ caster.castRecord(recAccessor, reqType, out);
+ } catch (IOException ioe) {
+ throw new AlgebricksException(ioe);
+ }
+ }
+ };
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index 467cd03..dd984d9 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -32,6 +32,7 @@
import edu.uci.ics.asterix.om.functions.FunctionManagerHolder;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionManager;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
import edu.uci.ics.asterix.om.types.AOrderedListType;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
@@ -76,6 +77,7 @@
import edu.uci.ics.asterix.runtime.evaluators.constructors.ATimeConstructorDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.AnyCollectionMemberDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.CastRecordDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.ClosedRecordConstructorDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.ContainsDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.CountHashedGramTokensDescriptor;
@@ -311,6 +313,7 @@
temp.add(new SwitchCaseDescriptor());
temp.add(new RegExpDescriptor());
temp.add(new InjectFailureDescriptor());
+ temp.add(new CastRecordDescriptor());
IFunctionManager mgr = new FunctionManagerImpl();
for (IFunctionDescriptor fd : temp) {
@@ -362,8 +365,8 @@
} catch (HyracksDataException e) {
throw new AlgebricksException(e);
}
- IEvaluatorFactory fldIndexEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs.getBytes(), abvs
- .getLength()));
+ IEvaluatorFactory fldIndexEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs.getBytes(),
+ abvs.getLength()));
IEvaluatorFactory evalFactory = new FieldAccessByIndexEvalFactory(recordEvalFactory,
fldIndexEvalFactory, recType);
return evalFactory;
@@ -388,8 +391,8 @@
} catch (HyracksDataException e) {
throw new AlgebricksException(e);
}
- IEvaluatorFactory dimensionEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs1.getBytes(), abvs1
- .getLength()));
+ IEvaluatorFactory dimensionEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs1.getBytes(),
+ abvs1.getLength()));
for (int i = 0; i < numOfFields; i++) {
ArrayBackedValueStorage abvs2 = new ArrayBackedValueStorage();
@@ -400,8 +403,8 @@
} catch (HyracksDataException e) {
throw new AlgebricksException(e);
}
- IEvaluatorFactory coordinateEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs2.getBytes(), abvs2
- .getLength()));
+ IEvaluatorFactory coordinateEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs2.getBytes(),
+ abvs2.getLength()));
evalFactories[i] = new CreateMBREvalFactory(evalFactory, dimensionEvalFactory, coordinateEvalFactory);
}
@@ -427,8 +430,8 @@
} catch (HyracksDataException e) {
throw new AlgebricksException(e);
}
- IEvaluatorFactory fldIndexEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs.getBytes(), abvs
- .getLength()));
+ IEvaluatorFactory fldIndexEvalFactory = new ConstantEvalFactory(Arrays.copyOf(abvs.getBytes(),
+ abvs.getLength()));
IEvaluatorFactory evalFactory = new FieldAccessByIndexEvalFactory(recordEvalFactory,
fldIndexEvalFactory, recType);
IFunctionInfo finfoAccess = AsterixBuiltinFunctions
@@ -469,10 +472,15 @@
((ListifyAggregateDescriptor) fd).reset(new AOrderedListType(itemType, null));
}
}
+ if (fd.getIdentifier().equals(AsterixBuiltinFunctions.CAST_RECORD)) {
+ ARecordType rt = (ARecordType) TypeComputerUtilities.getRequiredType((AbstractFunctionCallExpression) expr);
+ ARecordType it = (ARecordType) TypeComputerUtilities.getInputType((AbstractFunctionCallExpression) expr);
+ ((CastRecordDescriptor) fd).reset(rt, it);
+ }
if (fd.getIdentifier().equals(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
ARecordType rt = (ARecordType) context.getType(expr);
- ((OpenRecordConstructorDescriptor) fd).reset(rt, computeOpenFields((AbstractFunctionCallExpression) expr,
- rt));
+ ((OpenRecordConstructorDescriptor) fd).reset(rt,
+ computeOpenFields((AbstractFunctionCallExpression) expr, rt));
}
if (fd.getIdentifier().equals(AsterixBuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR)) {
((ClosedRecordConstructorDescriptor) fd).reset((ARecordType) context.getType(expr));
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordAccessor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordAccessor.java
new file mode 100644
index 0000000..f02ecee
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordAccessor.java
@@ -0,0 +1,270 @@
+package edu.uci.ics.asterix.runtime.util;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.AqlNullWriterFactory;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IValueReference;
+
+public class ARecordAccessor implements IValueReference {
+
+ private List<SimpleValueReference> fieldNames = new ArrayList<SimpleValueReference>();
+ private List<SimpleValueReference> fieldTypeTags = new ArrayList<SimpleValueReference>();
+ private List<SimpleValueReference> fieldValues = new ArrayList<SimpleValueReference>();
+
+ private byte[] typeBuffer = new byte[32768];
+ private ResetableByteArrayOutputStream typeBos = new ResetableByteArrayOutputStream();
+ private DataOutputStream typeDos = new DataOutputStream(typeBos);
+
+ private byte[] dataBuffer = new byte[32768];
+ private ResetableByteArrayOutputStream dataBos = new ResetableByteArrayOutputStream();
+ private DataOutputStream dataDos = new DataOutputStream(dataBos);
+
+ private int closedPartTypeInfoSize = 0;
+ private ARecordType inputRecType;
+
+ private int numberOfSchemaFields;
+ private int offsetArrayOffset;
+ private int[] fieldOffsets;
+ private int fieldCursor = -1;
+ private ATypeTag typeTag = ATypeTag.ANY;
+ private SimpleValueReference nullReference = new SimpleValueReference();
+
+ private byte[] data;
+ private int start;
+ private int len;
+
+ public ARecordAccessor(ARecordType inputType) {
+ this.inputRecType = inputType;
+ IAType[] fieldTypes = inputType.getFieldTypes();
+ String[] fieldNameStrs = inputType.getFieldNames();
+ numberOfSchemaFields = fieldTypes.length;
+
+ // initialize the buffer for the closed part (field name bytes + type tag bytes)
+ // plus one constant (the null value bytes)
+ typeBos.setByteArray(typeBuffer, 0);
+ try {
+ for (int i = 0; i < numberOfSchemaFields; i++) {
+ ATypeTag ftypeTag = fieldTypes[i].getTypeTag();
+
+ // add a reference to the field's type tag
+ int tagStart = typeBos.size();
+ typeDos.writeByte(ftypeTag.serialize());
+ int tagEnd = typeBos.size();
+ SimpleValueReference typeTagReference = new SimpleValueReference();
+ typeTagReference.reset(typeBuffer, tagStart, tagEnd - tagStart);
+ fieldTypeTags.add(typeTagReference);
+
+ // add a reference to the field name (including a leading ASTRING type tag)
+ int nameStart = typeBos.size();
+ typeDos.writeByte(ATypeTag.STRING.serialize());
+ typeDos.writeUTF(fieldNameStrs[i]);
+ int nameEnd = typeBos.size();
+ SimpleValueReference typeNameReference = new SimpleValueReference();
+ typeNameReference.reset(typeBuffer, nameStart, nameEnd - nameStart);
+ fieldNames.add(typeNameReference);
+ }
+
+ // initialize a constant: null value bytes reference
+ int nullFieldStart = typeBos.size();
+ INullWriter nullWriter = AqlNullWriterFactory.INSTANCE.createNullWriter();
+ nullWriter.writeNull(typeDos);
+ int nullFieldEnd = typeBos.size();
+ nullReference.reset(typeBuffer, nullFieldStart, nullFieldEnd - nullFieldStart);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ closedPartTypeInfoSize = typeBos.size();
+ fieldOffsets = new int[numberOfSchemaFields];
+ }
+
+ private void reset() {
+ typeBos.setByteArray(typeBuffer, closedPartTypeInfoSize);
+ dataBos.setByteArray(dataBuffer, 0);
+ fieldCursor = -1;
+ }
+
+ public void reset(byte[] b, int start, int len) {
+ // clear the previous states
+ reset();
+ this.data = b;
+ this.start = start;
+ this.len = len;
+
+ boolean isExpanded = false;
+ int openPartOffset = 0;
+ int s = start;
+ int recordOffset = s;
+ if (inputRecType == null) {
+ openPartOffset = s + AInt32SerializerDeserializer.getInt(b, s + 6);
+ s += 8;
+ isExpanded = true;
+ } else {
+ if (inputRecType.isOpen()) {
+ isExpanded = b[s + 5] == 1;
+ if (isExpanded) {
+ openPartOffset = s + AInt32SerializerDeserializer.getInt(b, s + 6);
+ s += 10;
+ } else
+ s += 6;
+ } else
+ s += 5;
+ }
+ try {
+ if (numberOfSchemaFields > 0) {
+ s += 4;
+ int nullBitMapOffset = 0;
+ boolean hasNullableFields = NonTaggedFormatUtil.hasNullableField(inputRecType);
+ if (hasNullableFields) {
+ nullBitMapOffset = s;
+ offsetArrayOffset = s
+ + (this.numberOfSchemaFields % 8 == 0 ? numberOfSchemaFields / 8
+ : numberOfSchemaFields / 8 + 1);
+ } else {
+ offsetArrayOffset = s;
+ }
+ for (int i = 0; i < numberOfSchemaFields; i++) {
+ fieldOffsets[i] = AInt32SerializerDeserializer.getInt(b, offsetArrayOffset) + recordOffset;
+ offsetArrayOffset += 4;
+ }
+ for (int fieldNumber = 0; fieldNumber < numberOfSchemaFields; fieldNumber++) {
+ next();
+ if (hasNullableFields) {
+ byte b1 = b[nullBitMapOffset + fieldNumber / 8];
+ int p = 1 << (7 - (fieldNumber % 8));
+ if ((b1 & p) == 0) {
+ // set null value (including type tag inside)
+ nextFieldValue().reset(nullReference);
+ continue;
+ }
+ }
+ IAType[] fieldTypes = inputRecType.getFieldTypes();
+ ATypeTag tag = ATypeTag.ANY;
+ int fieldValueLength = 0;
+
+ if (fieldTypes[fieldNumber].getTypeTag() == ATypeTag.UNION) {
+ if (NonTaggedFormatUtil.isOptionalField((AUnionType) fieldTypes[fieldNumber])) {
+ tag = ((AUnionType) fieldTypes[fieldNumber]).getUnionList()
+ .get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST).getTypeTag();
+ fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffsets[fieldNumber],
+ tag, false);
+ }
+ } else {
+ tag = fieldTypes[fieldNumber].getTypeTag();
+ fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffsets[fieldNumber], tag,
+ false);
+ }
+ // set field value (including the type tag)
+ int fstart = dataBos.size();
+ dataDos.writeByte(tag.serialize());
+ dataDos.write(b, fieldOffsets[fieldNumber], fieldValueLength);
+ int fend = dataBos.size();
+ nextFieldValue().reset(dataBuffer, fstart, fend - fstart);
+ }
+ }
+ if (isExpanded) {
+ int numberOfOpenFields = AInt32SerializerDeserializer.getInt(b, openPartOffset);
+ int fieldOffset = openPartOffset + 4 + (8 * numberOfOpenFields);
+ for (int i = 0; i < numberOfOpenFields; i++) {
+ next();
+ // set the field name (including its ASTRING type tag)
+ int fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffset, ATypeTag.STRING,
+ false);
+ int fnstart = dataBos.size();
+ dataDos.writeByte(ATypeTag.STRING.serialize());
+ dataDos.write(b, fieldOffset, fieldValueLength);
+ int fnend = dataBos.size();
+ nextFieldName().reset(dataBuffer, fnstart, fnend - fnstart);
+ fieldOffset += fieldValueLength;
+
+ // set the field type tag
+ nextFieldType().reset(b, fieldOffset, 1);
+ typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b[fieldOffset]);
+
+ // set the field value (already including type tag)
+ fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffset, typeTag, true);
+ nextFieldValue().reset(b, fieldOffset, fieldValueLength);
+ fieldOffset += fieldValueLength;
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void next() {
+ fieldCursor++;
+ }
+
+ private SimpleValueReference nextFieldName() {
+ if (fieldCursor < fieldNames.size()) {
+ return fieldNames.get(fieldCursor);
+ } else {
+ SimpleValueReference fieldNameReference = new SimpleValueReference();
+ fieldNames.add(fieldNameReference);
+ return fieldNameReference;
+ }
+ }
+
+ private SimpleValueReference nextFieldType() {
+ if (fieldCursor < fieldTypeTags.size()) {
+ return fieldTypeTags.get(fieldCursor);
+ } else {
+ SimpleValueReference fieldTypeReference = new SimpleValueReference();
+ fieldTypeTags.add(fieldTypeReference);
+ return fieldTypeReference;
+ }
+ }
+
+ private SimpleValueReference nextFieldValue() {
+ if (fieldCursor < fieldValues.size()) {
+ return fieldValues.get(fieldCursor);
+ } else {
+ SimpleValueReference fieldValueReference = new SimpleValueReference();
+ fieldValues.add(fieldValueReference);
+ return fieldValueReference;
+ }
+ }
+
+ public int getCursor() {
+ return fieldCursor;
+ }
+
+ public List<SimpleValueReference> getFieldNames() {
+ return fieldNames;
+ }
+
+ public List<SimpleValueReference> getFieldTypeTags() {
+ return fieldTypeTags;
+ }
+
+ public List<SimpleValueReference> getFieldValues() {
+ return fieldValues;
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return data;
+ }
+
+ @Override
+ public int getStartIndex() {
+ return start;
+ }
+
+ @Override
+ public int getLength() {
+ return len;
+ }
+}
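
The null-bitmap handling in ARecordAccessor.reset(...) is compact, so here is a tiny self-contained illustration of the bit test it performs: closed field f is null when bit (7 - f % 8) of bitmap byte f / 8 is zero. The bitmap value is invented for the example.

    public class NullBitmapSketch {
        public static void main(String[] args) {
            byte[] nullBitmap = { (byte) 0xA0 }; // 1010 0000 -> fields 0 and 2 present, field 1 null
            for (int fieldNumber = 0; fieldNumber < 3; fieldNumber++) {
                byte b1 = nullBitmap[fieldNumber / 8];
                int p = 1 << (7 - (fieldNumber % 8));
                boolean isNull = (b1 & p) == 0; // same test as in reset(); the accessor then
                                                // substitutes the cached nullReference bytes
                System.out.println("field " + fieldNumber + " null? " + isNull);
            }
        }
    }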
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java
new file mode 100644
index 0000000..08d6876
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java
@@ -0,0 +1,188 @@
+package edu.uci.ics.asterix.runtime.util;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.dataflow.data.nontagged.AqlNullWriterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+
+public class ARecordCaster {
+
+ // describe closed fields in the required type
+ private int[] fieldPermutation;
+
+ // describe fields (open or not) in the input records
+ private boolean[] openFields;
+
+ private List<SimpleValueReference> reqFieldNames = new ArrayList<SimpleValueReference>();
+ private List<SimpleValueReference> reqFieldTypeTags = new ArrayList<SimpleValueReference>();
+ private ARecordType cachedReqType = null;
+
+ private byte[] buffer = new byte[32768];
+ private ResetableByteArrayOutputStream bos = new ResetableByteArrayOutputStream();
+ private DataOutputStream dos = new DataOutputStream(bos);
+
+ private RecordBuilder recBuilder = new RecordBuilder();
+ private SimpleValueReference nullReference = new SimpleValueReference();
+
+ public ARecordCaster() {
+ try {
+ bos.setByteArray(buffer, 0);
+ int start = bos.size();
+ INullWriter nullWriter = AqlNullWriterFactory.INSTANCE.createNullWriter();
+ nullWriter.writeNull(dos);
+ int end = bos.size();
+ nullReference.reset(buffer, start, end - start);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void castRecord(ARecordAccessor recordAccessor, ARecordType reqType, DataOutput output) throws IOException {
+ List<SimpleValueReference> fieldNames = recordAccessor.getFieldNames();
+ List<SimpleValueReference> fieldTypeTags = recordAccessor.getFieldTypeTags();
+ List<SimpleValueReference> fieldValues = recordAccessor.getFieldValues();
+
+ if (openFields == null || fieldNames.size() > openFields.length) {
+ openFields = new boolean[fieldNames.size()];
+ }
+ if (cachedReqType == null || !reqType.equals(cachedReqType)) {
+ loadRequiredType(reqType);
+ }
+
+ // clear the previous states
+ reset();
+ matchClosedPart(fieldNames, fieldTypeTags, fieldValues);
+ writeOutput(fieldNames, fieldTypeTags, fieldValues, output);
+ }
+
+ private void reset() {
+ for (int i = 0; i < openFields.length; i++)
+ openFields[i] = true;
+ for (int i = 0; i < fieldPermutation.length; i++)
+ fieldPermutation[i] = -1;
+ }
+
+ private void loadRequiredType(ARecordType reqType) throws IOException {
+ reqFieldNames.clear();
+ reqFieldTypeTags.clear();
+
+ cachedReqType = reqType;
+ int numSchemaFields = reqType.getFieldTypes().length;
+ IAType[] fieldTypes = reqType.getFieldTypes();
+ String[] fieldNames = reqType.getFieldNames();
+ fieldPermutation = new int[numSchemaFields];
+
+ bos.setByteArray(buffer, nullReference.getStartIndex() + nullReference.getLength());
+ for (int i = 0; i < numSchemaFields; i++) {
+ ATypeTag ftypeTag = fieldTypes[i].getTypeTag();
+ String fname = fieldNames[i];
+
+ // add a reference to the required field's type tag
+ int tagStart = bos.size();
+ dos.writeByte(ftypeTag.serialize());
+ int tagEnd = bos.size();
+ SimpleValueReference typeTagPointable = new SimpleValueReference();
+ typeTagPointable.reset(buffer, tagStart, tagEnd - tagStart);
+ reqFieldTypeTags.add(typeTagPointable);
+
+ // add a reference to the required field name (including a leading ASTRING type tag)
+ int nameStart = bos.size();
+ dos.write(ATypeTag.STRING.serialize());
+ dos.writeUTF(fname);
+ int nameEnd = bos.size();
+ SimpleValueReference typeNamePointable = new SimpleValueReference();
+ typeNamePointable.reset(buffer, nameStart, nameEnd - nameStart);
+ reqFieldNames.add(typeNamePointable);
+ }
+ }
+
+ private void matchClosedPart(List<SimpleValueReference> fieldNames, List<SimpleValueReference> fieldTypeTags,
+ List<SimpleValueReference> fieldValues) {
+ // forward match: match from actual to required
+ boolean matched = false;
+ for (int i = 0; i < fieldNames.size(); i++) {
+ SimpleValueReference fieldName = fieldNames.get(i);
+ SimpleValueReference fieldTypeTag = fieldTypeTags.get(i);
+ matched = false;
+ for (int j = 0; j < reqFieldNames.size(); j++) {
+ SimpleValueReference reqFieldName = reqFieldNames.get(j);
+ SimpleValueReference reqFieldTypeTag = reqFieldTypeTags.get(j);
+ if (fieldName.equals(reqFieldName) && fieldTypeTag.equals(reqFieldTypeTag)) {
+ fieldPermutation[j] = i;
+ openFields[i] = false;
+ matched = true;
+ break;
+ }
+ }
+ if (matched)
+ continue;
+ // the input has extra fields
+ if (!cachedReqType.isOpen())
+ throw new IllegalStateException("type mismatch: input record has a field that is not in the closed required type");
+ }
+
+ // backward match: match from required to actual
+ for (int i = 0; i < reqFieldNames.size(); i++) {
+ SimpleValueReference reqFieldName = reqFieldNames.get(i);
+ SimpleValueReference reqFieldTypeTag = reqFieldTypeTags.get(i);
+ matched = false;
+ for (int j = 0; j < fieldNames.size(); j++) {
+ SimpleValueReference fieldName = fieldNames.get(j);
+ SimpleValueReference fieldTypeTag = fieldTypeTags.get(j);
+ if (fieldName.equals(reqFieldName) && fieldTypeTag.equals(reqFieldTypeTag)) {
+ matched = true;
+ break;
+ }
+ }
+ if (matched)
+ continue;
+
+ IAType t = cachedReqType.getFieldTypes()[i];
+ if (t.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) t)) {
+ // append a null field to the end of the field name, type tag, and value lists
+ fieldNames.add(reqFieldName);
+ fieldTypeTags.add(reqFieldTypeTag);
+ fieldValues.add(nullReference);
+ fieldPermutation[i] = fieldNames.size() - 1;
+ } else {
+ // no matched field in the input for a required closed field
+ throw new IllegalStateException("type mismatch: missing a required closed field");
+ }
+ }
+ }
+
+ private void writeOutput(List<SimpleValueReference> fieldNames, List<SimpleValueReference> fieldTypeTags,
+ List<SimpleValueReference> fieldValues, DataOutput output) throws IOException {
+ // reset the states of the record builder
+ recBuilder.reset(cachedReqType);
+ recBuilder.init();
+
+ // write the closed part
+ for (int i = 0; i < fieldPermutation.length; i++) {
+ int pos = fieldPermutation[i];
+ SimpleValueReference field = fieldValues.get(pos);
+ recBuilder.addField(i, field);
+ }
+
+ // write the open part
+ for (int i = 0; i < openFields.length; i++) {
+ if (openFields[i]) {
+ SimpleValueReference name = fieldNames.get(i);
+ SimpleValueReference field = fieldValues.get(i);
+ recBuilder.addField(name, field);
+ }
+ }
+ recBuilder.write(output, true);
+ }
+}
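
The forward/backward matching in matchClosedPart(...) is the core of the caster. The following simplified, name-only sketch illustrates the idea; the real code also compares type tags, substitutes null for missing optional fields, and rejects extra fields when the required type is closed. All field names below are invented.

    import java.util.Arrays;
    import java.util.List;

    public class ClosedPartMatchSketch {
        public static void main(String[] args) {
            List<String> required = Arrays.asList("id", "name", "hobby");          // closed fields of the target type
            List<String> input = Arrays.asList("hobby", "id", "name", "nickname"); // fields of the incoming record

            int[] fieldPermutation = new int[required.size()]; // required position -> input position
            boolean[] openFields = new boolean[input.size()];  // true = field goes to the open part
            Arrays.fill(fieldPermutation, -1);
            Arrays.fill(openFields, true);

            // forward match: every input field that names a required field becomes a closed field
            for (int i = 0; i < input.size(); i++) {
                int j = required.indexOf(input.get(i));
                if (j >= 0) {
                    fieldPermutation[j] = i;
                    openFields[i] = false;
                }
                // with a closed required type, an unmatched input field would be a type mismatch
            }
            // backward check: every required field must be matched (or be optional and filled with null)
            for (int j = 0; j < required.size(); j++) {
                if (fieldPermutation[j] < 0) {
                    throw new IllegalStateException("type mismatch: missing required field " + required.get(j));
                }
            }
            System.out.println(Arrays.toString(fieldPermutation)); // [1, 2, 0]
            System.out.println(Arrays.toString(openFields));       // [false, false, false, true]
        }
    }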
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayInputStream.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayInputStream.java
new file mode 100644
index 0000000..7f8f0e9
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayInputStream.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.util;
+
+import java.io.ByteArrayInputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ResetableByteArrayInputStream extends ByteArrayInputStream {
+ private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
+
+ private byte[] data;
+ private int position;
+
+ public ResetableByteArrayInputStream(byte[] data) {
+ super(data);
+ }
+
+ public void setByteArray(byte[] data, int position) {
+ this.data = data;
+ this.position = position;
+ }
+
+ @Override
+ public int read() {
+ int remaining = data.length - position;
+ int value = remaining > 0 ? (data[position++] & 0xff) : -1;
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
+ }
+ return value;
+ }
+
+ @Override
+ public int read(byte[] bytes, int offset, int length) {
+ int remaining = data.length - position;
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
+ + length + " position: " + position);
+ }
+ if (remaining == 0) {
+ return -1;
+ }
+ int l = Math.min(length, remaining);
+ System.arraycopy(data, position, bytes, offset, l);
+ position += l;
+ return l;
+ }
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayOutputStream.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayOutputStream.java
new file mode 100644
index 0000000..2958314
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ResetableByteArrayOutputStream.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.util;
+
+import java.io.ByteArrayOutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ResetableByteArrayOutputStream extends ByteArrayOutputStream {
+ private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayOutputStream.class.getName());
+
+ private byte[] data;
+ private int position;
+
+ public ResetableByteArrayOutputStream() {
+ }
+
+ public void setByteArray(byte[] data, int position) {
+ this.data = data;
+ this.position = position;
+ }
+
+ @Override
+ public void write(int b) {
+ int remaining = data.length - position;
+ if (position >= data.length)
+ throw new IndexOutOfBoundsException();
+ data[position] = (byte) b;
+ position++;
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("write(): value: " + b + " remaining: " + remaining + " position: " + position);
+ }
+ }
+
+ @Override
+ public void write(byte[] bytes, int offset, int length) {
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("write(bytes[], int, int) offset: " + offset + " length: " + length + " position: "
+ + position);
+ }
+ if (position + length > data.length)
+ throw new IndexOutOfBoundsException();
+ System.arraycopy(bytes, offset, data, position, length);
+ position += length;
+ }
+
+ @Override
+ public int size() {
+ return position;
+ }
+}
\ No newline at end of file
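
A small usage sketch of the two resettable streams added in this patch: write into a caller-owned buffer without reallocating, note the logical size, then read the same bytes back. The buffer size and the string are arbitrary.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import edu.uci.ics.asterix.runtime.util.ResetableByteArrayInputStream;
    import edu.uci.ics.asterix.runtime.util.ResetableByteArrayOutputStream;

    public class ResetableStreamsSketch {
        public static void main(String[] args) throws IOException {
            byte[] buffer = new byte[64];

            // write a UTF string into the caller-owned buffer, starting at offset 0
            ResetableByteArrayOutputStream bos = new ResetableByteArrayOutputStream();
            bos.setByteArray(buffer, 0);
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeUTF("hobby");
            int end = bos.size(); // logical end of the data written into 'buffer'

            // read the same bytes back from the shared buffer
            ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream(buffer);
            bis.setByteArray(buffer, 0);
            DataInputStream dis = new DataInputStream(bis);
            System.out.println(dis.readUTF() + " (" + end + " bytes)");
        }
    }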
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/SimpleValueReference.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/SimpleValueReference.java
new file mode 100644
index 0000000..bb54624
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/SimpleValueReference.java
@@ -0,0 +1,55 @@
+package edu.uci.ics.asterix.runtime.util;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IValueReference;
+
+public class SimpleValueReference implements IValueReference {
+
+ private byte[] data;
+ private int start;
+ private int len;
+
+ public void reset(byte[] data, int start, int len) {
+ this.data = data;
+ this.start = start;
+ this.len = len;
+ }
+
+ public void reset(IValueReference ivf) {
+ this.data = ivf.getBytes();
+ this.start = ivf.getStartIndex();
+ this.len = ivf.getLength();
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return data;
+ }
+
+ @Override
+ public int getStartIndex() {
+ return start;
+ }
+
+ @Override
+ public int getLength() {
+ return len;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof IValueReference))
+ return false;
+ IValueReference ivf = (IValueReference) o;
+ byte[] odata = ivf.getBytes();
+ int ostart = ivf.getStartIndex();
+ int olen = ivf.getLength();
+
+ if (len != olen)
+ return false;
+ for (int i = 0; i < len; i++) {
+ if (data[start + i] != odata[ostart + i])
+ return false;
+ }
+ return true;
+ }
+}
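
Finally, a minimal sketch of SimpleValueReference, whose byte-range equals(...) is what ARecordCaster relies on to match serialized field names and type tags. The byte arrays are arbitrary; note that hashCode() is not overridden, so these references are not meant to be used as hash-map keys.

    import edu.uci.ics.asterix.runtime.util.SimpleValueReference;

    public class SimpleValueReferenceSketch {
        public static void main(String[] args) {
            byte[] left = { 1, 2, 3, 4 };
            byte[] right = { 9, 2, 3, 4, 9 };
            SimpleValueReference a = new SimpleValueReference();
            SimpleValueReference b = new SimpleValueReference();
            a.reset(left, 1, 3);   // refers to bytes 2, 3, 4 of 'left'
            b.reset(right, 1, 3);  // refers to bytes 2, 3, 4 of 'right'
            System.out.println(a.equals(b)); // true: same length, same bytes
            b.reset(right, 0, 3);  // now refers to 9, 2, 3
            System.out.println(a.equals(b)); // false
        }
    }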