Fix Indexing on Open fields and Meta fields
This change fixes the following cases:
1. Build a secondary index on a meta field: success
2. Build an open index on a meta field: failure
3. Build a secondary index on an open field in record part: success
Tests verify that ingestion and querying work correctly for these cases.
Change-Id: I6195149940f150250a65f2515e9ac9d6de2a33f9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/930
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 6205962..f5cfa2b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -102,17 +102,30 @@
InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) op1;
boolean isBulkload = insertOp.isBulkload();
ILogicalExpression recordExpr = insertOp.getPayloadExpression().getValue();
+ List<Mutable<ILogicalExpression>> metaExprs = insertOp.getAdditionalNonFilteringExpressions();
LogicalVariable recordVar = null;
+ LogicalVariable metaVar = null;
List<LogicalVariable> usedRecordVars = new ArrayList<>();
/** assume the payload is always a single variable expression */
recordExpr.getUsedVariables(usedRecordVars);
if (usedRecordVars.size() == 1) {
recordVar = usedRecordVars.get(0);
}
+ if (metaExprs != null) {
+ List<LogicalVariable> metaVars = new ArrayList<>();
+ for (Mutable<ILogicalExpression> expr : metaExprs) {
+ expr.getValue().getUsedVariables(metaVars);
+ }
+ if (metaVars.size() > 1) {
+ throw new AlgebricksException(
+ "Number of meta fields can't be more than 1. Number of meta fields found = " + metaVars.size());
+ }
+ metaVar = metaVars.get(0);
+ }
/**
- * op2 is the assign operator which extract primary keys from the record
- * variable
+ * op2 is the assign operator which extracts primary keys from the input
+ * variables (record or meta)
*/
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
@@ -169,6 +182,11 @@
throw new AlgebricksException("Only record types can be indexed.");
}
ARecordType recType = (ARecordType) itemType;
+ // meta type
+ ARecordType metaType = null;
+ if (dataset.hasMetaPart()) {
+ metaType = (ARecordType) mp.findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+ }
// recType might be replaced with enforced record type and we want to keep a reference to the original record
// type
ARecordType originalRecType = recType;
@@ -228,7 +246,7 @@
if (insertOp.getOperation() == Kind.INSERT || insertOp.getOperation() == Kind.UPSERT) {
try {
DatasetDataSource ds = (DatasetDataSource) (insertOp.getDataSource());
- ARecordType insertRecType = (ARecordType) ds.getSchemaTypes()[ds.getSchemaTypes().length - 1];
+ ARecordType insertRecType = (ARecordType) ds.getItemType();
// A new variable which represents the casted record
LogicalVariable castedRecVar = context.newVar();
// create the expected record type = the original + the optional open field
@@ -281,11 +299,19 @@
List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
List<IAType> secondaryKeyTypes = index.getKeyFieldTypes();
List<LogicalVariable> secondaryKeyVars = new ArrayList<LogicalVariable>();
+ List<Integer> indicators = index.getKeyFieldSourceIndicators();
List<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
List<Mutable<ILogicalExpression>> secondaryExpressions = new ArrayList<Mutable<ILogicalExpression>>();
- for (List<String> secondaryKey : secondaryKeyFields) {
- prepareVarAndExpression(secondaryKey, recType.getFieldNames(), enforcedRecordVar, expressions,
+ for (int i = 0; i < secondaryKeyFields.size(); i++) {
+ List<String> secondaryKey = secondaryKeyFields.get(i);
+ ARecordType sourceType = recType;
+ LogicalVariable sourceVar = enforcedRecordVar;
+ if (dataset.hasMetaPart()) {
+ sourceType = indicators.get(i).intValue() == 0 ? recType : metaType;
+ sourceVar = indicators.get(i).intValue() == 0 ? enforcedRecordVar : metaVar;
+ }
+ prepareVarAndExpression(secondaryKey, sourceType.getFieldNames(), sourceVar, expressions,
secondaryKeyVars, context);
}
// Used with upsert operation
@@ -298,14 +324,18 @@
prevSecondaryKeyVars = new ArrayList<LogicalVariable>();
prevExpressions = new ArrayList<Mutable<ILogicalExpression>>();
prevSecondaryExpressions = new ArrayList<Mutable<ILogicalExpression>>();
- for (List<String> secondaryKey : secondaryKeyFields) {
- prepareVarAndExpression(secondaryKey, originalRecType.getFieldNames(), insertOp.getPrevRecordVar(),
+ for (int i = 0; i < secondaryKeyFields.size(); i++) {
+ List<String> secondaryKey = secondaryKeyFields.get(i);
+ prepareVarAndExpression(secondaryKey,
+ (indicators.get(i).intValue() == 0) ? originalRecType.getFieldNames()
+ : metaType.getFieldNames(),
+ (indicators.get(i).intValue() == 0) ? insertOp.getPrevRecordVar()
+ : insertOp.getPrevAdditionalNonFilteringVars().get(0),
prevExpressions, prevSecondaryKeyVars, context);
}
prevSecondaryKeyAssign = new AssignOperator(prevSecondaryKeyVars, prevExpressions);
}
AssignOperator assign = new AssignOperator(secondaryKeyVars, expressions);
-
AssignOperator topAssign = assign;
if (insertOp.getOperation() == Kind.UPSERT) {
prevSecondaryKeyAssign.getInputs().add(new MutableObject<ILogicalOperator>(topAssign));
@@ -402,7 +432,9 @@
IndexInsertDeleteUpsertOperator indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
insertOp.getPrimaryKeyExpressions(), tokenizeKeyExprs, filterExpression,
- insertOp.getOperation(), insertOp.isBulkload());
+ insertOp.getOperation(), insertOp.isBulkload(),
+ insertOp.getAdditionalNonFilteringExpressions() == null ? 0
+ : insertOp.getAdditionalNonFilteringExpressions().size());
indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(tokenUpdate));
@@ -415,7 +447,9 @@
// When TokenizeOperator is not needed
IndexInsertDeleteUpsertOperator indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
insertOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
- insertOp.getOperation(), insertOp.isBulkload());
+ insertOp.getOperation(), insertOp.isBulkload(),
+ insertOp.getAdditionalNonFilteringExpressions() == null ? 0
+ : insertOp.getAdditionalNonFilteringExpressions().size());
indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
// We add the necessary expressions for upsert
@@ -482,7 +516,7 @@
// We do something similar for previous key if the operation is an upsert
if (insertOp.getOperation() == Kind.UPSERT) {
List<LogicalVariable> originalKeyVarList = new ArrayList<LogicalVariable>();
- List<Mutable<ILogicalExpression>> originalKeyExprList = new ArrayList<Mutable<ILogicalExpression>>();
+ List<Mutable<ILogicalExpression>> originalKeyExprList = new ArrayList<>();
// we don't do any filtering since nulls are expected here and there
for (int i = 0; i < numKeys; i++) {
LogicalVariable keyVar = context.newVar();
@@ -522,7 +556,9 @@
AqlIndex dataSourceIndex = new AqlIndex(index, dataverseName, datasetName, mp);
IndexInsertDeleteUpsertOperator indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
insertOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
- insertOp.getOperation(), insertOp.isBulkload());
+ insertOp.getOperation(), insertOp.isBulkload(),
+ insertOp.getAdditionalNonFilteringExpressions() == null ? 0
+ : insertOp.getAdditionalNonFilteringExpressions().size());
indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
if (insertOp.getOperation() == Kind.UPSERT) {
// set old secondary key expressions
@@ -573,6 +609,9 @@
if (!index.isSecondaryIndex() || !index.isEnforcingKeyFileds()) {
continue;
}
+ if (index.hasMetaFields()) {
+ throw new AlgebricksException("Indexing an open field is only supported on the record part");
+ }
for (int i = 0; i < index.getKeyFieldNames().size(); i++) {
Stack<Pair<ARecordType, String>> nestedTypeStack = new Stack<Pair<ARecordType, String>>();
List<String> splits = index.getKeyFieldNames().get(i);
@@ -609,14 +648,10 @@
enforcedType = new ARecordType(bridgeName,
ArrayUtils.addAll(parent.getFieldNames(), enforcedType.getTypeName()), parentFieldTypes,
true);
-
} else {
//Schema is closed all the way to the field
//enforced fields are either null or strongly typed
- LinkedHashMap<String, IAType> recordNameTypesMap = new LinkedHashMap<String, IAType>();
- for (j = 0; j < nestedFieldType.getFieldNames().length; j++) {
- recordNameTypesMap.put(nestedFieldType.getFieldNames()[j], nestedFieldType.getFieldTypes()[j]);
- }
+ LinkedHashMap<String, IAType> recordNameTypesMap = createRecordNameTypeMap(nestedFieldType);
// if a an enforced field already exists and the type is correct
IAType enforcedFieldType = recordNameTypesMap.get(splits.get(splits.size() - 1));
if (enforcedFieldType != null && enforcedFieldType.getTypeTag() == ATypeTag.UNION
@@ -654,6 +689,14 @@
return enforcedType;
}
+ private static LinkedHashMap<String, IAType> createRecordNameTypeMap(ARecordType nestedFieldType) {
+ LinkedHashMap<String, IAType> recordNameTypesMap = new LinkedHashMap<>();
+ for (int j = 0; j < nestedFieldType.getFieldNames().length; j++) {
+ recordNameTypesMap.put(nestedFieldType.getFieldNames()[j], nestedFieldType.getFieldTypes()[j]);
+ }
+ return recordNameTypesMap;
+ }
+
/***
* This method takes a list of {fields}: a subset of {recordFields}, the original record variable
* and populate expressions with expressions which evaluate to those fields (using field access functions) and
@@ -675,8 +718,7 @@
List<Mutable<ILogicalExpression>> expressions, List<LogicalVariable> vars, IOptimizationContext context)
throws AlgebricksException {
// Get a reference to the record variable
- Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
- new VariableReferenceExpression(recordVar));
+ Mutable<ILogicalExpression> varRef = new MutableObject<>(new VariableReferenceExpression(recordVar));
// Get the desired field position
int pos = -1;
if (fields.size() == 1) {
@@ -730,12 +772,12 @@
if (!NonTaggedFormatUtil.isOptional(secondaryKeyType) && !forceFilter) {
continue;
}
- ScalarFunctionCallExpression isNullFuncExpr = new ScalarFunctionCallExpression(
+ ScalarFunctionCallExpression isUnknownFuncExpr = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_UNKOWN),
new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVar)));
ScalarFunctionCallExpression notFuncExpr = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NOT),
- new MutableObject<ILogicalExpression>(isNullFuncExpr));
+ new MutableObject<ILogicalExpression>(isUnknownFuncExpr));
filterExpressions.add(new MutableObject<ILogicalExpression>(notFuncExpr));
}
// No nullable secondary keys.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index d17b663..ed1803d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -108,7 +108,7 @@
protected void fillSubTreeIndexExprs(OptimizableOperatorSubTree subTree,
Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs, IOptimizationContext context)
- throws AlgebricksException {
+ throws AlgebricksException {
Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator();
// Check applicability of indexes by access method type.
while (amIt.hasNext()) {
@@ -257,7 +257,7 @@
@Override
public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables,
List<List<LogicalVariable>> correlatedNullableVariableLists)
- throws AlgebricksException {
+ throws AlgebricksException {
if (var.equals(optFuncExpr.getSourceVar(exprAndVarIdx.second))) {
return keyType;
}
@@ -435,7 +435,7 @@
protected boolean fillIndexExprs(List<Index> datasetIndexes, List<String> fieldName, IAType fieldType,
IOptimizableFuncExpr optFuncExpr, int matchedFuncExprIndex, int varIdx,
OptimizableOperatorSubTree matchedSubTree, AccessMethodAnalysisContext analysisCtx)
- throws AlgebricksException {
+ throws AlgebricksException {
List<Index> indexCandidates = new ArrayList<Index>();
// Add an index to the candidates if one of the indexed fields is
// fieldName
@@ -462,9 +462,18 @@
IOptimizationContext context) throws AlgebricksException {
int optFuncExprIndex = 0;
List<Index> datasetIndexes = new ArrayList<Index>();
+ LogicalVariable datasetMetaVar = null;
+ LogicalVariable datasetRecordVar = null;
if (subTree.dataSourceType != DataSourceType.COLLECTION_SCAN) {
datasetIndexes = metadataProvider.getDatasetIndexes(subTree.dataset.getDataverseName(),
subTree.dataset.getDatasetName());
+ List<LogicalVariable> datasetVars = subTree.getDataSourceVariables();
+ if (subTree.dataset.hasMetaPart()) {
+ datasetMetaVar = datasetVars.get(datasetVars.size() - 1);
+ datasetRecordVar = datasetVars.get(datasetVars.size() - 2);
+ } else {
+ datasetRecordVar = datasetVars.get(datasetVars.size() - 1);
+ }
}
for (IOptimizableFuncExpr optFuncExpr : analysisCtx.matchedFuncExprs) {
// Try to match variables from optFuncExpr to assigns or unnests.
@@ -472,73 +481,11 @@
.size(); assignOrUnnestIndex++) {
AbstractLogicalOperator op = subTree.assignsAndUnnests.get(assignOrUnnestIndex);
if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
- AssignOperator assignOp = (AssignOperator) op;
- List<LogicalVariable> varList = assignOp.getVariables();
- for (int varIndex = 0; varIndex < varList.size(); varIndex++) {
- LogicalVariable var = varList.get(varIndex);
- int optVarIndex = optFuncExpr.findLogicalVar(var);
- // No matching var in optFuncExpr.
- if (optVarIndex == -1) {
- continue;
- }
- // At this point we have matched the optimizable func
- // expr at optFuncExprIndex to an assigned variable.
- // Remember matching subtree.
- optFuncExpr.setOptimizableSubTree(optVarIndex, subTree);
- List<String> fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex,
- varIndex, subTree.recordType, optVarIndex,
- optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue());
- if (fieldName == null) {
- continue;
- }
- IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp)
- .getType(optFuncExpr.getLogicalExpr(optVarIndex));
- // Set the fieldName in the corresponding matched
- // function expression.
- optFuncExpr.setFieldName(optVarIndex, fieldName);
- optFuncExpr.setFieldType(optVarIndex, fieldType);
-
- setTypeTag(context, subTree, optFuncExpr, optVarIndex);
- if (subTree.hasDataSource()) {
- fillIndexExprs(datasetIndexes, fieldName, fieldType, optFuncExpr, optFuncExprIndex,
- optVarIndex, subTree, analysisCtx);
- }
- }
+ analyzeAssignOp((AssignOperator) op, optFuncExpr, subTree, assignOrUnnestIndex, datasetRecordVar,
+ datasetMetaVar, context, datasetIndexes, optFuncExprIndex, analysisCtx);
} else {
- UnnestOperator unnestOp = (UnnestOperator) op;
- LogicalVariable var = unnestOp.getVariable();
- int funcVarIndex = optFuncExpr.findLogicalVar(var);
- // No matching var in optFuncExpr.
- if (funcVarIndex == -1) {
- continue;
- }
- // At this point we have matched the optimizable func expr
- // at optFuncExprIndex to an unnest variable.
- // Remember matching subtree.
- optFuncExpr.setOptimizableSubTree(funcVarIndex, subTree);
- List<String> fieldName = null;
- if (subTree.dataSourceType == DataSourceType.COLLECTION_SCAN) {
- optFuncExpr.setLogicalExpr(funcVarIndex, new VariableReferenceExpression(var));
- } else {
- fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, 0,
- subTree.recordType, funcVarIndex,
- optFuncExpr.getFuncExpr().getArguments().get(funcVarIndex).getValue());
- if (fieldName == null) {
- continue;
- }
- }
- IAType fieldType = (IAType) context.getOutputTypeEnvironment(unnestOp)
- .getType(optFuncExpr.getLogicalExpr(funcVarIndex));
- // Set the fieldName in the corresponding matched function
- // expression.
- optFuncExpr.setFieldName(funcVarIndex, fieldName);
- optFuncExpr.setFieldType(funcVarIndex, fieldType);
-
- setTypeTag(context, subTree, optFuncExpr, funcVarIndex);
- if (subTree.hasDataSource()) {
- fillIndexExprs(datasetIndexes, fieldName, fieldType, optFuncExpr, optFuncExprIndex,
- funcVarIndex, subTree, analysisCtx);
- }
+ analyzeUnnestOp((UnnestOperator) op, optFuncExpr, subTree, assignOrUnnestIndex, datasetRecordVar,
+ datasetMetaVar, context, datasetIndexes, optFuncExprIndex, analysisCtx);
}
}
@@ -567,10 +514,88 @@
}
}
+ private void analyzeUnnestOp(UnnestOperator unnestOp, IOptimizableFuncExpr optFuncExpr,
+ OptimizableOperatorSubTree subTree, int assignOrUnnestIndex, LogicalVariable datasetRecordVar,
+ LogicalVariable datasetMetaVar, IOptimizationContext context, List<Index> datasetIndexes,
+ int optFuncExprIndex, AccessMethodAnalysisContext analysisCtx) throws AlgebricksException {
+ LogicalVariable var = unnestOp.getVariable();
+ int funcVarIndex = optFuncExpr.findLogicalVar(var);
+ // No matching var in optFuncExpr.
+ if (funcVarIndex == -1) {
+ return;
+ }
+ // At this point we have matched the optimizable func expr
+ // at optFuncExprIndex to an unnest variable.
+ // Remember matching subtree.
+ optFuncExpr.setOptimizableSubTree(funcVarIndex, subTree);
+ List<String> fieldName = null;
+ if (subTree.dataSourceType == DataSourceType.COLLECTION_SCAN) {
+ optFuncExpr.setLogicalExpr(funcVarIndex, new VariableReferenceExpression(var));
+ } else {
+ fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, 0, subTree.recordType,
+ funcVarIndex, optFuncExpr.getFuncExpr().getArguments().get(funcVarIndex).getValue(),
+ datasetRecordVar, subTree.metaRecordType, datasetMetaVar);
+ if (fieldName == null) {
+ return;
+ }
+ }
+ IAType fieldType = (IAType) context.getOutputTypeEnvironment(unnestOp)
+ .getType(optFuncExpr.getLogicalExpr(funcVarIndex));
+ // Set the fieldName in the corresponding matched function
+ // expression.
+ optFuncExpr.setFieldName(funcVarIndex, fieldName);
+ optFuncExpr.setFieldType(funcVarIndex, fieldType);
+
+ setTypeTag(context, subTree, optFuncExpr, funcVarIndex);
+ if (subTree.hasDataSource()) {
+ fillIndexExprs(datasetIndexes, fieldName, fieldType, optFuncExpr, optFuncExprIndex, funcVarIndex, subTree,
+ analysisCtx);
+ }
+ }
+
+ private void analyzeAssignOp(AssignOperator assignOp, IOptimizableFuncExpr optFuncExpr,
+ OptimizableOperatorSubTree subTree, int assignOrUnnestIndex, LogicalVariable datasetRecordVar,
+ LogicalVariable datasetMetaVar, IOptimizationContext context, List<Index> datasetIndexes,
+ int optFuncExprIndex, AccessMethodAnalysisContext analysisCtx) throws AlgebricksException {
+ List<LogicalVariable> varList = assignOp.getVariables();
+ for (int varIndex = 0; varIndex < varList.size(); varIndex++) {
+ LogicalVariable var = varList.get(varIndex);
+ int optVarIndex = optFuncExpr.findLogicalVar(var);
+ // No matching var in optFuncExpr.
+ if (optVarIndex == -1) {
+ continue;
+ }
+ // At this point we have matched the optimizable func
+ // expr at optFuncExprIndex to an assigned variable.
+ // Remember matching subtree.
+ optFuncExpr.setOptimizableSubTree(optVarIndex, subTree);
+
+ List<String> fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex,
+ subTree.recordType, optVarIndex,
+ optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue(), datasetRecordVar,
+ subTree.metaRecordType, datasetMetaVar);
+ if (fieldName == null) {
+ continue;
+ }
+ IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp)
+ .getType(optFuncExpr.getLogicalExpr(optVarIndex));
+ // Set the fieldName in the corresponding matched
+ // function expression.
+ optFuncExpr.setFieldName(optVarIndex, fieldName);
+ optFuncExpr.setFieldType(optVarIndex, fieldType);
+
+ setTypeTag(context, subTree, optFuncExpr, optVarIndex);
+ if (subTree.hasDataSource()) {
+ fillIndexExprs(datasetIndexes, fieldName, fieldType, optFuncExpr, optFuncExprIndex, optVarIndex,
+ subTree, analysisCtx);
+ }
+ }
+ }
+
private void matchVarsFromOptFuncExprToDataSourceScan(IOptimizableFuncExpr optFuncExpr, int optFuncExprIndex,
List<Index> datasetIndexes, List<LogicalVariable> dsVarList, OptimizableOperatorSubTree subTree,
AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean fromAdditionalDataSource)
- throws AlgebricksException {
+ throws AlgebricksException {
for (int varIndex = 0; varIndex < dsVarList.size(); varIndex++) {
LogicalVariable var = dsVarList.get(varIndex);
int funcVarIndex = optFuncExpr.findLogicalVar(var);
@@ -640,7 +665,8 @@
*/
protected List<String> getFieldNameFromSubTree(IOptimizableFuncExpr optFuncExpr, OptimizableOperatorSubTree subTree,
int opIndex, int assignVarIndex, ARecordType recordType, int funcVarIndex,
- ILogicalExpression parentFuncExpr) throws AlgebricksException {
+ ILogicalExpression parentFuncExpr, LogicalVariable recordVar, ARecordType metaType, LogicalVariable metaVar)
+ throws AlgebricksException {
// Get expression corresponding to opVar at varIndex.
AbstractLogicalExpression expr = null;
AbstractFunctionCallExpression childFuncExpr = null;
@@ -705,6 +731,8 @@
isByName = true;
}
if (isFieldAccess) {
+ LogicalVariable sourceVar = ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue())
+ .getVariableReference();
optFuncExpr.setLogicalExpr(funcVarIndex, parentFuncExpr);
int[] assignAndExpressionIndexes = null;
@@ -743,7 +771,7 @@
//Recursive call on nested assign
List<String> parentFieldNames = getFieldNameFromSubTree(optFuncExpr, subTree,
assignAndExpressionIndexes[0], assignAndExpressionIndexes[1], recordType, funcVarIndex,
- parentFuncExpr);
+ parentFuncExpr, recordVar, metaType, metaVar);
if (parentFieldNames == null) {
//Nested assign was not a field access.
@@ -752,8 +780,9 @@
}
if (!isByName) {
- fieldName = ((ARecordType) recordType.getSubFieldType(parentFieldNames))
- .getFieldNames()[fieldIndex];
+ fieldName = sourceVar.equals(metaVar)
+ ? ((ARecordType) metaType.getSubFieldType(parentFieldNames)).getFieldNames()[fieldIndex]
+ : ((ARecordType) recordType.getSubFieldType(parentFieldNames)).getFieldNames()[fieldIndex];
}
optFuncExpr.setSourceVar(funcVarIndex, ((AssignOperator) op).getVariables().get(assignVarIndex));
//add fieldName to the nested fieldName, return
@@ -773,9 +802,10 @@
if (nestedAccessFieldName != null) {
return nestedAccessFieldName;
}
- return new ArrayList<String>(Arrays.asList(fieldName));
+ return new ArrayList<>(Arrays.asList(fieldName));
}
- return new ArrayList<String>(Arrays.asList(recordType.getFieldNames()[fieldIndex]));
+ return new ArrayList<>(Arrays.asList(sourceVar.equals(metaVar) ? metaType.getFieldNames()[fieldIndex]
+ : recordType.getFieldNames()[fieldIndex]));
}
@@ -806,7 +836,7 @@
if (var.equals(curVar)) {
optFuncExpr.setSourceVar(funcVarIndex, var);
return getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex, recordType,
- funcVarIndex, childFuncExpr);
+ funcVarIndex, childFuncExpr, recordVar, metaType, metaVar);
}
}
} else {
@@ -814,7 +844,7 @@
LogicalVariable var = unnestOp.getVariable();
if (var.equals(curVar)) {
getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, 0, recordType, funcVarIndex,
- childFuncExpr);
+ childFuncExpr, recordVar, metaType, metaVar);
}
}
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index bfa29ec..0040b61 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -135,7 +135,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
@@ -216,19 +215,19 @@
ILogicalOperator etsOp = new EmptyTupleSourceOperator();
// Add a logical variable for the record.
- List<LogicalVariable> payloadVars = new ArrayList<LogicalVariable>();
+ List<LogicalVariable> payloadVars = new ArrayList<>();
payloadVars.add(context.newVar());
// Create a scan operator and make the empty tuple source its input
DataSourceScanOperator dssOp = new DataSourceScanOperator(payloadVars, lds);
- dssOp.getInputs().add(new MutableObject<ILogicalOperator>(etsOp));
+ dssOp.getInputs().add(new MutableObject<>(etsOp));
ILogicalExpression payloadExpr = new VariableReferenceExpression(payloadVars.get(0));
- Mutable<ILogicalExpression> payloadRef = new MutableObject<ILogicalExpression>(payloadExpr);
+ Mutable<ILogicalExpression> payloadRef = new MutableObject<>(payloadExpr);
// Creating the assign to extract the PK out of the record
- ArrayList<LogicalVariable> pkVars = new ArrayList<LogicalVariable>();
- ArrayList<Mutable<ILogicalExpression>> pkExprs = new ArrayList<Mutable<ILogicalExpression>>();
- List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<Mutable<ILogicalExpression>>();
+ ArrayList<LogicalVariable> pkVars = new ArrayList<>();
+ ArrayList<Mutable<ILogicalExpression>> pkExprs = new ArrayList<>();
+ List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<>();
LogicalVariable payloadVar = payloadVars.get(0);
for (List<String> keyFieldName : partitionKeys) {
PlanTranslationUtil.prepareVarAndExpression(keyFieldName, payloadVar, pkVars, pkExprs, varRefsForLoading,
@@ -236,11 +235,11 @@
}
AssignOperator assign = new AssignOperator(pkVars, pkExprs);
- assign.getInputs().add(new MutableObject<ILogicalOperator>(dssOp));
+ assign.getInputs().add(new MutableObject<>(dssOp));
// If the input is pre-sorted, we set the ordering property explicitly in the assign
if (clffs.alreadySorted()) {
- List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+ List<OrderColumn> orderColumns = new ArrayList<>();
for (int i = 0; i < pkVars.size(); ++i) {
orderColumns.add(new OrderColumn(pkVars.get(i), OrderKind.ASC));
}
@@ -253,9 +252,9 @@
List<Mutable<ILogicalExpression>> additionalFilteringExpressions = null;
AssignOperator additionalFilteringAssign = null;
if (additionalFilteringField != null) {
- additionalFilteringVars = new ArrayList<LogicalVariable>();
- additionalFilteringAssignExpressions = new ArrayList<Mutable<ILogicalExpression>>();
- additionalFilteringExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ additionalFilteringVars = new ArrayList<>();
+ additionalFilteringAssignExpressions = new ArrayList<>();
+ additionalFilteringExpressions = new ArrayList<>();
PlanTranslationUtil.prepareVarAndExpression(additionalFilteringField, payloadVar, additionalFilteringVars,
additionalFilteringAssignExpressions, additionalFilteringExpressions, context);
additionalFilteringAssign = new AssignOperator(additionalFilteringVars,
@@ -267,15 +266,15 @@
insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
if (additionalFilteringAssign != null) {
- additionalFilteringAssign.getInputs().add(new MutableObject<ILogicalOperator>(assign));
- insertOp.getInputs().add(new MutableObject<ILogicalOperator>(additionalFilteringAssign));
+ additionalFilteringAssign.getInputs().add(new MutableObject<>(assign));
+ insertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
} else {
- insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+ insertOp.getInputs().add(new MutableObject<>(assign));
}
ILogicalOperator leafOperator = new SinkOperator();
- leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
- return new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(leafOperator));
+ leafOperator.getInputs().add(new MutableObject<>(insertOp));
+ return new ALogicalPlanImpl(new MutableObject<>(leafOperator));
}
@SuppressWarnings("unchecked")
@@ -283,8 +282,8 @@
public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt)
throws AlgebricksException, AsterixException {
Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this,
- new MutableObject<ILogicalOperator>(new EmptyTupleSourceOperator()));
- ArrayList<Mutable<ILogicalOperator>> globalPlanRoots = new ArrayList<Mutable<ILogicalOperator>>();
+ new MutableObject<>(new EmptyTupleSourceOperator()));
+ ArrayList<Mutable<ILogicalOperator>> globalPlanRoots = new ArrayList<>();
ILogicalOperator topOp = p.first;
ProjectOperator project = (ProjectOperator) topOp;
LogicalVariable unnestVar = project.getVariables().get(0);
@@ -297,12 +296,12 @@
}
metadataProvider.setOutputFile(outputFileSplit);
- List<Mutable<ILogicalExpression>> writeExprList = new ArrayList<Mutable<ILogicalExpression>>(1);
- writeExprList.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(resVar)));
+ List<Mutable<ILogicalExpression>> writeExprList = new ArrayList<>(1);
+ writeExprList.add(new MutableObject<>(new VariableReferenceExpression(resVar)));
ResultSetSinkId rssId = new ResultSetSinkId(metadataProvider.getResultSetId());
ResultSetDataSink sink = new ResultSetDataSink(rssId, null);
topOp = new DistributeResultOperator(writeExprList, sink);
- topOp.getInputs().add(new MutableObject<ILogicalOperator>(project));
+ topOp.getInputs().add(new MutableObject<>(project));
// Retrieve the Output RecordType (if any) and store it on
// the DistributeResultOperator
@@ -318,11 +317,10 @@
LogicalVariable seqVar = context.newVar();
/** This assign adds a marker function collection-to-sequence: if the input is a singleton collection, unnest it; otherwise do nothing. */
AssignOperator assignCollectionToSequence = new AssignOperator(seqVar,
- new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+ new MutableObject<>(new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.COLLECTION_TO_SEQUENCE),
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(resVar)))));
- assignCollectionToSequence.getInputs()
- .add(new MutableObject<ILogicalOperator>(project.getInputs().get(0).getValue()));
+ new MutableObject<>(new VariableReferenceExpression(resVar)))));
+ assignCollectionToSequence.getInputs().add(new MutableObject<>(project.getInputs().get(0).getValue()));
project.getInputs().get(0).setValue(assignCollectionToSequence);
project.getVariables().set(0, seqVar);
resVar = seqVar;
@@ -330,9 +328,9 @@
stmt.getDatasetName());
List<Integer> keySourceIndicator = ((InternalDatasetDetails) targetDatasource.getDataset()
.getDatasetDetails()).getKeySourceIndicator();
- ArrayList<LogicalVariable> vars = new ArrayList<LogicalVariable>();
- ArrayList<Mutable<ILogicalExpression>> exprs = new ArrayList<Mutable<ILogicalExpression>>();
- List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<Mutable<ILogicalExpression>>();
+ ArrayList<LogicalVariable> vars = new ArrayList<>();
+ ArrayList<Mutable<ILogicalExpression>> exprs = new ArrayList<>();
+ List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<>();
List<List<String>> partitionKeys = DatasetUtils.getPartitioningKeys(targetDatasource.getDataset());
int numOfPrimaryKeys = partitionKeys.size();
for (int i = 0; i < numOfPrimaryKeys; i++) {
@@ -354,23 +352,22 @@
List<Mutable<ILogicalExpression>> additionalFilteringExpressions = null;
AssignOperator additionalFilteringAssign = null;
if (additionalFilteringField != null) {
- additionalFilteringVars = new ArrayList<LogicalVariable>();
- additionalFilteringAssignExpressions = new ArrayList<Mutable<ILogicalExpression>>();
- additionalFilteringExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ additionalFilteringVars = new ArrayList<>();
+ additionalFilteringAssignExpressions = new ArrayList<>();
+ additionalFilteringExpressions = new ArrayList<>();
PlanTranslationUtil.prepareVarAndExpression(additionalFilteringField, resVar, additionalFilteringVars,
additionalFilteringAssignExpressions, additionalFilteringExpressions, context);
additionalFilteringAssign = new AssignOperator(additionalFilteringVars,
additionalFilteringAssignExpressions);
- additionalFilteringAssign.getInputs().add(new MutableObject<ILogicalOperator>(project));
- assign.getInputs().add(new MutableObject<ILogicalOperator>(additionalFilteringAssign));
+ additionalFilteringAssign.getInputs().add(new MutableObject<>(project));
+ assign.getInputs().add(new MutableObject<>(additionalFilteringAssign));
} else {
- assign.getInputs().add(new MutableObject<ILogicalOperator>(project));
+ assign.getInputs().add(new MutableObject<>(project));
}
- Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
- new VariableReferenceExpression(resVar));
+ Mutable<ILogicalExpression> varRef = new MutableObject<>(new VariableReferenceExpression(resVar));
ILogicalOperator leafOperator = null;
switch (stmt.getKind()) {
@@ -382,9 +379,9 @@
InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+ insertOp.getInputs().add(new MutableObject<>(assign));
leafOperator = new SinkOperator();
- leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
+ leafOperator.getInputs().add(new MutableObject<>(insertOp));
break;
}
case UPSERT: {
@@ -395,7 +392,7 @@
InsertDeleteUpsertOperator upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
varRefsForLoading, InsertDeleteUpsertOperator.Kind.UPSERT, false);
upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- upsertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+ upsertOp.getInputs().add(new MutableObject<>(assign));
// Create and add a new variable used for representing the original record
ARecordType recordType = (ARecordType) targetDatasource.getItemType();
upsertOp.setPrevRecordVar(context.newVar());
@@ -405,7 +402,7 @@
upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
}
leafOperator = new SinkOperator();
- leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(upsertOp));
+ leafOperator.getInputs().add(new MutableObject<>(upsertOp));
break;
}
case DELETE: {
@@ -416,18 +413,18 @@
InsertDeleteUpsertOperator deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false);
deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- deleteOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+ deleteOp.getInputs().add(new MutableObject<>(assign));
leafOperator = new SinkOperator();
- leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(deleteOp));
+ leafOperator.getInputs().add(new MutableObject<>(deleteOp));
break;
}
case CONNECT_FEED: {
InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+ insertOp.getInputs().add(new MutableObject<>(assign));
leafOperator = new SinkOperator();
- leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
+ leafOperator.getInputs().add(new MutableObject<>(insertOp));
break;
}
case SUBSCRIBE_FEED: {
@@ -448,14 +445,13 @@
// add the meta function
IFunctionInfo finfoMeta = FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.META);
ScalarFunctionCallExpression metaFunction = new ScalarFunctionCallExpression(finfoMeta,
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(unnestVar)));
+ new MutableObject<>(new VariableReferenceExpression(unnestVar)));
// create assign for the meta part
LogicalVariable metaVar = context.newVar();
metaExpSingletonList = new ArrayList<>(1);
- metaExpSingletonList
- .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(metaVar)));
+ metaExpSingletonList.add(new MutableObject<>(new VariableReferenceExpression(metaVar)));
metaAndKeysVars.add(metaVar);
- metaAndKeysExprs.add(new MutableObject<ILogicalExpression>(metaFunction));
+ metaAndKeysExprs.add(new MutableObject<>(metaFunction));
project.getVariables().add(metaVar);
}
if (isChangeFeed) {
@@ -467,10 +463,9 @@
funcCall.substituteVar(resVar, unnestVar);
LogicalVariable pkVar = context.newVar();
metaAndKeysVars.add(pkVar);
- metaAndKeysExprs.add(new MutableObject<ILogicalExpression>(assignExpr.getValue()));
+ metaAndKeysExprs.add(new MutableObject<>(assignExpr.getValue()));
project.getVariables().add(pkVar);
- varRefsForLoading.add(
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(pkVar)));
+ varRefsForLoading.add(new MutableObject<>(new VariableReferenceExpression(pkVar)));
}
}
// A change feed, we don't need the assign to access PKs
@@ -479,6 +474,15 @@
// Create and add a new variable used for representing the original record
feedModificationOp.setPrevRecordVar(context.newVar());
feedModificationOp.setPrevRecordType(targetDatasource.getItemType());
+ if (targetDatasource.getDataset().hasMetaPart()) {
+ List<LogicalVariable> metaVars = new ArrayList<>();
+ metaVars.add(context.newVar());
+ feedModificationOp.setPrevAdditionalNonFilteringVars(metaVars);
+ List<Object> metaTypes = new ArrayList<>();
+ metaTypes.add(targetDatasource.getMetaItemType());
+ feedModificationOp.setPrevAdditionalNonFilteringTypes(metaTypes);
+ }
+
if (additionalFilteringField != null) {
feedModificationOp.setPrevFilterVar(context.newVar());
feedModificationOp.setPrevFilterType(((ARecordType) targetDatasource.getItemType())
@@ -492,16 +496,16 @@
} else {
feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
metaExpSingletonList, InsertDeleteUpsertOperator.Kind.INSERT, false);
- feedModificationOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+ feedModificationOp.getInputs().add(new MutableObject<>(assign));
}
if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
metaAndKeysAssign = new AssignOperator(metaAndKeysVars, metaAndKeysExprs);
metaAndKeysAssign.getInputs().add(project.getInputs().get(0));
- project.getInputs().set(0, new MutableObject<ILogicalOperator>(metaAndKeysAssign));
+ project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
}
feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
leafOperator = new SinkOperator();
- leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(feedModificationOp));
+ leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
break;
}
default:
@@ -509,7 +513,7 @@
}
topOp = leafOperator;
}
- globalPlanRoots.add(new MutableObject<ILogicalOperator>(topOp));
+ globalPlanRoots.add(new MutableObject<>(topOp));
ILogicalPlan plan = new ALogicalPlanImpl(globalPlanRoots);
eliminateSharedOperatorReferenceForPlan(plan);
return plan;
@@ -552,8 +556,7 @@
case VARIABLE_EXPRESSION: {
v = context.newVar(lc.getVarExpr());
LogicalVariable prev = context.getVar(((VariableExpr) lc.getBindingExpr()).getVar().getId());
- returnedOp = new AssignOperator(v,
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(prev)));
+ returnedOp = new AssignOperator(v, new MutableObject<>(new VariableReferenceExpression(prev)));
returnedOp.getInputs().add(tupSource);
break;
}
@@ -561,12 +564,12 @@
v = context.newVar(lc.getVarExpr());
Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = langExprToAlgExpression(lc.getBindingExpr(),
tupSource);
- returnedOp = new AssignOperator(v, new MutableObject<ILogicalExpression>(eo.first));
+ returnedOp = new AssignOperator(v, new MutableObject<>(eo.first));
returnedOp.getInputs().add(eo.second);
break;
}
}
- return new Pair<ILogicalOperator, LogicalVariable>(returnedOp, v);
+ return new Pair<>(returnedOp, v);
}
@Override
@@ -576,13 +579,13 @@
LogicalVariable v = context.newVar();
AbstractFunctionCallExpression fldAccess = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME));
- fldAccess.getArguments().add(new MutableObject<ILogicalExpression>(p.first));
+ fldAccess.getArguments().add(new MutableObject<>(p.first));
ILogicalExpression faExpr = new ConstantExpression(
new AsterixConstantValue(new AString(fa.getIdent().getValue())));
- fldAccess.getArguments().add(new MutableObject<ILogicalExpression>(faExpr));
- AssignOperator a = new AssignOperator(v, new MutableObject<ILogicalExpression>(fldAccess));
+ fldAccess.getArguments().add(new MutableObject<>(faExpr));
+ AssignOperator a = new AssignOperator(v, new MutableObject<>(fldAccess));
a.getInputs().add(p.second);
- return new Pair<ILogicalOperator, LogicalVariable>(a, v);
+ return new Pair<>(a, v);
}
@Override
@@ -594,17 +597,17 @@
if (ia.isAny()) {
f = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.ANY_COLLECTION_MEMBER));
- f.getArguments().add(new MutableObject<ILogicalExpression>(p.first));
+ f.getArguments().add(new MutableObject<>(p.first));
} else {
Pair<ILogicalExpression, Mutable<ILogicalOperator>> indexPair = langExprToAlgExpression(ia.getIndexExpr(),
tupSource);
f = new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.GET_ITEM));
- f.getArguments().add(new MutableObject<ILogicalExpression>(p.first));
- f.getArguments().add(new MutableObject<ILogicalExpression>(indexPair.first));
+ f.getArguments().add(new MutableObject<>(p.first));
+ f.getArguments().add(new MutableObject<>(indexPair.first));
}
- AssignOperator a = new AssignOperator(v, new MutableObject<ILogicalExpression>(f));
+ AssignOperator a = new AssignOperator(v, new MutableObject<>(f));
a.getInputs().add(p.second);
- return new Pair<ILogicalOperator, LogicalVariable>(a, v);
+ return new Pair<>(a, v);
}
@Override
@@ -612,26 +615,26 @@
throws AsterixException {
LogicalVariable v = context.newVar();
FunctionSignature signature = fcall.getFunctionSignature();
- List<Mutable<ILogicalExpression>> args = new ArrayList<Mutable<ILogicalExpression>>();
+ List<Mutable<ILogicalExpression>> args = new ArrayList<>();
Mutable<ILogicalOperator> topOp = tupSource;
for (Expression expr : fcall.getExprList()) {
switch (expr.getKind()) {
case VARIABLE_EXPRESSION: {
LogicalVariable var = context.getVar(((VariableExpr) expr).getVar().getId());
- args.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
+ args.add(new MutableObject<>(new VariableReferenceExpression(var)));
break;
}
case LITERAL_EXPRESSION: {
LiteralExpr val = (LiteralExpr) expr;
- args.add(new MutableObject<ILogicalExpression>(new ConstantExpression(
+ args.add(new MutableObject<>(new ConstantExpression(
new AsterixConstantValue(ConstantHelper.objectFromLiteral(val.getValue())))));
break;
}
default: {
Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = langExprToAlgExpression(expr, topOp);
AbstractLogicalOperator o1 = (AbstractLogicalOperator) eo.second.getValue();
- args.add(new MutableObject<ILogicalExpression>(eo.first));
+ args.add(new MutableObject<>(eo.first));
if (o1 != null && !(o1.getOperatorTag() == LogicalOperatorTag.ASSIGN && hasOnlyChild(o1, topOp))) {
topOp = eo.second;
}
@@ -656,12 +659,12 @@
}
}
- AssignOperator op = new AssignOperator(v, new MutableObject<ILogicalExpression>(f));
+ AssignOperator op = new AssignOperator(v, new MutableObject<>(f));
if (topOp != null) {
op.getInputs().add(topOp);
}
- return new Pair<ILogicalOperator, LogicalVariable>(op, v);
+ return new Pair<>(op, v);
}
private AbstractFunctionCallExpression lookupUserDefinedFunction(FunctionSignature signature,
@@ -733,17 +736,17 @@
for (Pair<Expression, Identifier> groupField : groupFieldList) {
ILogicalExpression groupFieldNameExpr = langExprToAlgExpression(
new LiteralExpr(new StringLiteral(groupField.second.getValue())), topOp).first;
- groupRecordConstructorArgList.add(new MutableObject<ILogicalExpression>(groupFieldNameExpr));
+ groupRecordConstructorArgList.add(new MutableObject<>(groupFieldNameExpr));
ILogicalExpression groupFieldExpr = langExprToAlgExpression(groupField.first, topOp).first;
- groupRecordConstructorArgList.add(new MutableObject<ILogicalExpression>(groupFieldExpr));
+ groupRecordConstructorArgList.add(new MutableObject<>(groupFieldExpr));
}
LogicalVariable groupVar = context.newVar(gc.getGroupVar());
AssignOperator groupVarAssignOp = new AssignOperator(groupVar,
- new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+ new MutableObject<>(new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR),
groupRecordConstructorArgList)));
groupVarAssignOp.getInputs().add(topOp);
- topOp = new MutableObject<ILogicalOperator>(groupVarAssignOp);
+ topOp = new MutableObject<>(groupVarAssignOp);
}
if (gc.isGroupAll()) {
List<LogicalVariable> aggVars = new ArrayList<>();
@@ -751,19 +754,19 @@
for (VariableExpr var : gc.getWithVarList()) {
LogicalVariable aggVar = context.newVar();
LogicalVariable oldVar = context.getVar(var);
- List<Mutable<ILogicalExpression>> flArgs = new ArrayList<Mutable<ILogicalExpression>>();
- flArgs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oldVar)));
+ List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>();
+ flArgs.add(new MutableObject<>(new VariableReferenceExpression(oldVar)));
AggregateFunctionCallExpression fListify = AsterixBuiltinFunctions
.makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs);
aggVars.add(aggVar);
- aggFuncs.add(new MutableObject<ILogicalExpression>(fListify));
+ aggFuncs.add(new MutableObject<>(fListify));
// Hide the variable that was part of the "with", replacing it with
// the one bound by the aggregation op.
context.setVar(var, aggVar);
}
AggregateOperator aggOp = new AggregateOperator(aggVars, aggFuncs);
aggOp.getInputs().add(topOp);
- return new Pair<ILogicalOperator, LogicalVariable>(aggOp, null);
+ return new Pair<>(aggOp, null);
} else {
GroupByOperator gOp = new GroupByOperator();
for (GbyVariableExpressionPair ve : gc.getGbyPairList()) {
@@ -795,23 +798,22 @@
for (VariableExpr var : gc.getWithVarList()) {
LogicalVariable aggVar = context.newVar();
LogicalVariable oldVar = context.getVar(var);
- List<Mutable<ILogicalExpression>> flArgs = new ArrayList<Mutable<ILogicalExpression>>(1);
- flArgs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oldVar)));
+ List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>(1);
+ flArgs.add(new MutableObject<>(new VariableReferenceExpression(oldVar)));
AggregateFunctionCallExpression fListify = AsterixBuiltinFunctions
.makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs);
AggregateOperator agg = new AggregateOperator(mkSingletonArrayList(aggVar),
- (List) mkSingletonArrayList(new MutableObject<ILogicalExpression>(fListify)));
+ (List) mkSingletonArrayList(new MutableObject<>(fListify)));
- agg.getInputs().add(new MutableObject<ILogicalOperator>(
- new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(gOp))));
- ILogicalPlan plan = new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(agg));
+ agg.getInputs().add(new MutableObject<>(new NestedTupleSourceOperator(new MutableObject<>(gOp))));
+ ILogicalPlan plan = new ALogicalPlanImpl(new MutableObject<>(agg));
gOp.getNestedPlans().add(plan);
// Hide the variable that was part of the "with", replacing it with
// the one bound by the aggregation op.
context.setVar(var, aggVar);
}
gOp.getAnnotations().put(OperatorAnnotations.USE_HASH_GROUP_BY, gc.hasHashGroupByHint());
- return new Pair<ILogicalOperator, LogicalVariable>(gOp, null);
+ return new Pair<>(gOp, null);
}
}
@@ -833,54 +835,53 @@
//Creates a subplan for the "then" branch.
Pair<ILogicalOperator, LogicalVariable> opAndVarForThen = constructSubplanOperatorForBranch(pCond.first,
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varCond)), ifexpr.getThenExpr());
+ new MutableObject<>(new VariableReferenceExpression(varCond)), ifexpr.getThenExpr());
// Creates a subplan for the "else" branch.
AbstractFunctionCallExpression notVarCond = new ScalarFunctionCallExpression(
- FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.NOT), Collections.singletonList(
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varCond))));
+ FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.NOT),
+ Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(varCond))));
Pair<ILogicalOperator, LogicalVariable> opAndVarForElse = constructSubplanOperatorForBranch(
- opAndVarForThen.first, new MutableObject<ILogicalExpression>(notVarCond), ifexpr.getElseExpr());
+ opAndVarForThen.first, new MutableObject<>(notVarCond), ifexpr.getElseExpr());
// Uses switch-case function to select the results of two branches.
LogicalVariable selectVar = context.newVar();
List<Mutable<ILogicalExpression>> arguments = new ArrayList<>();
- arguments.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varCond)));
- arguments.add(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
- arguments.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(opAndVarForThen.second)));
- arguments.add(new MutableObject<ILogicalExpression>(ConstantExpression.FALSE));
- arguments.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(opAndVarForElse.second)));
+ arguments.add(new MutableObject<>(new VariableReferenceExpression(varCond)));
+ arguments.add(new MutableObject<>(ConstantExpression.TRUE));
+ arguments.add(new MutableObject<>(new VariableReferenceExpression(opAndVarForThen.second)));
+ arguments.add(new MutableObject<>(ConstantExpression.FALSE));
+ arguments.add(new MutableObject<>(new VariableReferenceExpression(opAndVarForElse.second)));
AbstractFunctionCallExpression swithCaseExpr = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SWITCH_CASE), arguments);
- AssignOperator assignOp = new AssignOperator(selectVar, new MutableObject<ILogicalExpression>(swithCaseExpr));
- assignOp.getInputs().add(new MutableObject<ILogicalOperator>(opAndVarForElse.first));
+ AssignOperator assignOp = new AssignOperator(selectVar, new MutableObject<>(swithCaseExpr));
+ assignOp.getInputs().add(new MutableObject<>(opAndVarForElse.first));
// Unnests the selected ("if" or "else") result.
LogicalVariable unnestVar = context.newVar();
UnnestOperator unnestOp = new UnnestOperator(unnestVar,
- new MutableObject<ILogicalExpression>(new UnnestingFunctionCallExpression(
- FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION),
- Collections.singletonList(
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(selectVar))))));
- unnestOp.getInputs().add(new MutableObject<ILogicalOperator>(assignOp));
+ new MutableObject<>(new UnnestingFunctionCallExpression(
+ FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), Collections
+ .singletonList(new MutableObject<>(new VariableReferenceExpression(selectVar))))));
+ unnestOp.getInputs().add(new MutableObject<>(assignOp));
// Produces the final result.
LogicalVariable resultVar = context.newVar();
AssignOperator finalAssignOp = new AssignOperator(resultVar,
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(unnestVar)));
- finalAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(unnestOp));
- return new Pair<ILogicalOperator, LogicalVariable>(finalAssignOp, resultVar);
+ new MutableObject<>(new VariableReferenceExpression(unnestVar)));
+ finalAssignOp.getInputs().add(new MutableObject<>(unnestOp));
+ return new Pair<>(finalAssignOp, resultVar);
}
@Override
public Pair<ILogicalOperator, LogicalVariable> visit(LiteralExpr l, Mutable<ILogicalOperator> tupSource) {
LogicalVariable var = context.newVar();
- AssignOperator a = new AssignOperator(var, new MutableObject<ILogicalExpression>(
+ AssignOperator a = new AssignOperator(var, new MutableObject<>(
new ConstantExpression(new AsterixConstantValue(ConstantHelper.objectFromLiteral(l.getValue())))));
if (tupSource != null) {
a.getInputs().add(tupSource);
}
- return new Pair<ILogicalOperator, LogicalVariable>(a, var);
+ return new Pair<>(a, var);
}
@Override
@@ -910,7 +911,7 @@
// chain the operators
if (i == 0) {
- c.getArguments().add(new MutableObject<ILogicalExpression>(e));
+ c.getArguments().add(new MutableObject<>(e));
currExpr = c;
if (op.isBroadcastOperand(i)) {
BroadcastExpressionAnnotation bcast = new BroadcastExpressionAnnotation();
@@ -918,9 +919,8 @@
c.getAnnotations().put(BroadcastExpressionAnnotation.BROADCAST_ANNOTATION_KEY, bcast);
}
} else {
- ((AbstractFunctionCallExpression) currExpr).getArguments()
- .add(new MutableObject<ILogicalExpression>(e));
- c.getArguments().add(new MutableObject<ILogicalExpression>(currExpr));
+ ((AbstractFunctionCallExpression) currExpr).getArguments().add(new MutableObject<>(e));
+ c.getArguments().add(new MutableObject<>(currExpr));
currExpr = c;
if (i == 1 && op.isBroadcastOperand(i)) {
BroadcastExpressionAnnotation bcast = new BroadcastExpressionAnnotation();
@@ -932,18 +932,16 @@
AbstractFunctionCallExpression f = createFunctionCallExpressionForBuiltinOperator(ops.get(i));
if (i == 0) {
- f.getArguments().add(new MutableObject<ILogicalExpression>(e));
+ f.getArguments().add(new MutableObject<>(e));
currExpr = f;
} else {
- ((AbstractFunctionCallExpression) currExpr).getArguments()
- .add(new MutableObject<ILogicalExpression>(e));
- f.getArguments().add(new MutableObject<ILogicalExpression>(currExpr));
+ ((AbstractFunctionCallExpression) currExpr).getArguments().add(new MutableObject<>(e));
+ f.getArguments().add(new MutableObject<>(currExpr));
currExpr = f;
}
}
} else { // don't forget the last expression...
- ((AbstractFunctionCallExpression) currExpr).getArguments()
- .add(new MutableObject<ILogicalExpression>(e));
+ ((AbstractFunctionCallExpression) currExpr).getArguments().add(new MutableObject<>(e));
if (i == 1 && op.isBroadcastOperand(i)) {
BroadcastExpressionAnnotation bcast = new BroadcastExpressionAnnotation();
bcast.setObject(BroadcastSide.RIGHT);
@@ -962,11 +960,11 @@
}
LogicalVariable assignedVar = context.newVar();
- AssignOperator a = new AssignOperator(assignedVar, new MutableObject<ILogicalExpression>(currExpr));
+ AssignOperator a = new AssignOperator(assignedVar, new MutableObject<>(currExpr));
a.getInputs().add(topOp);
- return new Pair<ILogicalOperator, LogicalVariable>(a, assignedVar);
+ return new Pair<>(a, assignedVar);
}
@Override
@@ -979,8 +977,7 @@
Pair<ILogicalExpression, Mutable<ILogicalOperator>> p = langExprToAlgExpression(e, topOp);
OrderModifier m = modifIter.next();
OrderOperator.IOrder comp = (m == OrderModifier.ASC) ? OrderOperator.ASC_ORDER : OrderOperator.DESC_ORDER;
- ord.getOrderExpressions().add(new Pair<IOrder, Mutable<ILogicalExpression>>(comp,
- new MutableObject<ILogicalExpression>(p.first)));
+ ord.getOrderExpressions().add(new Pair<>(comp, new MutableObject<>(p.first)));
topOp = p.second;
}
ord.getInputs().add(topOp);
@@ -996,7 +993,7 @@
RangeMapBuilder.verifyRangeOrder(oc.getRangeMap(), ascending);
ord.getAnnotations().put(OperatorAnnotations.USE_RANGE_CONNECTOR, oc.getRangeMap());
}
- return new Pair<ILogicalOperator, LogicalVariable>(ord, null);
+ return new Pair<>(ord, null);
}
@Override
@@ -1011,8 +1008,7 @@
Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo1 = langExprToAlgExpression(qt.getExpr(), topOp);
topOp = eo1.second;
LogicalVariable uVar = context.newVar(qt.getVarExpr());
- ILogicalOperator u = new UnnestOperator(uVar,
- new MutableObject<ILogicalExpression>(makeUnnestExpression(eo1.first)));
+ ILogicalOperator u = new UnnestOperator(uVar, new MutableObject<>(makeUnnestExpression(eo1.first)));
if (firstOp == null) {
firstOp = u;
@@ -1020,7 +1016,7 @@
if (lastOp != null) {
u.getInputs().add(lastOp);
}
- lastOp = new MutableObject<ILogicalOperator>(u);
+ lastOp = new MutableObject<>(u);
}
// We make all the unnest correspond. to quantif. vars. sit on top
@@ -1033,24 +1029,24 @@
AggregateFunctionCallExpression fAgg;
SelectOperator s;
if (qe.getQuantifier() == Quantifier.SOME) {
- s = new SelectOperator(new MutableObject<ILogicalExpression>(eo2.first), false, null);
+ s = new SelectOperator(new MutableObject<>(eo2.first), false, null);
s.getInputs().add(eo2.second);
fAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(AsterixBuiltinFunctions.NON_EMPTY_STREAM,
- new ArrayList<Mutable<ILogicalExpression>>());
+ new ArrayList<>());
} else { // EVERY
- List<Mutable<ILogicalExpression>> satExprList = new ArrayList<Mutable<ILogicalExpression>>(1);
- satExprList.add(new MutableObject<ILogicalExpression>(eo2.first));
- s = new SelectOperator(new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+ List<Mutable<ILogicalExpression>> satExprList = new ArrayList<>(1);
+ satExprList.add(new MutableObject<>(eo2.first));
+ s = new SelectOperator(new MutableObject<>(new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.NOT), satExprList)), false, null);
s.getInputs().add(eo2.second);
fAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(AsterixBuiltinFunctions.EMPTY_STREAM,
- new ArrayList<Mutable<ILogicalExpression>>());
+ new ArrayList<>());
}
LogicalVariable qeVar = context.newVar();
AggregateOperator a = new AggregateOperator(mkSingletonArrayList(qeVar),
- (List) mkSingletonArrayList(new MutableObject<ILogicalExpression>(fAgg)));
- a.getInputs().add(new MutableObject<ILogicalOperator>(s));
- return new Pair<ILogicalOperator, LogicalVariable>(a, qeVar);
+ (List) mkSingletonArrayList(new MutableObject<>(fAgg)));
+ a.getInputs().add(new MutableObject<>(s));
+ return new Pair<>(a, qeVar);
}
@Override
@@ -1065,18 +1061,18 @@
AbstractFunctionCallExpression f = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR));
LogicalVariable v1 = context.newVar();
- AssignOperator a = new AssignOperator(v1, new MutableObject<ILogicalExpression>(f));
+ AssignOperator a = new AssignOperator(v1, new MutableObject<>(f));
Mutable<ILogicalOperator> topOp = tupSource;
for (FieldBinding fb : rc.getFbList()) {
Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo1 = langExprToAlgExpression(fb.getLeftExpr(), topOp);
- f.getArguments().add(new MutableObject<ILogicalExpression>(eo1.first));
+ f.getArguments().add(new MutableObject<>(eo1.first));
topOp = eo1.second;
Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo2 = langExprToAlgExpression(fb.getRightExpr(), topOp);
- f.getArguments().add(new MutableObject<ILogicalExpression>(eo2.first));
+ f.getArguments().add(new MutableObject<>(eo2.first));
topOp = eo2.second;
}
a.getInputs().add(topOp);
- return new Pair<ILogicalOperator, LogicalVariable>(a, v1);
+ return new Pair<>(a, v1);
}
@Override
@@ -1086,15 +1082,15 @@
? AsterixBuiltinFunctions.ORDERED_LIST_CONSTRUCTOR : AsterixBuiltinFunctions.UNORDERED_LIST_CONSTRUCTOR;
AbstractFunctionCallExpression f = new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(fid));
LogicalVariable v1 = context.newVar();
- AssignOperator a = new AssignOperator(v1, new MutableObject<ILogicalExpression>(f));
+ AssignOperator a = new AssignOperator(v1, new MutableObject<>(f));
Mutable<ILogicalOperator> topOp = tupSource;
for (Expression expr : lc.getExprList()) {
Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = langExprToAlgExpression(expr, topOp);
- f.getArguments().add(new MutableObject<ILogicalExpression>(eo.first));
+ f.getArguments().add(new MutableObject<>(eo.first));
topOp = eo.second;
}
a.getInputs().add(topOp);
- return new Pair<ILogicalOperator, LogicalVariable>(a, v1);
+ return new Pair<>(a, v1);
}
@Override
@@ -1105,15 +1101,15 @@
LogicalVariable v1 = context.newVar();
AssignOperator a;
if (u.getSign() == Sign.POSITIVE) {
- a = new AssignOperator(v1, new MutableObject<ILogicalExpression>(eo.first));
+ a = new AssignOperator(v1, new MutableObject<>(eo.first));
} else {
AbstractFunctionCallExpression m = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NUMERIC_UNARY_MINUS));
- m.getArguments().add(new MutableObject<ILogicalExpression>(eo.first));
- a = new AssignOperator(v1, new MutableObject<ILogicalExpression>(m));
+ m.getArguments().add(new MutableObject<>(eo.first));
+ a = new AssignOperator(v1, new MutableObject<>(m));
}
a.getInputs().add(eo.second);
- return new Pair<ILogicalOperator, LogicalVariable>(a, v1);
+ return new Pair<>(a, v1);
}
@Override
@@ -1121,19 +1117,18 @@
// Should we ever get to this method?
LogicalVariable var = context.newVar();
LogicalVariable oldV = context.getVar(v.getVar().getId());
- AssignOperator a = new AssignOperator(var,
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oldV)));
+ AssignOperator a = new AssignOperator(var, new MutableObject<>(new VariableReferenceExpression(oldV)));
a.getInputs().add(tupSource);
- return new Pair<ILogicalOperator, LogicalVariable>(a, var);
+ return new Pair<>(a, var);
}
@Override
public Pair<ILogicalOperator, LogicalVariable> visit(WhereClause w, Mutable<ILogicalOperator> tupSource)
throws AsterixException {
Pair<ILogicalExpression, Mutable<ILogicalOperator>> p = langExprToAlgExpression(w.getWhereExpr(), tupSource);
- SelectOperator s = new SelectOperator(new MutableObject<ILogicalExpression>(p.first), false, null);
+ SelectOperator s = new SelectOperator(new MutableObject<>(p.first), false, null);
s.getInputs().add(p.second);
- return new Pair<ILogicalOperator, LogicalVariable>(s, null);
+ return new Pair<>(s, null);
}
@Override
@@ -1150,7 +1145,7 @@
opLim = new LimitOperator(p1.first);
opLim.getInputs().add(p1.second);
}
- return new Pair<ILogicalOperator, LogicalVariable>(opLim, null);
+ return new Pair<>(opLim, null);
}
protected AbstractFunctionCallExpression createComparisonExpression(OperatorType t) {
@@ -1252,20 +1247,20 @@
case VARIABLE_EXPRESSION: {
VariableReferenceExpression ve = new VariableReferenceExpression(
context.getVar(((VariableExpr) expr).getVar().getId()));
- return new Pair<ILogicalExpression, Mutable<ILogicalOperator>>(ve, topOpRef);
+ return new Pair<>(ve, topOpRef);
}
case LITERAL_EXPRESSION: {
LiteralExpr val = (LiteralExpr) expr;
- return new Pair<ILogicalExpression, Mutable<ILogicalOperator>>(new ConstantExpression(
+ return new Pair<>(new ConstantExpression(
new AsterixConstantValue(ConstantHelper.objectFromLiteral(val.getValue()))), topOpRef);
}
default: {
if (expressionNeedsNoNesting(expr)) {
Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this, topOpRef);
ILogicalExpression exp = ((AssignOperator) p.first).getExpressions().get(0).getValue();
- return new Pair<ILogicalExpression, Mutable<ILogicalOperator>>(exp, p.first.getInputs().get(0));
+ return new Pair<>(exp, p.first.getInputs().get(0));
} else {
- Mutable<ILogicalOperator> srcRef = new MutableObject<ILogicalOperator>();
+ Mutable<ILogicalOperator> srcRef = new MutableObject<>();
Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this, srcRef);
if (p.first.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
if (topOpRef.getValue() != null) {
@@ -1274,17 +1269,15 @@
// Re-binds the bottom operator reference to {@code topOpRef}.
rebindBottomOpRef(p.first, srcRef, topOpRef);
}
- Mutable<ILogicalOperator> top2 = new MutableObject<ILogicalOperator>(p.first);
- return new Pair<ILogicalExpression, Mutable<ILogicalOperator>>(
- new VariableReferenceExpression(p.second), top2);
+ Mutable<ILogicalOperator> top2 = new MutableObject<>(p.first);
+ return new Pair<>(new VariableReferenceExpression(p.second), top2);
} else {
SubplanOperator s = new SubplanOperator();
s.getInputs().add(topOpRef);
- srcRef.setValue(new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(s)));
- Mutable<ILogicalOperator> planRoot = new MutableObject<ILogicalOperator>(p.first);
+ srcRef.setValue(new NestedTupleSourceOperator(new MutableObject<>(s)));
+ Mutable<ILogicalOperator> planRoot = new MutableObject<>(p.first);
s.setRootOp(planRoot);
- return new Pair<ILogicalExpression, Mutable<ILogicalOperator>>(
- new VariableReferenceExpression(p.second), new MutableObject<ILogicalOperator>(s));
+ return new Pair<>(new VariableReferenceExpression(p.second), new MutableObject<>(s));
}
}
}
@@ -1293,23 +1286,23 @@
protected Pair<ILogicalOperator, LogicalVariable> aggListifyForSubquery(LogicalVariable var,
Mutable<ILogicalOperator> opRef, boolean bProject) {
- AggregateFunctionCallExpression funAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(
- AsterixBuiltinFunctions.LISTIFY, new ArrayList<Mutable<ILogicalExpression>>());
- funAgg.getArguments().add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
+ AggregateFunctionCallExpression funAgg = AsterixBuiltinFunctions
+ .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, new ArrayList<>());
+ funAgg.getArguments().add(new MutableObject<>(new VariableReferenceExpression(var)));
LogicalVariable varListified = context.newSubplanOutputVar();
AggregateOperator agg = new AggregateOperator(mkSingletonArrayList(varListified),
- (List) mkSingletonArrayList(new MutableObject<ILogicalExpression>(funAgg)));
+ (List) mkSingletonArrayList(new MutableObject<>(funAgg)));
agg.getInputs().add(opRef);
ILogicalOperator res;
if (bProject) {
ProjectOperator pr = new ProjectOperator(varListified);
- pr.getInputs().add(new MutableObject<ILogicalOperator>(agg));
+ pr.getInputs().add(new MutableObject<>(agg));
res = pr;
} else {
res = agg;
}
- return new Pair<ILogicalOperator, LogicalVariable>(res, varListified);
+ return new Pair<>(res, varListified);
}
protected Pair<ILogicalOperator, LogicalVariable> visitAndOrOperator(OperatorExpr op,
@@ -1334,14 +1327,14 @@
"Unexpected operator " + ops.get(i) + " in an OperatorExpr starting with " + opLogical);
}
}
- f.getArguments().add(new MutableObject<ILogicalExpression>(p.first));
+ f.getArguments().add(new MutableObject<>(p.first));
}
LogicalVariable assignedVar = context.newVar();
- AssignOperator a = new AssignOperator(assignedVar, new MutableObject<ILogicalExpression>(f));
+ AssignOperator a = new AssignOperator(assignedVar, new MutableObject<>(f));
a.getInputs().add(topOp);
- return new Pair<ILogicalOperator, LogicalVariable>(a, assignedVar);
+ return new Pair<>(a, assignedVar);
}
@@ -1361,7 +1354,7 @@
protected ILogicalExpression makeUnnestExpression(ILogicalExpression expr) {
List<Mutable<ILogicalExpression>> argRefs = new ArrayList<>();
- argRefs.add(new MutableObject<ILogicalExpression>(expr));
+ argRefs.add(new MutableObject<>(expr));
switch (expr.getExpressionTag()) {
case CONSTANT:
case VARIABLE: {
@@ -1463,7 +1456,7 @@
varMap.putAll(cloneVarMap);
// Sets the new child.
- childRef = new MutableObject<ILogicalOperator>(newChild);
+ childRef = new MutableObject<>(newChild);
currentOperator.getInputs().set(childIndex, childRef);
}
@@ -1512,25 +1505,23 @@
Mutable<ILogicalExpression> selectExpr, Expression branchExpression) throws AsterixException {
context.enterSubplan();
SubplanOperator subplanOp = new SubplanOperator();
- subplanOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
- Mutable<ILogicalOperator> nestedSource = new MutableObject<ILogicalOperator>(
- new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(subplanOp)));
+ subplanOp.getInputs().add(new MutableObject<>(inputOp));
+ Mutable<ILogicalOperator> nestedSource = new MutableObject<>(
+ new NestedTupleSourceOperator(new MutableObject<>(subplanOp)));
SelectOperator select = new SelectOperator(selectExpr, false, null);
// The select operator cannot be moved up and down, otherwise it will cause typing issues (ASTERIXDB-1203).
OperatorPropertiesUtil.markMovable(select, false);
select.getInputs().add(nestedSource);
- Pair<ILogicalOperator, LogicalVariable> pBranch = branchExpression.accept(this,
- new MutableObject<ILogicalOperator>(select));
+ Pair<ILogicalOperator, LogicalVariable> pBranch = branchExpression.accept(this, new MutableObject<>(select));
LogicalVariable branchVar = context.newVar();
AggregateOperator aggOp = new AggregateOperator(Collections.singletonList(branchVar),
- Collections.singletonList(new MutableObject<ILogicalExpression>(new AggregateFunctionCallExpression(
- FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.LISTIFY), false,
- Collections.singletonList(new MutableObject<ILogicalExpression>(
- new VariableReferenceExpression(pBranch.second)))))));
- aggOp.getInputs().add(new MutableObject<ILogicalOperator>(pBranch.first));
- ILogicalPlan planForBranch = new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(aggOp));
+ Collections.singletonList(new MutableObject<>(new AggregateFunctionCallExpression(
+ FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.LISTIFY), false, Collections.singletonList(
+ new MutableObject<>(new VariableReferenceExpression(pBranch.second)))))));
+ aggOp.getInputs().add(new MutableObject<>(pBranch.first));
+ ILogicalPlan planForBranch = new ALogicalPlanImpl(new MutableObject<>(aggOp));
subplanOp.getNestedPlans().add(planForBranch);
context.exitSubplan();
- return new Pair<ILogicalOperator, LogicalVariable>(subplanOp, branchVar);
+ return new Pair<>(subplanOp, branchVar);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
index 37e888d..d4c3b0e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
@@ -107,13 +106,9 @@
public class ExternalIndexingOperations {
- public static final List<List<String>> FILE_INDEX_FIELD_NAMES = new ArrayList<List<String>>();
- public static final ArrayList<IAType> FILE_INDEX_FIELD_TYPES = new ArrayList<IAType>();
-
- static {
- FILE_INDEX_FIELD_NAMES.add(new ArrayList<String>(Arrays.asList("")));
- FILE_INDEX_FIELD_TYPES.add(BuiltinType.ASTRING);
- }
+ public static final List<List<String>> FILE_INDEX_FIELD_NAMES = Collections
+ .singletonList(Collections.singletonList(""));
+ public static final List<IAType> FILE_INDEX_FIELD_TYPES = Collections.singletonList(BuiltinType.ASTRING);
public static boolean isIndexible(ExternalDatasetDetails ds) {
String adapter = ds.getAdapter();
@@ -151,7 +146,7 @@
public static ArrayList<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset)
throws AlgebricksException {
- ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+ ArrayList<ExternalFile> files = new ArrayList<>();
ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
try {
// Create the file system object
@@ -259,27 +254,27 @@
* @param files
* @param indexerDesc
* @return
+ * @throws AsterixException
* @throws Exception
*/
private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDataIndexingOperator(
JobSpecification jobSpec, IAType itemType, Dataset dataset, List<ExternalFile> files,
- RecordDescriptor indexerDesc, AqlMetadataProvider metadataProvider) throws Exception {
+ RecordDescriptor indexerDesc) throws AsterixException {
ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
Map<String, String> configuration = externalDatasetDetails.getProperties();
IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory(
externalDatasetDetails.getAdapter(), configuration, (ARecordType) itemType, files, true, null);
- return new Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint>(
- new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory),
+ return new Pair<>(new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory),
adapterFactory.getPartitionConstraint());
}
public static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(
JobSpecification spec, AqlMetadataProvider metadataProvider, Dataset dataset, ARecordType itemType,
- RecordDescriptor indexerDesc, List<ExternalFile> files) throws Exception {
+ RecordDescriptor indexerDesc, List<ExternalFile> files) throws AsterixException {
if (files == null) {
files = MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset);
}
- return getExternalDataIndexingOperator(spec, itemType, dataset, files, indexerDesc, metadataProvider);
+ return getExternalDataIndexingOperator(spec, itemType, dataset, files, indexerDesc);
}
/**
@@ -431,7 +426,7 @@
public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, List<ExternalFile> metadataFiles,
List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
AqlMetadataProvider metadataProvider) throws MetadataException, AlgebricksException {
- ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+ ArrayList<ExternalFile> files = new ArrayList<>();
for (ExternalFile file : metadataFiles) {
if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
files.add(file);
@@ -456,7 +451,7 @@
List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
// Create files list
- ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+ ArrayList<ExternalFile> files = new ArrayList<>();
for (ExternalFile metadataFile : metadataFiles) {
if (metadataFile.getPendingOp() != ExternalFilePendingOp.PENDING_APPEND_OP) {
@@ -478,7 +473,7 @@
CompiledCreateIndexStatement ccis = new CompiledCreateIndexStatement(index.getIndexName(),
index.getDataverseName(), index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
- return IndexOperations.buildSecondaryIndexLoadingJobSpec(ccis, null, null, metadataProvider, files);
+ return IndexOperations.buildSecondaryIndexLoadingJobSpec(ccis, null, null, null, null, metadataProvider, files);
}
public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
@@ -500,10 +495,10 @@
IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
- ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
- ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
- ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
- ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+ ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
+ ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
+ ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
+ ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
for (Index index : indexes) {
if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
@@ -634,10 +629,10 @@
IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
- ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
- ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
- ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
- ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+ ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
+ ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
+ ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
+ ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
for (Index index : indexes) {
if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
@@ -692,10 +687,10 @@
IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
- ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
- ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
- ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
- ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+ ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
+ ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
+ ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
+ ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
for (Index index : indexes) {
if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 3d91c7d..7656006 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -861,7 +861,7 @@
if (subType.isOpen()) {
isOpen = true;
break;
- };
+ }
}
}
if (fieldExpr.second == null) {
@@ -875,6 +875,9 @@
throw new AlgebricksException("Typed index on \"" + fieldExpr.first
+ "\" field could be created only for open datatype");
}
+ if (stmtCreateIndex.hasMetaField()) {
+ throw new AlgebricksException("Typed open index can only be created on the record part");
+ }
Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second,
indexName, dataverseName);
TypeSignature typeSignature = new TypeSignature(dataverseName, indexName);
@@ -1013,7 +1016,8 @@
CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
- spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, aRecordType, enforcedType, metadataProvider);
+ spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, aRecordType, metaRecordType,
+ keySourceIndicators, enforcedType, metadataProvider);
if (spec == null) {
throw new AsterixException("Failed to create job spec for creating index '"
+ stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
@@ -1034,7 +1038,9 @@
cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
index.getGramLength(), index.getIndexType());
- spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, aRecordType, enforcedType, metadataProvider);
+
+ spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, aRecordType, metaRecordType,
+ keySourceIndicators, enforcedType, metadataProvider);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
@@ -1949,7 +1955,7 @@
private JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
ICompiledDmlStatement stmt)
- throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
+ throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
// Query Rewriting (happens under the same ongoing metadata transaction)
Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
@@ -2259,7 +2265,7 @@
*/
private Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse,
Feed feed, String dataset, FeedPolicyEntity feedPolicy, MetadataTransactionContext mdTxnCtx)
- throws MetadataException {
+ throws MetadataException {
IFeedJoint sourceFeedJoint = null;
FeedConnectionRequest request = null;
List<String> functionsToApply = new ArrayList<String>();
@@ -2445,7 +2451,6 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
MetadataLockManager.INSTANCE.compactBegin(dataverseName, dataverseName + "." + datasetName);
-
List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
try {
Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
@@ -2453,24 +2458,27 @@
throw new AlgebricksException(
"There is no dataset with this name " + datasetName + " in dataverse " + dataverseName + ".");
}
-
String itemTypeName = ds.getItemTypeName();
Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
ds.getItemTypeDataverseName(), itemTypeName);
-
+ ARecordType metaRecordType = null;
+ if (ds.hasMetaPart()) {
+ metaRecordType = (ARecordType) MetadataManager.INSTANCE
+ .getDatatype(metadataProvider.getMetadataTxnContext(), ds.getMetaItemTypeDataverseName(),
+ ds.getMetaItemTypeName())
+ .getDatatype();
+ }
// Prepare jobs to compact the datatset and its indexes
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
if (indexes.size() == 0) {
throw new AlgebricksException(
"Cannot compact the extrenal dataset " + datasetName + " because it has no indexes");
}
-
Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
dataverseName);
jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
ARecordType aRecordType = (ARecordType) dt.getDatatype();
ARecordType enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, indexes);
-
if (ds.getDatasetType() == DatasetType.INTERNAL) {
for (int j = 0; j < indexes.size(); j++) {
if (indexes.get(j).isSecondaryIndex()) {
@@ -2479,18 +2487,8 @@
}
}
} else {
- for (int j = 0; j < indexes.size(); j++) {
- if (!ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
- CompiledIndexCompactStatement cics = new CompiledIndexCompactStatement(dataverseName,
- datasetName, indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(),
- indexes.get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(),
- indexes.get(j).getGramLength(), indexes.get(j).getIndexType());
- jobsToExecute.add(IndexOperations.buildSecondaryIndexCompactJobSpec(cics, aRecordType,
- enforcedType, metadataProvider, ds));
- }
-
- }
- jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider));
+ prepareCompactJobsForExternalDataset(indexes, dataverseName, datasetName, ds, jobsToExecute,
+ aRecordType, metaRecordType, metadataProvider, enforcedType);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
@@ -2509,6 +2507,28 @@
}
}
+ private void prepareCompactJobsForExternalDataset(List<Index> indexes, String dataverseName, String datasetName,
+ Dataset ds, List<JobSpecification> jobsToExecute, ARecordType aRecordType, ARecordType metaRecordType,
+ AqlMetadataProvider metadataProvider, ARecordType enforcedType)
+ throws MetadataException, AlgebricksException {
+ for (int j = 0; j < indexes.size(); j++) {
+ if (!ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
+ CompiledIndexCompactStatement cics = new CompiledIndexCompactStatement(dataverseName, datasetName,
+ indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(),
+ indexes.get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(),
+ indexes.get(j).getGramLength(), indexes.get(j).getIndexType());
+ List<Integer> keySourceIndicators = null;
+ if (ds.hasMetaPart()) {
+ keySourceIndicators = indexes.get(j).getKeyFieldSourceIndicators();
+ }
+ jobsToExecute.add(IndexOperations.buildSecondaryIndexCompactJobSpec(cics, aRecordType, metaRecordType,
+ keySourceIndicators, enforcedType, metadataProvider));
+ }
+
+ }
+ jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider));
+ }
+
private void handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
IHyracksDataset hdc, ResultDelivery resultDelivery, ResultUtils.Stats stats) throws Exception {
@@ -2919,7 +2939,7 @@
private void prepareRunExternalRuntime(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
RunStatement pregelixStmt, String dataverseNameFrom, String dataverseNameTo, String datasetNameFrom,
String datasetNameTo, MetadataTransactionContext mdTxnCtx)
- throws AlgebricksException, AsterixException, Exception {
+ throws AlgebricksException, AsterixException, Exception {
// Validates the source/sink dataverses and datasets.
Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom);
if (fromDataset == null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
index 9052696..46b9c35 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
@@ -159,7 +159,7 @@
}
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
- itemType, format.getBinaryComparatorFactoryProvider());
+ itemType, metaItemType, format.getBinaryComparatorFactoryProvider());
ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
int[] bloomFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
@@ -227,7 +227,7 @@
ARecordType metaItemType = DatasetUtils.getMetaType(metadata, dataset);
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
- itemType, format.getBinaryComparatorFactoryProvider());
+ itemType, metaItemType, format.getBinaryComparatorFactoryProvider());
ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
int[] blooFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
index 4cb8a0f..6ed8db9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
@@ -51,42 +51,43 @@
public class IndexOperations {
- private static final PhysicalOptimizationConfig physicalOptimizationConfig = OptimizationConfUtil
- .getPhysicalOptimizationConfig();
+ private static final PhysicalOptimizationConfig physicalOptimizationConfig =
+ OptimizationConfUtil.getPhysicalOptimizationConfig();
public static JobSpecification buildSecondaryIndexCreationJobSpec(CompiledCreateIndexStatement createIndexStmt,
- ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider)
- throws AsterixException, AlgebricksException {
+ ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType,
+ AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
.createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(),
createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider,
- physicalOptimizationConfig, recType, enforcedType);
+ physicalOptimizationConfig, recType, metaType, keySourceIndicators, enforcedType);
return secondaryIndexHelper.buildCreationJobSpec();
}
public static JobSpecification buildSecondaryIndexLoadingJobSpec(CompiledCreateIndexStatement createIndexStmt,
- ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider)
+ ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType,
+ AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
+ SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
+ .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
+ createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
+ createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(),
+ createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider,
+ physicalOptimizationConfig, recType, metaType, keySourceIndicators, enforcedType);
+ return secondaryIndexHelper.buildLoadingJobSpec();
+ }
+
+ public static JobSpecification buildSecondaryIndexLoadingJobSpec(CompiledCreateIndexStatement createIndexStmt,
+ ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType,
+ AqlMetadataProvider metadataProvider, List<ExternalFile> files)
throws AsterixException, AlgebricksException {
SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
.createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(),
createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider,
- physicalOptimizationConfig, recType, enforcedType);
- return secondaryIndexHelper.buildLoadingJobSpec();
- }
-
- public static JobSpecification buildSecondaryIndexLoadingJobSpec(CompiledCreateIndexStatement createIndexStmt,
- ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider,
- List<ExternalFile> files) throws AsterixException, AlgebricksException {
- SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
- .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
- createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
- createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(),
- createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider,
- physicalOptimizationConfig, recType, enforcedType);
+ physicalOptimizationConfig, recType, metaType, keySourceIndicators, enforcedType);
secondaryIndexHelper.setExternalFiles(files);
return secondaryIndexHelper.buildLoadingJobSpec();
}
@@ -103,33 +104,34 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
.splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
- metadataProvider.getMetadataTxnContext());
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+ DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
// The index drop operation should be persistent regardless of temp datasets or permanent dataset.
IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
+ splitsAndConstraint.first,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ compactionInfo.first, compactionInfo.second,
new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp));
- AlgebricksPartitionConstraintHelper
- .setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
+ splitsAndConstraint.second);
spec.addRoot(btreeDrop);
return spec;
}
public static JobSpecification buildSecondaryIndexCompactJobSpec(CompiledIndexCompactStatement indexCompactStmt,
- ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider, Dataset dataset)
- throws AsterixException, AlgebricksException {
+ ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType,
+ AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
.createIndexOperationsHelper(indexCompactStmt.getIndexType(), indexCompactStmt.getDataverseName(),
indexCompactStmt.getDatasetName(), indexCompactStmt.getIndexName(),
indexCompactStmt.getKeyFields(), indexCompactStmt.getKeyTypes(), indexCompactStmt.isEnforced(),
indexCompactStmt.getGramLength(), metadataProvider, physicalOptimizationConfig, recType,
- enforcedType);
+ metaType, keySourceIndicators, enforcedType);
return secondaryIndexHelper.buildCompactJobSpec();
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
index c793f59..15ea2f8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
@@ -35,6 +35,7 @@
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import org.apache.asterix.transaction.management.resource.ExternalBTreeWithBuddyLocalResourceMetadata;
@@ -146,7 +147,7 @@
// Assign op.
AbstractOperatorDescriptor sourceOp = primaryScanOp;
if (isEnforcingKeyTypes) {
- sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+ sourceOp = createCastOp(spec, dataset.getDatasetType());
spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
}
AlgebricksMetaOperatorDescriptor asterixAssignOp = createExternalAssignOp(spec, numSecondaryKeys,
@@ -208,7 +209,7 @@
// Assign op.
AbstractOperatorDescriptor sourceOp = primaryScanOp;
if (isEnforcingKeyTypes) {
- sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+ sourceOp = createCastOp(spec, dataset.getDatasetType());
spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
}
AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys,
@@ -312,9 +313,11 @@
secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys
+ numFilterFields];
- ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+ ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys
+ + (dataset.hasMetaPart() ? 1 : 0) + numFilterFields];
+ ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys + (dataset.hasMetaPart() ? 1 : 0)
+ + numFilterFields];
secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
- ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
@@ -322,10 +325,19 @@
// Record column is 0 for external datasets, numPrimaryKeys for internal ones
int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0;
for (int i = 0; i < numSecondaryKeys; i++) {
+ ARecordType sourceType;
+ int sourceColumn;
+ if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) {
+ sourceType = itemType;
+ sourceColumn = recordColumn;
+ } else {
+ sourceType = metaType;
+ sourceColumn = recordColumn + 1;
+ }
secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
- isEnforcingKeyTypes ? enforcedItemType : itemType, secondaryKeyFields.get(i), recordColumn);
+ isEnforcingKeyTypes ? enforcedItemType : sourceType, secondaryKeyFields.get(i), sourceColumn);
Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
- secondaryKeyFields.get(i), itemType);
+ secondaryKeyFields.get(i), sourceType);
IAType keyType = keyTypePair.first;
anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
@@ -354,6 +366,11 @@
}
}
enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType);
+ enforcedTypeTraits[numPrimaryKeys] = typeTraitProvider.getTypeTrait(itemType);
+ if (dataset.hasMetaPart()) {
+ enforcedRecFields[numPrimaryKeys + 1] = serdeProvider.getSerializerDeserializer(metaType);
+ enforcedTypeTraits[numPrimaryKeys + 1] = typeTraitProvider.getTypeTrait(metaType);
+ }
if (numFilterFields > 0) {
secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat()
@@ -362,8 +379,10 @@
IAType type = keyTypePair.first;
ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
+ enforcedRecFields[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)] = serde;
+ enforcedTypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)] = typeTraitProvider
+ .getTypeTrait(type);
}
-
secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
index 65afe3c..e677f54 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
@@ -47,6 +47,7 @@
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
@@ -107,6 +108,9 @@
protected String datasetName;
protected Dataset dataset;
protected ARecordType itemType;
+ protected ARecordType metaType;
+ protected List<Integer> keySourceIndicators;
+ protected ISerializerDeserializer metaSerde;
protected ISerializerDeserializer payloadSerde;
protected IFileSplitProvider primaryFileSplitProvider;
protected AlgebricksPartitionConstraint primaryPartitionConstraint;
@@ -131,7 +135,7 @@
protected Map<String, String> mergePolicyFactoryProperties;
protected RecordDescriptor enforcedRecDesc;
protected ARecordType enforcedItemType;
-
+ protected ARecordType enforcedMetaType;
protected int numFilterFields;
protected List<String> filterFieldName;
protected ITypeTraits[] filterTypeTraits;
@@ -152,8 +156,8 @@
public static SecondaryIndexOperationsHelper createIndexOperationsHelper(IndexType indexType, String dataverseName,
String datasetName, String indexName, List<List<String>> secondaryKeyFields, List<IAType> secondaryKeyTypes,
boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
- PhysicalOptimizationConfig physOptConf, ARecordType recType, ARecordType enforcedType)
- throws AsterixException, AlgebricksException {
+ PhysicalOptimizationConfig physOptConf, ARecordType recType, ARecordType metaType,
+ List<Integer> keySourceIndicators, ARecordType enforcedType) throws AsterixException, AlgebricksException {
IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
SecondaryIndexOperationsHelper indexOperationsHelper = null;
switch (indexType) {
@@ -178,7 +182,8 @@
}
}
indexOperationsHelper.init(indexType, dataverseName, datasetName, indexName, secondaryKeyFields,
- secondaryKeyTypes, isEnforced, gramLength, metadataProvider, recType, enforcedType);
+ secondaryKeyTypes, isEnforced, gramLength, metadataProvider, recType, metaType, keySourceIndicators,
+ enforcedType);
return indexOperationsHelper;
}
@@ -190,7 +195,8 @@
protected void init(IndexType indexType, String dvn, String dsn, String in, List<List<String>> secondaryKeyFields,
List<IAType> secondaryKeyTypes, boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
- ARecordType aRecType, ARecordType enforcedType) throws AsterixException, AlgebricksException {
+ ARecordType aRecType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType)
+ throws AsterixException, AlgebricksException {
this.metadataProvider = metadataProvider;
dataverseName = dvn == null ? metadataProvider.getDefaultDataverseName() : dvn;
datasetName = dsn;
@@ -202,8 +208,12 @@
}
boolean temp = dataset.getDatasetDetails().isTemp();
itemType = aRecType;
+ this.metaType = metaType;
+ this.keySourceIndicators = keySourceIndicators;
enforcedItemType = enforcedType;
payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+ metaSerde = metaType == null ? null
+ : AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
numSecondaryKeys = secondaryKeyFields.size();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
.splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, secondaryIndexName, temp);
@@ -266,13 +276,20 @@
protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
int numPrimaryKeys = partitioningKeys.size();
- ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
- ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+ ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1
+ + (dataset.hasMetaPart() ? 1 : 0)];
+ ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)];
primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
primaryBloomFilterKeyFields = new int[numPrimaryKeys];
ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+ List<Integer> indicators = null;
+ if (dataset.hasMetaPart()) {
+ indicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+ }
for (int i = 0; i < numPrimaryKeys; i++) {
- IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+ IAType keyType = (indicators == null || indicators.get(i) == 0)
+ ? itemType.getSubFieldType(partitioningKeys.get(i))
+ : metaType.getSubFieldType(partitioningKeys.get(i));
primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
.getBinaryComparatorFactory(keyType, true);
@@ -281,6 +298,10 @@
}
primaryRecFields[numPrimaryKeys] = payloadSerde;
primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+ if (dataset.hasMetaPart()) {
+ primaryRecFields[numPrimaryKeys + 1] = payloadSerde;
+ primaryTypeTraits[numPrimaryKeys + 1] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+ }
primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
}
@@ -376,14 +397,13 @@
return asterixAssignOp;
}
- protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification spec,
- AbstractOperatorDescriptor primaryScanOp, int numSecondaryKeyFields, DatasetType dsType) {
+ protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification spec, DatasetType dsType) {
CastRecordDescriptor castFuncDesc = (CastRecordDescriptor) CastRecordDescriptor.FACTORY
.createFunctionDescriptor();
castFuncDesc.reset(enforcedItemType, itemType);
int[] outColumns = new int[1];
- int[] projectionList = new int[1 + numPrimaryKeys];
+ int[] projectionList = new int[(dataset.hasMetaPart() ? 2 : 1) + numPrimaryKeys];
int recordIdx;
//external datascan operator returns a record as the first field, instead of the last in internal case
if (dsType == DatasetType.EXTERNAL) {
@@ -396,6 +416,9 @@
for (int i = 0; i <= numPrimaryKeys; i++) {
projectionList[i] = i;
}
+ if (dataset.hasMetaPart()) {
+ projectionList[numPrimaryKeys + 1] = numPrimaryKeys + 1;
+ }
IScalarEvaluatorFactory[] castEvalFact = new IScalarEvaluatorFactory[] {
new ColumnAccessEvalFactory(recordIdx) };
IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[1];
@@ -440,10 +463,10 @@
for (int i = 0; i < numSecondaryKeyFields; i++) {
// Access column i, and apply 'is not null'.
ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
- IScalarEvaluatorFactory isNullEvalFactory = isUnknownDesc
+ IScalarEvaluatorFactory isUnknownEvalFactory = isUnknownDesc
.createEvaluatorFactory(new IScalarEvaluatorFactory[] { columnAccessEvalFactory });
IScalarEvaluatorFactory notEvalFactory = notDesc
- .createEvaluatorFactory(new IScalarEvaluatorFactory[] { isNullEvalFactory });
+ .createEvaluatorFactory(new IScalarEvaluatorFactory[] { isUnknownEvalFactory });
andArgsEvalFactories[i] = notEvalFactory;
}
IScalarEvaluatorFactory selectCond = null;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
index e072a63..7044205 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
@@ -43,8 +43,8 @@
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -95,8 +95,8 @@
@Override
@SuppressWarnings("rawtypes")
protected void setSecondaryRecDescAndComparators(IndexType indexType, List<List<String>> secondaryKeyFields,
- List<IAType> secondaryKeyTypes, int gramLength, AqlMetadataProvider metadata) throws AlgebricksException,
- AsterixException {
+ List<IAType> secondaryKeyTypes, int gramLength, AqlMetadataProvider metadata)
+ throws AlgebricksException, AsterixException {
// Sanity checks.
if (numPrimaryKeys > 1) {
throw new AsterixException("Cannot create inverted index on dataset with composite primary key.");
@@ -250,10 +250,11 @@
AbstractOperatorDescriptor sourceOp = primaryScanOp;
if (isEnforcingKeyTypes) {
- sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+ sourceOp = createCastOp(spec, dataset.getDatasetType());
spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
}
- AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys, secondaryRecDesc);
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys,
+ secondaryRecDesc);
// If any of the secondary fields are nullable, then add a select op
// that filters nulls.
@@ -266,7 +267,8 @@
AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec);
// Sort by token + primary keys.
- ExternalSortOperatorDescriptor sortOp = createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc);
+ ExternalSortOperatorDescriptor sortOp = createSortOp(spec, tokenKeyPairComparatorFactories,
+ tokenKeyPairRecDesc);
// Create secondary inverted index bulk load op.
LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec);
@@ -296,8 +298,8 @@
for (int i = 0; i < primaryKeyFields.length; i++) {
primaryKeyFields[i] = numSecondaryKeys + i;
}
- BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
- tokenKeyPairRecDesc, tokenizerFactory, docField, primaryKeyFields, isPartitioned, false);
+ BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc,
+ tokenizerFactory, docField, primaryKeyFields, isPartitioned, false);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
primaryPartitionConstraint);
return tokenizerOp;
@@ -337,18 +339,18 @@
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
boolean temp = dataset.getDatasetDetails().isTemp();
if (!isPartitioned) {
- return new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ return new LSMInvertedIndexDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
+ mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
invertedIndexFieldsForNonBulkLoadOps, !temp);
} else {
- return new PartitionedLSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ return new PartitionedLSMInvertedIndexDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
+ mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
index d488d69..7c500cf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
@@ -296,7 +296,7 @@
// Assign op.
AbstractOperatorDescriptor sourceOp = primaryScanOp;
if (isEnforcingKeyTypes) {
- sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+ sourceOp = createCastOp(spec, dataset.getDatasetType());
spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
}
AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp,
@@ -354,7 +354,7 @@
ExternalDataScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec);
AbstractOperatorDescriptor sourceOp = primaryScanOp;
if (isEnforcingKeyTypes) {
- sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+ sourceOp = createCastOp(spec, dataset.getDatasetType());
spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
}
// Assign op.
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-lojoin_with_meta-1.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-lojoin_with_meta-1.aql
new file mode 100644
index 0000000..788db55
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-lojoin_with_meta-1.aql
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+ id:int32,
+ num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex1 on Book(meta().num);
+create index NumIndex2 on Book(linenum:int32) enforced;
+create index NumIndex3 on Book(count1:int32) enforced;
+create index NumIndex4 on Book(count2:int32) enforced;
+
+for $t1 in dataset Book
+where $t1.linenum < 10
+order by $t1.linenum
+return {
+"linenum1": $t1.linenum,
+"count1":$t1.count1,
+"t2info": for $t2 in dataset Book
+ where $t1.count1 /* +indexnl */= $t2.count2
+ order by $t2.linenum
+ return {"linenum2": $t2.linenum,
+ "count2":$t2.count2}
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-1.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-1.aql
new file mode 100644
index 0000000..447c25c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-1.aql
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+ id:int32,
+ num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book(lineid:int64) enforced;
+
+for $x in dataset Book
+for $y in dataset Book
+where $x.lineid /*+ indexnl */ = $y.lineid
+return {"authx":$x.author,"authy":$y.author};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-2.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-2.aql
new file mode 100644
index 0000000..1545f33
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-2.aql
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+ id:int32,
+ num:int32
+}
+
+create type LineType as open {
+}
+
+create type LineTypeWithNum as open {
+    lineid:int64
+}
+create dataset Book1(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create dataset Book2(LineTypeWithNum) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book1(lineid:int64) enforced;
+
+for $y in dataset Book2
+for $x in dataset Book1
+where $y.lineid /*+ indexnl */= $x.lineid
+return {"authx":$x.author,"authy":$y.author};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-3.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-3.aql
new file mode 100644
index 0000000..63e105d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-3.aql
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+ id:int32,
+ num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book(meta().num);
+
+for $x in dataset Book
+for $y in dataset Book
+where meta($x).num /*+ indexnl */ = meta($y).num
+return {"authx":$x.author,"authy":$y.author};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-4.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-4.aql
new file mode 100644
index 0000000..1fd6869
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-4.aql
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+ id:int32,
+ num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book1(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create dataset Book2(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book1(meta().num);
+
+for $y in dataset Book2
+for $x in dataset Book1
+where meta($y).num /*+ indexnl */= meta($x).num
+return {"authx":$x.author,"authy":$y.author};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-1.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-1.aql
new file mode 100644
index 0000000..bc3dc0a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-1.aql
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+ id:int32,
+ num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book(meta().num);
+
+for $x in dataset Book
+where meta($x).num >10
+return $x;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-2.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-2.aql
new file mode 100644
index 0000000..7f13ac3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-2.aql
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+ id:int32,
+ num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book(lineid:int64) enforced;
+
+for $x in dataset Book
+where $x.lineid >10
+return $x;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-lojoin_with_meta-1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-lojoin_with_meta-1.plan
new file mode 100644
index 0000000..0ee7293
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-lojoin_with_meta-1.plan
@@ -0,0 +1,47 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- SORT_MERGE_EXCHANGE [$$18(ASC) ] |PARTITIONED|
+ -- STABLE_SORT [$$18(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$19] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$19(ASC), $$25(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$19] |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$35(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$33(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-1.plan
new file mode 100644
index 0000000..d65bb2a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-1.plan
@@ -0,0 +1,23 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$19(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-2.plan
new file mode 100644
index 0000000..d65bb2a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-2.plan
@@ -0,0 +1,23 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$19(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-3.plan
new file mode 100644
index 0000000..4d919c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-3.plan
@@ -0,0 +1,23 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$21(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-4.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-4.plan
new file mode 100644
index 0000000..4d919c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-4.plan
@@ -0,0 +1,23 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$21(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-1.plan
new file mode 100644
index 0000000..5a3c313
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-1.plan
@@ -0,0 +1,16 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$10(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-2.plan
new file mode 100644
index 0000000..d29358f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-2.plan
@@ -0,0 +1,15 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$9(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.1.ddl.aql
new file mode 100644
index 0000000..3289999
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.1.ddl.aql
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a dataset with meta part and attempt to
+ * build a secondary index on an open meta field
+ * Expected Res : Failure
+ * Date : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+ ("type-name"="DocumentType"),
+ ("meta-type-name"="KVMetaType"),
+ ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+ ("parser"="record-with-metadata"),
+ ("format"="dcp"),
+ ("record-format"="json"),
+ ("change-feed"="true"),
+ ("key-indexes"="0"),
+ ("key-indicators"="1"),
+ ("num-of-records"="1000")
+);
+
+create index OpenIndex on KVStore(meta().id:int32) enforced;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.2.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.2.ddl.aql
new file mode 100644
index 0000000..dec8611
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.2.ddl.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a dataset with meta part and attempt to
+ * build a secondary index on an open meta field
+ * Expected Res : Failure
+ * Date : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.1.ddl.aql
new file mode 100644
index 0000000..37e3e90
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.1.ddl.aql
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, ingest some records then
+ * build a secondary index after the dataset has data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+ ("type-name"="DocumentType"),
+ ("meta-type-name"="KVMetaType"),
+ ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+ ("parser"="record-with-metadata"),
+ ("format"="dcp"),
+ ("record-format"="json"),
+ ("change-feed"="true"),
+ ("key-indexes"="0"),
+ ("key-indicators"="1"),
+ ("num-of-records"="1000")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.2.update.aql
new file mode 100644
index 0000000..be3b808
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, ingest some records then
+ * build a secondary index after the dataset has data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.3.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.3.ddl.aql
new file mode 100644
index 0000000..4a0b32b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.3.ddl.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, ingest some records then
+ * build a secondary index after the dataset has data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+create index VBucketIndex on KVStore(meta().vbucket);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.query.aql
new file mode 100644
index 0000000..190e472
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.query.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, ingest some records then
+ * build a secondary index after the dataset has data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+count(
+ for $d in dataset KVStore
+ where meta($d).vbucket = 5
+ return $d
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.5.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.5.ddl.aql
new file mode 100644
index 0000000..bf60e83
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.5.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, ingest some records then
+ * build a secondary index after the dataset has data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.1.ddl.aql
new file mode 100644
index 0000000..1a1acea
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.1.ddl.aql
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data
+ * build secondary index on a meta field and then test ingestion of records
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+ ("type-name"="DocumentType"),
+ ("meta-type-name"="KVMetaType"),
+ ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+ ("parser"="record-with-metadata"),
+ ("format"="dcp"),
+ ("record-format"="json"),
+ ("change-feed"="true"),
+ ("key-indexes"="0"),
+ ("key-indicators"="1"),
+ ("num-of-records"="1000")
+);
+
+create index VBucketIndex on KVStore(meta().vbucket);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.2.update.aql
new file mode 100644
index 0000000..81fffc5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data
+ * build secondary index on a meta field and then test ingestion of records
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.3.query.aql
new file mode 100644
index 0000000..59f70c8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.3.query.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data
+ * build secondary index on a meta field and then test ingestion of records
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+count(
+ for $d in dataset KVStore
+ where meta($d).vbucket = 5
+ return $d
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.ddl.aql
new file mode 100644
index 0000000..b1cda6d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data
+ * build secondary index on a meta field and then test ingestion of records
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.1.ddl.aql
new file mode 100644
index 0000000..6d475bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.1.ddl.aql
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, ingest data then create an index
+ * on a field which sometimes is missing and sometimes is null
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+ ("type-name"="DocumentType"),
+ ("meta-type-name"="KVMetaType"),
+ ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+ ("parser"="record-with-metadata"),
+ ("format"="dcp"),
+ ("record-format"="json"),
+ ("change-feed"="true"),
+ ("key-indexes"="0"),
+ ("key-indicators"="1"),
+ ("num-of-records"="1000")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.2.update.aql
new file mode 100644
index 0000000..37d67d0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, ingest data then create an index
+ * on a field which sometimes is missing and sometimes is null
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.3.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.3.ddl.aql
new file mode 100644
index 0000000..5bfc09b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.3.ddl.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, ingest data then create an index
+ * on a field which sometimes is missing and sometimes is null
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+create index VBucketIndex on KVStore(exp:int32) enforced;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.query.aql
new file mode 100644
index 0000000..883be64
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.query.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, ingest data then create an index
+ * on a field which sometimes is missing and sometimes is null
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+count(
+ for $d in dataset KVStore
+ where $d.exp = 15
+ return $d
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.5.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.5.ddl.aql
new file mode 100644
index 0000000..2f043ba
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.5.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, ingest data then create an index
+ * on a field which sometimes is missing and sometimes is null
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.1.ddl.aql
new file mode 100644
index 0000000..0ae91fa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.1.ddl.aql
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create an index
+ * on an open field in the record part then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+ ("type-name"="DocumentType"),
+ ("meta-type-name"="KVMetaType"),
+ ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+ ("parser"="record-with-metadata"),
+ ("format"="dcp"),
+ ("record-format"="json"),
+ ("change-feed"="true"),
+ ("key-indexes"="0"),
+ ("key-indicators"="1"),
+ ("num-of-records"="1000")
+);
+
+create index OpenIndex on KVStore(id:int32) enforced;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.2.update.aql
new file mode 100644
index 0000000..ae5390b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create an index
+ * on an open field in the record part then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.3.query.aql
new file mode 100644
index 0000000..5e9b3f4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.3.query.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create an index
+ * on an open field in the record part then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+for $d in dataset KVStore
+where $d.id = 5
+return $d;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.ddl.aql
new file mode 100644
index 0000000..004cebe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create an index
+ * on an open field in the record part then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.1.ddl.aql
new file mode 100644
index 0000000..3cf9374
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.1.ddl.aql
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create an index
+ * on a field which sometimes is missing and sometimes is null and then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+ ("type-name"="DocumentType"),
+ ("meta-type-name"="KVMetaType"),
+ ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+ ("parser"="record-with-metadata"),
+ ("format"="dcp"),
+ ("record-format"="json"),
+ ("change-feed"="true"),
+ ("key-indexes"="0"),
+ ("key-indicators"="1"),
+ ("num-of-records"="1000")
+);
+
+create index OpenIndex on KVStore(exp:int32) enforced;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.2.update.aql
new file mode 100644
index 0000000..9b34e70
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create an index
+ * on a field which sometimes is missing and sometimes is null and then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.3.query.aql
new file mode 100644
index 0000000..8c887f1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.3.query.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create an index
+ * on a field which sometimes is missing and sometimes is null and then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+for $d in dataset KVStore
+where $d.exp = 15
+return $d;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.ddl.aql
new file mode 100644
index 0000000..2fcc497
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create an index
+ * on a field which sometimes is missing and sometimes is null and then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.1.ddl.aql
new file mode 100644
index 0000000..2471f79
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.1.ddl.aql
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create a composite index
+ * on a field from meta and a field from the value and then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+id:int32
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+ ("type-name"="DocumentType"),
+ ("meta-type-name"="KVMetaType"),
+ ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+ ("parser"="record-with-metadata"),
+ ("format"="dcp"),
+ ("record-format"="json"),
+ ("change-feed"="true"),
+ ("key-indexes"="0"),
+ ("key-indicators"="1"),
+ ("num-of-records"="1000")
+);
+
+create index MixedIndex on KVStore(meta().vbucket,id);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.2.update.aql
new file mode 100644
index 0000000..9fa0c3d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create a composite index
+ * on a field from meta and a field from the value and then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.query.aql
new file mode 100644
index 0000000..5089770
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.query.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create a composite index
+ * on a field from meta and a field from the value and then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+for $d in dataset KVStore
+where $d.id = 5
+and meta($d).vbucket<100
+return $d;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.4.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.4.ddl.aql
new file mode 100644
index 0000000..8f688b1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.4.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create a composite index
+ * on a field from meta and a field from the value and then ingest data
+ * Expected Res : Success
+ * Date : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.adm
new file mode 100644
index 0000000..77d1727
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.adm
@@ -0,0 +1 @@
+{ "id": 5, "name": "Ian Maxon", "exp": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.adm
new file mode 100644
index 0000000..77d1727
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.adm
@@ -0,0 +1 @@
+{ "id": 5, "name": "Ian Maxon", "exp": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.adm
new file mode 100644
index 0000000..8402b39
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.adm
@@ -0,0 +1 @@
+{ "id": 5i32, "name": "Ian Maxon", "exp": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index be663e3..3907677 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -92,6 +92,42 @@
</test-group>
<test-group name="feeds">
<test-case FilePath="feeds">
+ <compilation-unit name="change-feed-with-meta-with-mixed-index">
+ <output-dir compare="Text">change-feed-with-meta-with-mixed-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest">
+ <output-dir compare="Text">change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="change-feed-with-meta-pk-in-meta-open-index-with-missing">
+ <output-dir compare="Text">change-feed-with-meta-pk-in-meta-open-index-with-missing</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="change-feed-with-meta-open-index-in-meta">
+ <output-dir compare="Text">change-feed-with-meta-open-index-in-meta</output-dir>
+ <expected-error>Typed open index can only be created on the record part</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="change-feed-with-meta-pk-in-meta-open-index-in-value">
+ <output-dir compare="Text">change-feed-with-meta-pk-in-meta-open-index-in-value</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="change-feed-with-meta-pk-in-meta-index-after-ingest">
+ <output-dir compare="Text">change-feed-with-meta-pk-in-meta-index-after-ingest</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="change-feed-with-meta-pk-in-meta-index-in-meta">
+ <output-dir compare="Text">change-feed-with-meta-pk-in-meta-index-in-meta</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
<compilation-unit name="drop-nonexistent-feed">
<output-dir compare="Text">drop-nonexistent-feed</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
index f7fe77f..57f37da 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
@@ -153,8 +153,21 @@
// reset the string
strBuilder.setLength(0);
strBuilder.append("{\"id\":" + (counter + upsertCounter) + ",\"name\":\""
- + names[(counter + upsertCounter) % names.length] + "\",\"exp\":" + ((counter + upsertCounter) * 3)
- + "}");
+ + names[(counter + upsertCounter) % names.length] + "\"");
+ switch (counter % 3) {
+ case 0:
+ // Missing
+ break;
+ case 1:
+ strBuilder.append(",\"exp\":null");
+ break;
+ case 2:
+ strBuilder.append(",\"exp\":" + ((counter + upsertCounter) * 3));
+ break;
+ default:
+ break;
+ }
+ strBuilder.append("}");
byteBuff.clear();
byteBuff.writeBytes(strBuilder.toString().getBytes(StandardCharsets.UTF_8));
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java
index d3d681a..44c249f 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java
@@ -123,6 +123,17 @@
return Kind.CREATE_INDEX;
}
+ public boolean hasMetaField() {
+ if (fieldIndexIndicators != null) {
+ for (Integer indicator : fieldIndexIndicators) {
+ if (indicator.intValue() != 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
@Override
public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
return visitor.visit(this, arg);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
index c13ee0e..d809f4f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
@@ -209,19 +209,12 @@
}
public LogicalVariable getDataRecordVariable(List<LogicalVariable> dataScanVariables) {
- if (hasMeta()) {
- return dataScanVariables.get(dataScanVariables.size() - 2);
- } else {
- return dataScanVariables.get(dataScanVariables.size() - 1);
- }
+ return hasMeta() ? dataScanVariables.get(dataScanVariables.size() - 2)
+ : dataScanVariables.get(dataScanVariables.size() - 1);
}
public List<LogicalVariable> getPrimaryKeyVariables(List<LogicalVariable> dataScanVariables) {
- if (hasMeta()) {
- return new ArrayList<>(dataScanVariables.subList(0, dataScanVariables.size() - 2));
- } else {
- return new ArrayList<>(dataScanVariables.subList(0, dataScanVariables.size() - 1));
- }
+ return new ArrayList<>(dataScanVariables.subList(0, dataScanVariables.size() - (hasMeta() ? 2 : 1)));
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index f2e0424..7557b79 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -581,6 +581,14 @@
ARecordType itemType =
(ARecordType) this.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+ ARecordType metaType = null;
+ List<Integer> primaryKeyIndicators = null;
+ if (dataset.hasMetaPart()) {
+ metaType =
+ (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+ primaryKeyIndicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+ }
+
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
itemType, context.getBinaryComparatorFactoryProvider());
@@ -598,7 +606,9 @@
Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(secondaryIndex.getIndexType(),
secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
- DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType());
+ DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
+ dataset.hasMetaPart(), primaryKeyIndicators,
+ secondaryIndex.getKeyFieldSourceIndicators(), metaType);
comparatorFactories = comparatorFactoriesAndTypeTraits.first;
typeTraits = comparatorFactoriesAndTypeTraits.second;
if (filterTypeTraits != null) {
@@ -618,7 +628,7 @@
// get meta item type
ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
- comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType,
+ comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType,
context.getBinaryComparatorFactoryProvider());
filterFields = DatasetUtils.createFilterFields(dataset);
btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
@@ -699,7 +709,9 @@
private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
IndexType indexType, List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes,
- List<List<String>> pidxKeyFieldNames, ARecordType recType, DatasetType dsType) throws AlgebricksException {
+ List<List<String>> pidxKeyFieldNames, ARecordType recType, DatasetType dsType, boolean hasMeta,
+ List<Integer> primaryIndexKeyIndicators, List<Integer> secondaryIndexIndicators, ARecordType metaType)
+ throws AlgebricksException {
IBinaryComparatorFactory[] comparatorFactories;
ITypeTraits[] typeTraits;
@@ -711,7 +723,8 @@
int i = 0;
for (; i < sidxKeyFieldCount; ++i) {
Pair<IAType, Boolean> keyPairType =
- Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i), recType);
+ Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i),
+ (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
IAType keyType = keyPairType.first;
comparatorFactories[i] =
AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
@@ -723,7 +736,9 @@
try {
switch (dsType) {
case INTERNAL:
- keyType = recType.getSubFieldType(pidxKeyFieldNames.get(j));
+ keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1)
+ ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
+ : recType.getSubFieldType(pidxKeyFieldNames.get(j));
break;
case EXTERNAL:
keyType = IndexingConstants.getFieldType(j);
@@ -794,9 +809,14 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), dataset.getDatasetName(),
indexName, temp);
+ ARecordType metaType = null;
+ if (dataset.hasMetaPart()) {
+ metaType =
+ (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+ }
IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
- dataset, recType, context.getBinaryComparatorFactoryProvider());
+ dataset, recType, metaType, context.getBinaryComparatorFactoryProvider());
int[] btreeFields = new int[primaryComparatorFactories.length];
for (int i = 0; i < btreeFields.length; i++) {
btreeFields[i] = i + numNestedSecondaryKeyFields;
@@ -1015,12 +1035,18 @@
dataset.getDatasetName(), dataset.getDatasetName());
String indexName = primaryIndex.getIndexName();
+ ARecordType metaType = null;
+ if (dataset.hasMetaPart()) {
+ metaType =
+ (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+ }
+
String itemTypeName = dataset.getItemTypeName();
ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, null);
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
- itemType, context.getBinaryComparatorFactoryProvider());
+ itemType, metaType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
@@ -1111,7 +1137,7 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
- itemType, context.getBinaryComparatorFactoryProvider());
+ itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
indexName, temp);
@@ -1749,11 +1775,12 @@
numTokenFields = secondaryKeys.size() + 1;
}
+ ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
- dataset, recType, context.getBinaryComparatorFactoryProvider());
+ dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
IAType secondaryKeyType = null;
@@ -1936,9 +1963,9 @@
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
-
+ ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
- dataset, recType, context.getBinaryComparatorFactoryProvider());
+ dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
@@ -2276,7 +2303,7 @@
ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
- itemType, context.getBinaryComparatorFactoryProvider());
+ itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
indexName, temp);
@@ -2315,17 +2342,27 @@
btreeFields, filterFields, !temp);
AsterixLSMTreeUpsertOperatorDescriptor op;
- ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount() + 1 + numFilterFields];
+ ITypeTraits[] outputTypeTraits =
+ new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
ISerializerDeserializer[] outputSerDes =
- new ISerializerDeserializer[recordDesc.getFieldCount() + 1 + numFilterFields];
+ new ISerializerDeserializer[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1)
+ + numFilterFields];
for (int j = 0; j < recordDesc.getFieldCount(); j++) {
outputTypeTraits[j] = recordDesc.getTypeTraits()[j];
outputSerDes[j] = recordDesc.getFields()[j];
}
- outputSerDes[outputSerDes.length - 1 - numFilterFields] =
+ outputSerDes[outputSerDes.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] =
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
- outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] =
+ outputTypeTraits[outputTypeTraits.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] =
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
+
+ if (dataset.hasMetaPart()) {
+ outputSerDes[outputSerDes.length - 1 - numFilterFields] =
+ FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
+ outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] =
+ FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+ }
+
int fieldIdx = -1;
if (numFilterFields > 0) {
String filterField = DatasetUtils.getFilterField(dataset).get(0);
@@ -2531,11 +2568,12 @@
numTokenFields = secondaryKeys.size() + 1;
}
+ ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
- dataset, recType, context.getBinaryComparatorFactoryProvider());
+ dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
IAType secondaryKeyType = null;
@@ -2736,8 +2774,9 @@
++i;
}
+ ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
- dataset, recType, context.getBinaryComparatorFactoryProvider());
+ dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
index 949e6ca..bdf9ed0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
@@ -238,4 +238,15 @@
}
return dataverseName.compareTo(otherIndex.getDataverseName());
}
+
+ public boolean hasMetaFields() {
+ if (keyFieldSourceIndicators != null) {
+ for (Integer indicator : keyFieldSourceIndicators) {
+ if (indicator.intValue() != 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
index 0ac4f56..d0af744 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
@@ -59,7 +59,8 @@
public class DatasetUtils {
public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, ARecordType itemType,
- IBinaryComparatorFactoryProvider comparatorFactoryProvider) throws AlgebricksException {
+ ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
+ throws AlgebricksException {
List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
@@ -72,8 +73,11 @@
}
}
} else {
+ InternalDatasetDetails dsd = (InternalDatasetDetails) dataset.getDatasetDetails();
for (int i = 0; i < partitioningKeys.size(); i++) {
- IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+ IAType keyType = (dataset.hasMetaPart() && dsd.getKeySourceIndicator().get(i).intValue() == 1)
+ ? metaItemType.getSubFieldType(partitioningKeys.get(i))
+ : itemType.getSubFieldType(partitioningKeys.get(i));
bcfs[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java
index 60ca42f..108cd33 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java
@@ -204,6 +204,6 @@
*/
public static ARecordType chooseSource(List<Integer> keySourceIndicators, int index, ARecordType recordType,
ARecordType metaRecordType) {
- return keySourceIndicators.get(0) == 0 ? recordType : metaRecordType;
+ return keySourceIndicators.get(index) == 0 ? recordType : metaRecordType;
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastRecordDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastRecordDescriptor.java
index 8f21ffc..81e132a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastRecordDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastRecordDescriptor.java
@@ -39,6 +39,9 @@
public class CastRecordDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+ private CastRecordDescriptor() {
+ }
+
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
public IFunctionDescriptor createFunctionDescriptor() {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 24988d4..857f1e2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -74,29 +74,31 @@
private int presetFieldIndex = -1;
private ARecordPointable recPointable;
private DataOutput prevDos;
+ private final boolean hasMeta;
public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
ARecordType recordType, int filterFieldIndex) {
super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
- // initialize missingWriter
- this.missingWriter = opDesc.getMissingWriterFactory().createMissingWriter();
- // The search key should only have the primary index and not use the permutations.
this.key = new PermutingFrameTupleReference();
+ this.numOfPrimaryKeys = numOfPrimaryKeys;
+ missingWriter = opDesc.getMissingWriterFactory().createMissingWriter();
int[] searchKeyPermutations = new int[numOfPrimaryKeys];
for (int i = 0; i < searchKeyPermutations.length; i++) {
searchKeyPermutations[i] = fieldPermutation[i];
}
key.setFieldPermutation(searchKeyPermutations);
- this.numOfPrimaryKeys = numOfPrimaryKeys;
+ hasMeta = (fieldPermutation.length > numOfPrimaryKeys + 1) && (filterFieldIndex < 0
+ || (filterFieldIndex >= 0 && (fieldPermutation.length > numOfPrimaryKeys + 2)));
if (filterFieldIndex >= 0) {
isFiltered = true;
this.recordType = recordType;
this.presetFieldIndex = filterFieldIndex;
this.recPointable = (ARecordPointable) ARecordPointable.FACTORY.createPointable();
- this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length);
+ this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
}
+
}
// we have the permutation which has [pk locations, record location, optional:filter-location]
@@ -129,7 +131,6 @@
modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
index, ctx, this);
-
indexAccessor = index.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
.createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
cursor = indexAccessor.createSearchCursor(false);
@@ -161,14 +162,24 @@
dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
prevTuple.getFieldLength(numOfPrimaryKeys));
tb.addFieldEndOffset();
- // if with filters, append the filter
- if (isFiltered) {
+ // if has meta, then append meta
+ if (hasMeta) {
dos.write(prevTuple.getFieldData(numOfPrimaryKeys + 1), prevTuple.getFieldStart(numOfPrimaryKeys + 1),
prevTuple.getFieldLength(numOfPrimaryKeys + 1));
tb.addFieldEndOffset();
}
+ // if with filters, append the filter
+ if (isFiltered) {
+ dos.write(prevTuple.getFieldData(numOfPrimaryKeys + (hasMeta ? 2 : 1)),
+ prevTuple.getFieldStart(numOfPrimaryKeys + (hasMeta ? 2 : 1)),
+ prevTuple.getFieldLength(numOfPrimaryKeys + (hasMeta ? 2 : 1)));
+ tb.addFieldEndOffset();
+ }
} else {
addNullField();
+ if (hasMeta) {
+ addNullField();
+ }
// if with filters, append null
if (isFiltered) {
addNullField();
@@ -191,7 +202,6 @@
//TODO: use tryDelete/tryInsert in order to prevent deadlocks
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-
accessor.reset(buffer);
LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
int tupleCount = accessor.getTupleCount();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
index c06cc18..d47492c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -49,16 +49,19 @@
// used for upsert operations
private List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs;
private Mutable<ILogicalExpression> prevAdditionalFilteringExpression;
+ private final int numberOfAdditionalNonFilteringFields;
public IndexInsertDeleteUpsertOperator(IDataSourceIndex<?, ?> dataSourceIndex,
List<Mutable<ILogicalExpression>> primaryKeyExprs, List<Mutable<ILogicalExpression>> secondaryKeyExprs,
- Mutable<ILogicalExpression> filterExpr, Kind operation, boolean bulkload) {
+ Mutable<ILogicalExpression> filterExpr, Kind operation, boolean bulkload,
+ int numberOfAdditionalNonFilteringFields) {
this.dataSourceIndex = dataSourceIndex;
this.primaryKeyExprs = primaryKeyExprs;
this.secondaryKeyExprs = secondaryKeyExprs;
this.filterExpr = filterExpr;
this.operation = operation;
this.bulkload = bulkload;
+ this.numberOfAdditionalNonFilteringFields = numberOfAdditionalNonFilteringFields;
}
@Override
@@ -181,4 +184,8 @@
public void setPrevAdditionalFilteringExpression(Mutable<ILogicalExpression> prevAdditionalFilteringExpression) {
this.prevAdditionalFilteringExpression = prevAdditionalFilteringExpression;
}
+
+ public int getNumberOfAdditionalNonFilteringFields() {
+ return numberOfAdditionalNonFilteringFields;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index 6a74eb9..7d6c299 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -56,6 +56,9 @@
// previous filter (for UPSERT)
private LogicalVariable prevFilterVar;
private Object prevFilterType;
+ // previous additional fields (for UPSERT)
+ private List<LogicalVariable> prevAdditionalNonFilteringVars;
+ private List<Object> prevAdditionalNonFilteringTypes;
public InsertDeleteUpsertOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
List<Mutable<ILogicalExpression>> primaryKeyExprs,
@@ -85,6 +88,9 @@
if (operation == Kind.UPSERT) {
// The upsert case also produces the previous record
schema.add(prevRecordVar);
+ if (additionalNonFilteringExpressions != null) {
+ schema.addAll(prevAdditionalNonFilteringVars);
+ }
if (prevFilterVar != null) {
schema.add(prevFilterVar);
}
@@ -95,6 +101,9 @@
if (prevRecordVar != null) {
producedVariables.add(prevRecordVar);
}
+ if (prevAdditionalNonFilteringVars != null) {
+ producedVariables.addAll(prevAdditionalNonFilteringVars);
+ }
if (prevFilterVar != null) {
producedVariables.add(prevFilterVar);
}
@@ -140,6 +149,11 @@
target.addAllVariables(sources[0]);
if (operation == Kind.UPSERT) {
target.addVariable(prevRecordVar);
+ if (prevAdditionalNonFilteringVars != null) {
+ for (LogicalVariable var : prevAdditionalNonFilteringVars) {
+ target.addVariable(var);
+ }
+ }
if (prevFilterVar != null) {
target.addVariable(prevFilterVar);
}
@@ -158,6 +172,11 @@
PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
if (operation == Kind.UPSERT) {
env.setVarType(prevRecordVar, prevRecordType);
+ if (prevAdditionalNonFilteringVars != null) {
+ for (int i = 0; i < prevAdditionalNonFilteringVars.size(); i++) {
+ env.setVarType(prevAdditionalNonFilteringVars.get(i), prevAdditionalNonFilteringTypes.get(i));
+ }
+ }
if (prevFilterVar != null) {
env.setVarType(prevFilterVar, prevFilterType);
}
@@ -224,4 +243,20 @@
public void setPrevFilterType(Object prevFilterType) {
this.prevFilterType = prevFilterType;
}
+
+ public List<LogicalVariable> getPrevAdditionalNonFilteringVars() {
+ return prevAdditionalNonFilteringVars;
+ }
+
+ public void setPrevAdditionalNonFilteringVars(List<LogicalVariable> prevAdditionalNonFilteringVars) {
+ this.prevAdditionalNonFilteringVars = prevAdditionalNonFilteringVars;
+ }
+
+ public List<Object> getPrevAdditionalNonFilteringTypes() {
+ return prevAdditionalNonFilteringTypes;
+ }
+
+ public void setPrevAdditionalNonFilteringTypes(List<Object> prevAdditionalNonFilteringTypes) {
+ this.prevAdditionalNonFilteringTypes = prevAdditionalNonFilteringTypes;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 2450c6c..99123b3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -74,8 +74,8 @@
@Override
public ILogicalOperator visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
- ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
- ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ ArrayList<LogicalVariable> newList = new ArrayList<>();
+ ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
newList.addAll(op.getVariables());
deepCopyExpressionRefs(newExpressions, op.getExpressions());
return new AggregateOperator(newList, newExpressions);
@@ -84,8 +84,8 @@
@Override
public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Void arg)
throws AlgebricksException {
- ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
- ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ ArrayList<LogicalVariable> newList = new ArrayList<>();
+ ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
newList.addAll(op.getVariables());
deepCopyExpressionRefs(newExpressions, op.getExpressions());
return new RunningAggregateOperator(newList, newExpressions);
@@ -99,16 +99,14 @@
@Override
public ILogicalOperator visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decoList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
- ArrayList<ILogicalPlan> newSubplans = new ArrayList<ILogicalPlan>();
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<>();
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decoList = new ArrayList<>();
+ ArrayList<ILogicalPlan> newSubplans = new ArrayList<>();
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getGroupByList()) {
- groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
- deepCopyExpressionRef(pair.second)));
+ groupByList.add(new Pair<>(pair.first, deepCopyExpressionRef(pair.second)));
}
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getDecorList()) {
- decoList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
- deepCopyExpressionRef(pair.second)));
+ decoList.add(new Pair<>(pair.first, deepCopyExpressionRef(pair.second)));
}
GroupByOperator gbyOp = new GroupByOperator(groupByList, decoList, newSubplans);
for (ILogicalPlan plan : op.getNestedPlans()) {
@@ -148,8 +146,8 @@
@Override
public ILogicalOperator visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
- ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
- ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ ArrayList<LogicalVariable> newList = new ArrayList<>();
+ ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
newList.addAll(op.getVariables());
deepCopyExpressionRefs(newExpressions, op.getExpressions());
return new AssignOperator(newList, newExpressions);
@@ -163,7 +161,7 @@
@Override
public ILogicalOperator visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
- ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
+ ArrayList<LogicalVariable> newList = new ArrayList<>();
newList.addAll(op.getVariables());
return new ProjectOperator(newList);
}
@@ -171,7 +169,7 @@
@Override
public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
throws AlgebricksException {
- ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
deepCopyExpressionRefs(newExpressions, op.getExpressions());
return new PartitioningSplitOperator(newExpressions, op.getDefaultBranchIndex());
}
@@ -183,8 +181,8 @@
@Override
public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
- ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
- ArrayList<LogicalVariable> newOutputList = new ArrayList<LogicalVariable>();
+ ArrayList<LogicalVariable> newInputList = new ArrayList<>();
+ ArrayList<LogicalVariable> newOutputList = new ArrayList<>();
newInputList.addAll(op.getInputVariables());
newOutputList.addAll(op.getOutputVariables());
return new ScriptOperator(op.getScriptDescription(), newInputList, newOutputList);
@@ -192,7 +190,7 @@
@Override
public ILogicalOperator visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
- ArrayList<ILogicalPlan> newSubplans = new ArrayList<ILogicalPlan>();
+ ArrayList<ILogicalPlan> newSubplans = new ArrayList<>();
SubplanOperator subplanOp = new SubplanOperator(newSubplans);
for (ILogicalPlan plan : op.getNestedPlans()) {
newSubplans.add(OperatorManipulationUtil.deepCopy(plan, subplanOp));
@@ -202,11 +200,10 @@
@Override
public ILogicalOperator visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
- List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> newVarMap = new ArrayList<Triple<LogicalVariable, LogicalVariable, LogicalVariable>>();
+ List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> newVarMap = new ArrayList<>();
List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap = op.getVariableMappings();
for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : varMap) {
- newVarMap.add(new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(triple.first, triple.second,
- triple.third));
+ newVarMap.add(new Triple<>(triple.first, triple.second, triple.third));
}
return new UnionAllOperator(newVarMap);
}
@@ -229,31 +226,31 @@
@Override
public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
- ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
+ ArrayList<LogicalVariable> newInputList = new ArrayList<>();
newInputList.addAll(op.getVariables());
return new UnnestMapOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()),
- new ArrayList<Object>(op.getVariableTypes()), op.propagatesInput());
+ new ArrayList<>(op.getVariableTypes()), op.propagatesInput());
}
@Override
public ILogicalOperator visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg)
throws AlgebricksException {
- ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
+ ArrayList<LogicalVariable> newInputList = new ArrayList<>();
newInputList.addAll(op.getVariables());
return new LeftOuterUnnestMapOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()),
- new ArrayList<Object>(op.getVariableTypes()), op.propagatesInput());
+ new ArrayList<>(op.getVariableTypes()), op.propagatesInput());
}
@Override
public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
- ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
+ ArrayList<LogicalVariable> newInputList = new ArrayList<>();
newInputList.addAll(op.getVariables());
return new DataSourceScanOperator(newInputList, op.getDataSource());
}
@Override
public ILogicalOperator visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
- ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
deepCopyExpressionRefs(newExpressions, op.getExpressions());
return new DistinctOperator(newExpressions);
}
@@ -265,7 +262,7 @@
@Override
public ILogicalOperator visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
- ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
deepCopyExpressionRefs(newExpressions, op.getExpressions());
return new WriteOperator(newExpressions, op.getDataSink());
}
@@ -273,16 +270,16 @@
@Override
public ILogicalOperator visitDistributeResultOperator(DistributeResultOperator op, Void arg)
throws AlgebricksException {
- ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
deepCopyExpressionRefs(newExpressions, op.getExpressions());
return new DistributeResultOperator(newExpressions, op.getDataSink());
}
@Override
public ILogicalOperator visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
- ArrayList<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ ArrayList<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<>();
deepCopyExpressionRefs(newKeyExpressions, op.getKeyExpressions());
- List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<>();
deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
WriteResultOperator writeResultOp = new WriteResultOperator(op.getDataSource(),
deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions);
@@ -293,13 +290,13 @@
@Override
public ILogicalOperator visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg)
throws AlgebricksException {
- List<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ List<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<>();
deepCopyExpressionRefs(newKeyExpressions, op.getPrimaryKeyExpressions());
- List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<>();
deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
- InsertDeleteUpsertOperator insertDeleteOp = new InsertDeleteUpsertOperator(op.getDataSource(),
- deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation(),
- op.isBulkload());
+ InsertDeleteUpsertOperator insertDeleteOp =
+ new InsertDeleteUpsertOperator(op.getDataSource(), deepCopyExpressionRef(op.getPayloadExpression()),
+ newKeyExpressions, op.getOperation(), op.isBulkload());
insertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
return insertDeleteOp;
}
@@ -307,32 +304,32 @@
@Override
public ILogicalOperator visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
throws AlgebricksException {
- List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<>();
deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
- List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<>();
deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
- Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
- ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
- List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ Mutable<ILogicalExpression> newFilterExpression =
+ new MutableObject<>(((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
+ List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<>();
deepCopyExpressionRefs(newLSMComponentFilterExpressions, op.getAdditionalFilteringExpressions());
IndexInsertDeleteUpsertOperator indexInsertDeleteOp = new IndexInsertDeleteUpsertOperator(
op.getDataSourceIndex(), newPrimaryKeyExpressions, newSecondaryKeyExpressions, newFilterExpression,
- op.getOperation(), op.isBulkload());
+ op.getOperation(), op.isBulkload(), op.getNumberOfAdditionalNonFilteringFields());
indexInsertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
return indexInsertDeleteOp;
}
@Override
public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
- List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<>();
deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
- List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<>();
deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
- List<LogicalVariable> newTokenizeVars = new ArrayList<LogicalVariable>();
+ List<LogicalVariable> newTokenizeVars = new ArrayList<>();
deepCopyVars(newTokenizeVars, op.getTokenizeVars());
- Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
- ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
- List<Object> newTokenizeVarTypes = new ArrayList<Object>();
+ Mutable<ILogicalExpression> newFilterExpression =
+ new MutableObject<>(((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
+ List<Object> newTokenizeVarTypes = new ArrayList<>();
deepCopyObjects(newTokenizeVarTypes, op.getTokenizeVarTypes());
TokenizeOperator tokenizeOp = new TokenizeOperator(op.getDataSourceIndex(), newPrimaryKeyExpressions,
@@ -349,17 +346,16 @@
private void deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
List<Mutable<ILogicalExpression>> oldExprs) {
for (Mutable<ILogicalExpression> oldExpr : oldExprs) {
- newExprs.add(new MutableObject<ILogicalExpression>(
- ((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression()));
+ newExprs.add(new MutableObject<>(((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression()));
}
}
private Mutable<ILogicalExpression> deepCopyExpressionRef(Mutable<ILogicalExpression> oldExprRef) {
ILogicalExpression oldExpr = oldExprRef.getValue();
if (oldExpr == null) {
- return new MutableObject<ILogicalExpression>(null);
+ return new MutableObject<>(null);
}
- return new MutableObject<ILogicalExpression>(oldExpr.cloneExpression());
+ return new MutableObject<>(oldExpr.cloneExpression());
}
private List<LogicalVariable> deepCopyVars(List<LogicalVariable> newVars, List<LogicalVariable> oldVars) {
@@ -376,12 +372,11 @@
return newObjs;
}
- private List<Pair<IOrder, Mutable<ILogicalExpression>>> deepCopyOrderAndExpression(
- List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
- List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+ private List<Pair<IOrder, Mutable<ILogicalExpression>>>
+ deepCopyOrderAndExpression(List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<>();
for (Pair<IOrder, Mutable<ILogicalExpression>> pair : ordersAndExprs) {
- newOrdersAndExprs
- .add(new Pair<IOrder, Mutable<ILogicalExpression>>(pair.first, deepCopyExpressionRef(pair.second)));
+ newOrdersAndExprs.add(new Pair<>(pair.first, deepCopyExpressionRef(pair.second)));
}
return newOrdersAndExprs;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index f29fd6f..ce86e58 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -56,11 +56,12 @@
private final List<LogicalVariable> additionalFilteringKeys;
private final List<LogicalVariable> prevSecondaryKeys;
private final LogicalVariable prevAdditionalFilteringKey;
+ private final int numOfAdditionalNonFilteringFields;
public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
IDataSourceIndex<?, ?> dataSourceIndex, List<LogicalVariable> prevSecondaryKeys,
- LogicalVariable prevAdditionalFilteringKey) {
+ LogicalVariable prevAdditionalFilteringKey, int numOfAdditionalNonFilteringFields) {
this.primaryKeys = primaryKeys;
this.secondaryKeys = secondaryKeys;
if (filterExpr != null) {
@@ -72,6 +73,7 @@
this.additionalFilteringKeys = additionalFilteringKeys;
this.prevSecondaryKeys = prevSecondaryKeys;
this.prevAdditionalFilteringKey = prevAdditionalFilteringKey;
+ this.numOfAdditionalNonFilteringFields = numOfAdditionalNonFilteringFields;
}
@Override
@@ -91,6 +93,9 @@
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
+ for (int i = 0; i < numOfAdditionalNonFilteringFields; i++) {
+ scanVariables.add(new LogicalVariable(-1));
+ }
IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
.computePropertiesVector(scanVariables);
r.getLocalProperties().clear();
@@ -103,7 +108,7 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
+ throws AlgebricksException {
IndexInsertDeleteUpsertOperator insertDeleteUpsertOp = (IndexInsertDeleteUpsertOperator) op;
IMetadataProvider mp = context.getMetadataProvider();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 0bc683c..3c9cddf 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -83,8 +83,7 @@
IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(keys);
- // Why do we add $$-1 and not the payLoad variable?
- scanVariables.add(new LogicalVariable(-1));
+ scanVariables.add(payload);
if (additionalNonFilteringFields != null) {
scanVariables.addAll(additionalNonFilteringFields);
}
@@ -99,7 +98,7 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
+ throws AlgebricksException {
InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op;
IMetadataProvider mp = context.getMetadataProvider();
IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index f1f6217..e85c35c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -45,11 +45,11 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -378,15 +378,19 @@
throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
String header = getIndexOpString(op.getOperation());
- addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
+ addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from record: ")
.append(op.getPayloadExpression().getValue().accept(exprVisitor, indent));
if (op.getAdditionalNonFilteringExpressions() != null) {
+ buffer.append(", meta: ");
pprintExprList(op.getAdditionalNonFilteringExpressions(), buffer, indent);
}
buffer.append(" partitioned by ");
pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
if (op.getOperation() == Kind.UPSERT) {
- buffer.append(" out: ([record-before-upsert:" + op.getPrevRecordVar() + "]) ");
+ buffer.append(" out: ([record-before-upsert:" + op.getPrevRecordVar()
+ + ((op.getPrevAdditionalNonFilteringVars() != null)
+ ? (", additional-before-upsert: " + op.getPrevAdditionalNonFilteringVars()) : "")
+ + "]) ");
}
if (op.isBulkload()) {
buffer.append(" [bulkload]");
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 6d3692f..8701851 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -352,7 +352,8 @@
}
op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
- prevSecondaryKeys, prevAdditionalFilteringKey));
+ prevSecondaryKeys, prevAdditionalFilteringKey,
+ opInsDel.getNumberOfAdditionalNonFilteringFields()));
}
break;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/TypeAwareTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/TypeAwareTupleReference.java
index c6a0035..27a767f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/TypeAwareTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/TypeAwareTupleReference.java
@@ -19,14 +19,13 @@
package org.apache.hyracks.storage.am.common.tuples;
-import static org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder.VarLenIntDecoder;
-
import java.nio.ByteBuffer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
+import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder.VarLenIntDecoder;
public class TypeAwareTupleReference implements ITreeIndexTupleReference {
protected ByteBuffer buf;
@@ -36,7 +35,7 @@
protected int nullFlagsBytes;
protected int dataStartOff;
- protected ITypeTraits[] typeTraits;
+ protected final ITypeTraits[] typeTraits;
protected VarLenIntDecoder encDec = VarLenIntEncoderDecoder.createDecoder();
protected int[] decodedFieldSlots;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 653c451..f62f70c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -184,7 +184,7 @@
}
ComparableFileName cmpBTreeFileName = null;
ComparableFileName cmpBloomFilterFileName = null;
- while (btreeFileIter.hasNext() && bloomFilterFileIter.hasNext()) {
+ while (btreeFileIter.hasNext() && (hasBloomFilter ? bloomFilterFileIter.hasNext() : true)) {
cmpBTreeFileName = btreeFileIter.next();
if (hasBloomFilter) {
cmpBloomFilterFileName = bloomFilterFileIter.next();
diff --git a/pom.xml b/pom.xml
index 131ef02..786ce46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,4 +44,35 @@
<module>hyracks-fullstack</module>
<module>asterixdb</module>
</modules>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <versionRange>[2.17,)</versionRange>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
</project>