fixed an abort issue appeared from RecoveryIT test
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 8301f5d..ce06e47 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -999,7 +999,9 @@
// generate field permutations
int[] fieldPermutation = new int[numKeys];
int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
+ int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
int i = 0;
+ int j = 0;
for (LogicalVariable varKey : secondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
@@ -1009,7 +1011,9 @@
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
+ modificationCallbackPrimaryKeyFields[j] = i;
i++;
+ j++;
}
Dataset dataset = findDataset(dataverseName, datasetName);
@@ -1059,16 +1063,9 @@
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
- int[] primaryKeyFields = new int[primaryKeys.size()];
- i = 0;
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- primaryKeyFields[i] = idx;
- i++;
- }
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+ jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
@@ -1108,7 +1105,9 @@
int numKeys = primaryKeys.size() + secondaryKeys.size();
// generate field permutations
int[] fieldPermutation = new int[numKeys];
+ int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
int i = 0;
+ int j = 0;
for (LogicalVariable varKey : secondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
@@ -1117,7 +1116,9 @@
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
+ modificationCallbackPrimaryKeyFields[j] = i;
i++;
+ j++;
}
boolean isPartitioned;
@@ -1188,16 +1189,9 @@
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
- int[] primaryKeyFields = new int[primaryKeys.size()];
- i = 0;
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- primaryKeyFields[i] = idx;
- i++;
- }
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_INVERTED_INDEX);
+ jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_INVERTED_INDEX);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
@@ -1244,7 +1238,9 @@
ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
int[] fieldPermutation = new int[numKeys];
+ int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
int i = 0;
+ int j = 0;
for (LogicalVariable varKey : secondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
@@ -1254,7 +1250,9 @@
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
+ modificationCallbackPrimaryKeyFields[j] = i;
i++;
+ j++;
}
IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
@@ -1282,16 +1280,9 @@
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
- int[] primaryKeyFields = new int[numPrimaryKeys];
- i = 0;
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- primaryKeyFields[i] = idx;
- i++;
- }
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
+ jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);