[ASTERIXDB-3144][HYR] Make index search runtime support multiple partitions
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
This patch changes the index search runtime to support
operating on multiple partitions. With this change, an index
search node pushable will read from multiple indexes
representing multiple partitions. This is a step towards
achieving compute/storage separation.
Change-Id: Iea8418bdfbca2db9cc5f0aa23c2434f3779e8531
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17444
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index a8dcb1f..a7a3838 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -84,7 +84,6 @@
private final List<LogicalVariable> highKeyVarList;
private final boolean isPrimaryIndex;
private final boolean isEqCondition;
- private Object implConfig;
public BTreeSearchPOperator(IDataSourceIndex<String, DataSourceId> idx, INodeDomain domain,
boolean requiresBroadcast, boolean isPrimaryIndex, boolean isEqCondition,
@@ -96,14 +95,6 @@
this.highKeyVarList = highKeyVarList;
}
- public void setImplConfig(Object implConfig) {
- this.implConfig = implConfig;
- }
-
- public Object getImplConfig() {
- return implConfig;
- }
-
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.BTREE_SEARCH;
@@ -177,7 +168,8 @@
jobGenParams.isLowKeyInclusive(), jobGenParams.isHighKeyInclusive(), propagateFilter,
nonFilterWriterFactory, minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory, outputLimit,
unnestMap.getGenerateCallBackProceedResultVar(),
- isPrimaryIndexPointSearch(op, context.getPhysicalOptimizationConfig()), tupleProjectorFactory);
+ useBatchPointSearch(op, context.getPhysicalOptimizationConfig()), tupleProjectorFactory,
+ isPrimaryIndexPointSearch());
IOperatorDescriptor opDesc = btreeSearch.first;
opDesc.setSourceLocation(unnestMap.getSourceLocation());
@@ -188,18 +180,20 @@
builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0);
}
- /**
- * Check whether we can use {@link LSMBTreeBatchPointSearchCursor} to perform point-lookups on the primary index
- */
- private boolean isPrimaryIndexPointSearch(ILogicalOperator op, PhysicalOptimizationConfig config) {
- if (!config.isBatchLookupEnabled() || !isEqCondition || !isPrimaryIndex
- || !lowKeyVarList.equals(highKeyVarList)) {
+ private boolean isPrimaryIndexPointSearch() {
+ if (!isEqCondition || !isPrimaryIndex || !lowKeyVarList.equals(highKeyVarList)) {
return false;
}
Index searchIndex = ((DataSourceIndex) idx).getIndex();
int numberOfKeyFields = ((Index.ValueIndexDetails) searchIndex.getIndexDetails()).getKeyFieldNames().size();
+ return lowKeyVarList.size() == numberOfKeyFields && highKeyVarList.size() == numberOfKeyFields;
+ }
- if (lowKeyVarList.size() != numberOfKeyFields || highKeyVarList.size() != numberOfKeyFields) {
+ /**
+ * Check whether we can use {@link LSMBTreeBatchPointSearchCursor} to perform point-lookups on the primary index
+ */
+ private boolean useBatchPointSearch(ILogicalOperator op, PhysicalOptimizationConfig config) {
+ if (!config.isBatchLookupEnabled() || !isPrimaryIndexPointSearch()) {
return false;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index 047a6d5..f962142 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -107,7 +107,7 @@
throws AlgebricksException {
return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, ds, indexName,
null, null, true, true, false, null, null, null, tupleFilterFactory, outputLimit, false, false,
- DefaultTupleProjectorFactory.INSTANCE);
+ DefaultTupleProjectorFactory.INSTANCE, false);
}
@Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index dbcbce0..c77e032 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -166,7 +166,7 @@
return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, null,
((DatasetDataSource) dataSource).getDataset(), primaryIndex.getIndexName(), null, null, true,
true, false, null, minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory,
- outputLimit, false, false, tupleProjectorFactory);
+ outputLimit, false, false, tupleProjectorFactory, false);
default:
throw new AlgebricksException("Unknown datasource type");
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 5508644..3692a8d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -105,6 +105,7 @@
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -134,10 +135,12 @@
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -149,6 +152,7 @@
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.data.std.primitive.ShortPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
@@ -541,8 +545,8 @@
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
boolean propagateFilter, IMissingWriterFactory nonFilterWriterFactory, int[] minFilterFieldIndexes,
int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit,
- boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory)
- throws AlgebricksException {
+ boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory,
+ boolean partitionInputTuples) throws AlgebricksException {
boolean isSecondary = true;
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
@@ -602,24 +606,46 @@
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first);
BTreeSearchOperatorDescriptor btreeSearchOp;
+ int numPartitions;
+ if (spPc.second.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
+ numPartitions = ((AlgebricksCountPartitionConstraint) spPc.second).getCount();
+ } else {
+ numPartitions = ((AlgebricksAbsolutePartitionConstraint) spPc.second).getLocations().length;
+ }
+ int[][] partitionsMap = getPartitionsMap(numPartitions);
+ ITuplePartitionerFactory tuplePartitionerFactory = null;
+ if (partitionInputTuples) {
+ IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
+ tuplePartitionerFactory = new FieldHashPartitionerFactory(lowKeyFields, pkHashFunFactories, numPartitions);
+ }
+
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
btreeSearchOp = !isSecondary && isPrimaryIndexPointSearch
? new LSMBTreeBatchPointSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
- maxFilterFieldIndexes, tupleFilterFactory, outputLimit, tupleProjectorFactory)
+ maxFilterFieldIndexes, tupleFilterFactory, outputLimit, tupleProjectorFactory,
+ tuplePartitionerFactory, partitionsMap)
: new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes,
propagateFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit,
proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
- tupleProjectorFactory);
+ tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
} else {
btreeSearchOp = null;
}
return new Pair<>(btreeSearchOp, spPc.second);
}
+ private static int[][] getPartitionsMap(int numPartitions) {
+ int[][] map = new int[numPartitions][1];
+ for (int i = 0; i < numPartitions; i++) {
+ map[i] = new int[] { i };
+ }
+ return map;
+ }
+
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, boolean retainMissing,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
index a2a0f19..708c2c2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
@@ -64,7 +64,7 @@
throws AlgebricksException {
return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, dataset,
sampleIndexName, null, null, true, true, false, null, null, null, tupleFilterFactory, outputLimit,
- false, false, DefaultTupleProjectorFactory.INSTANCE);
+ false, false, DefaultTupleProjectorFactory.INSTANCE, false);
}
@Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 79bf3e8..b5dd8bf 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -404,10 +404,10 @@
IRecoveryManager.ResourceType.LSM_BTREE);
IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
- BTreeSearchOperatorDescriptor primarySearchOp =
- new BTreeSearchOperatorDescriptor(spec, dataset.getPrimaryRecordDescriptor(metadataProvider),
- lowKeyFields, highKeyFields, true, true, indexHelperFactory, false, false, null,
- searchCallbackFactory, null, null, false, null, null, -1, false, null, null, projectorFactory);
+ BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec,
+ dataset.getPrimaryRecordDescriptor(metadataProvider), lowKeyFields, highKeyFields, true, true,
+ indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false, null, null, -1, false,
+ null, null, projectorFactory, null, null);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
primaryPartitionConstraint);
return primarySearchOp;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitioner.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitioner.java
new file mode 100644
index 0000000..5e527dc
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitioner.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITuplePartitioner {
+
+ /**
+ * For the tuple (located at tIndex in the frame), it determines which partition the tuple belongs to.
+ *
+ * @param accessor The accessor of the frame to access tuples
+ * @param tIndex The index of the tuple in consideration
+ * @return The partition number that the tuple belongs to
+ * @throws HyracksDataException
+ */
+ int partition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionerFactory.java
new file mode 100644
index 0000000..df64d06
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionerFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public interface ITuplePartitionerFactory extends Serializable {
+
+ ITuplePartitioner createPartitioner(IHyracksTaskContext ctx);
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
new file mode 100644
index 0000000..31a959f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FieldHashPartitionComputer extends HashPartitioner implements ITuplePartitionComputer {
+
+ public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
+ super(hashFields, hashFunctions);
+ }
+
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ return super.partition(accessor, tIndex, nParts);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index ab5ab01..52df3b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -18,13 +18,11 @@
*/
package org.apache.hyracks.dataflow.common.data.partition;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
public class FieldHashPartitionComputerFactory implements ITuplePartitionComputerFactory {
private static final long serialVersionUID = 1L;
@@ -37,34 +35,11 @@
}
@Override
- public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
+ public ITuplePartitionComputer createPartitioner(IHyracksTaskContext ctx) {
final IBinaryHashFunction[] hashFunctions = new IBinaryHashFunction[hashFunctionFactories.length];
for (int i = 0; i < hashFunctionFactories.length; ++i) {
hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction();
}
- return new ITuplePartitionComputer() {
- @Override
- public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
- if (nParts == 1) {
- return 0;
- }
- int h = 0;
- int startOffset = accessor.getTupleStartOffset(tIndex);
- int slotLength = accessor.getFieldSlotsLength();
- for (int j = 0; j < hashFields.length; ++j) {
- int fIdx = hashFields[j];
- IBinaryHashFunction hashFn = hashFunctions[j];
- int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
- int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
- int fh = hashFn.hash(accessor.getBuffer().array(), startOffset + slotLength + fStart,
- fEnd - fStart);
- h = h * 31 + fh;
- }
- if (h < 0) {
- h = -(h + 1);
- }
- return h % nParts;
- }
- };
+ return new FieldHashPartitionComputer(hashFields, hashFunctions);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
new file mode 100644
index 0000000..5620a95
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FieldHashPartitioner extends HashPartitioner implements ITuplePartitioner {
+
+ private final int numPartitions;
+
+ public FieldHashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions, int numPartitions) {
+ super(hashFields, hashFunctions);
+ this.numPartitions = numPartitions;
+ }
+
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ return partition(accessor, tIndex, numPartitions);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionerFactory.java
new file mode 100644
index 0000000..fb62fd8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionerFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+
+public class FieldHashPartitionerFactory implements ITuplePartitionerFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final int[] hashFields;
+ private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+ private final int numPartitions;
+
+ public FieldHashPartitionerFactory(int[] hashFields, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ int numPartitions) {
+ this.hashFields = hashFields;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.numPartitions = numPartitions;
+ }
+
+ @Override
+ public ITuplePartitioner createPartitioner(IHyracksTaskContext ctx) {
+ final IBinaryHashFunction[] hashFunctions = new IBinaryHashFunction[hashFunctionFactories.length];
+ for (int i = 0; i < hashFunctionFactories.length; ++i) {
+ hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction();
+ }
+ return new FieldHashPartitioner(hashFields, hashFunctions, numPartitions);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
new file mode 100644
index 0000000..b09bcb8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+class HashPartitioner {
+
+ private final int[] hashFields;
+ private final IBinaryHashFunction[] hashFunctions;
+
+ public HashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
+ this.hashFields = hashFields;
+ this.hashFunctions = hashFunctions;
+ }
+
+ protected int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ if (nParts == 1) {
+ return 0;
+ }
+ int h = 0;
+ int startOffset = accessor.getTupleStartOffset(tIndex);
+ int slotLength = accessor.getFieldSlotsLength();
+ for (int j = 0; j < hashFields.length; ++j) {
+ int fIdx = hashFields[j];
+ IBinaryHashFunction hashFn = hashFunctions[j];
+ int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
+ int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
+ int fh = hashFn.hash(accessor.getBuffer().array(), startOffset + slotLength + fStart, fEnd - fStart);
+ h = h * 31 + fh;
+ }
+ if (h < 0) {
+ h = -(h + 1);
+ }
+ return h % nParts;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 0ab88a5..1c961c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -34,7 +35,7 @@
public class BTreeSearchOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
protected final int[] lowKeyFields;
protected final int[] highKeyFields;
@@ -55,6 +56,8 @@
protected final ITupleFilterFactory tupleFilterFactory;
protected final long outputLimit;
protected final ITupleProjectorFactory tupleProjectorFactory;
+ protected final ITuplePartitionerFactory tuplePartitionerFactory;
+ protected final int[][] map;
public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
@@ -65,7 +68,7 @@
this(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, null, -1, false, null, null,
- DefaultTupleProjectorFactory.INSTANCE);
+ DefaultTupleProjectorFactory.INSTANCE, null, null);
}
public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
@@ -75,7 +78,8 @@
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter,
IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
- byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory) {
+ byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory,
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.retainInput = retainInput;
@@ -97,6 +101,8 @@
this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue;
this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue;
this.tupleProjectorFactory = tupleProjectorFactory;
+ this.tuplePartitionerFactory = tuplePartitionerFactory;
+ this.map = map;
}
@Override
@@ -107,7 +113,8 @@
lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
- searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, tupleProjectorFactory);
+ searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, tupleProjectorFactory,
+ tuplePartitionerFactory, map);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index a56e305..24163ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -20,6 +20,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.HyracksConstants;
@@ -32,6 +33,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
+import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
@@ -55,7 +57,7 @@
this(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing,
nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter, nonFilterWriterFactory, null, -1,
- false, null, null, DefaultTupleProjectorFactory.INSTANCE);
+ false, null, null, DefaultTupleProjectorFactory.INSTANCE, null, null);
}
public BTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc,
@@ -65,12 +67,13 @@
ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
- byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory)
- throws HyracksDataException {
+ byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory,
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) throws HyracksDataException {
super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
- searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, projectorFactory);
+ searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, projectorFactory,
+ tuplePartitionerFactory, map);
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
if (lowKeyFields != null && lowKeyFields.length > 0) {
@@ -100,7 +103,7 @@
}
@Override
- protected ISearchPredicate createSearchPredicate() {
+ protected ISearchPredicate createSearchPredicate(IIndex index) {
ITreeIndex treeIndex = (ITreeIndex) index;
lowKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), lowKey);
highKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), highKey);
@@ -109,7 +112,7 @@
}
@Override
- protected int getFieldCount() {
+ protected int getFieldCount(IIndex index) {
return ((ITreeIndex) index).getFieldCount();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index b42337c..a2487d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -230,7 +230,7 @@
}
System.out.println("Number of passed tests: " + successes);
System.out.println("Number of failed tests: " + failures);
- Assert.assertEquals(failures, 0);
+ Assert.assertEquals(0, failures);
}
private void testBTreeSearchOperatorNodePushable() throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
index 9d869e9..9bdbf3c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
@@ -113,5 +113,9 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 4fc8057..c6cdd66 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -27,6 +27,8 @@
import org.apache.hyracks.api.dataflow.IIntrospectingOperator;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.profiling.IOperatorStats;
@@ -46,7 +48,6 @@
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilter;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
-import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
@@ -64,22 +65,24 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable
implements IIntrospectingOperator {
static final Logger LOGGER = LogManager.getLogger();
protected final IHyracksTaskContext ctx;
- protected final IIndexDataflowHelper indexHelper;
protected FrameTupleAccessor accessor;
-
protected FrameTupleAppender appender;
protected ArrayTupleBuilder tb;
protected DataOutput dos;
- protected IIndex index;
protected ISearchPredicate searchPred;
- protected IIndexCursor cursor;
- protected IIndexAccessor indexAccessor;
+ protected final IIndexDataflowHelper[] indexHelpers;
+ protected IIndex[] indexes;
+ protected IIndexAccessor[] indexAccessors;
+ protected IIndexCursor[] cursors;
protected final RecordDescriptor inputRecDesc;
protected final boolean retainInput;
@@ -114,28 +117,31 @@
protected long outputCount = 0;
protected boolean finished;
protected final ITupleProjector tupleProjector;
-
- // no filter and limit pushdown
- public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
- int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
- boolean retainInput, boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory,
- ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
- IMissingWriterFactory nonFilterWriterFactory) throws HyracksDataException {
- this(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
- retainInput, retainMissing, nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter,
- nonFilterWriterFactory, null, -1, false, null, null, DefaultTupleProjectorFactory.INSTANCE);
- }
+ protected final ITuplePartitioner tuplePartitioner;
+ protected final int[] partitions;
+ private final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap();
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
boolean retainInput, boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
- IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFactoryFactory, long outputLimit,
+ IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean appendSearchCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
- byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory)
- throws HyracksDataException {
+ byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory,
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) throws HyracksDataException {
this.ctx = ctx;
- this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+ this.appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
+ this.partitions = map != null ? map[partition] : new int[] { partition };
+ for (int i = 0; i < partitions.length; i++) {
+ storagePartitionId2Index.put(partitions[i], i);
+ }
+ this.indexHelpers = new IIndexDataflowHelper[partitions.length];
+ this.indexes = new IIndex[partitions.length];
+ this.indexAccessors = new IIndexAccessor[partitions.length];
+ this.cursors = new IIndexCursor[partitions.length];
+ for (int i = 0; i < partitions.length; i++) {
+ indexHelpers[i] = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partitions[i]);
+ }
this.retainInput = retainInput;
this.retainMissing = retainMissing;
this.appendIndexFilter = appendIndexFilter;
@@ -160,7 +166,7 @@
this.appendSearchCallbackProceedResult = appendSearchCallbackProceedResult;
this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue;
this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue;
- this.tupleFilterFactory = tupleFactoryFactory;
+ this.tupleFilterFactory = tupleFilterFactory;
this.outputLimit = outputLimit;
this.stats = new NoOpOperatorStats();
@@ -169,30 +175,43 @@
}
tupleProjector = projectorFactory.createTupleProjector(ctx);
+ tuplePartitioner = tuplePartitionerFactory == null ? null : tuplePartitionerFactory.createPartitioner(ctx);
}
- protected abstract ISearchPredicate createSearchPredicate();
+ protected abstract ISearchPredicate createSearchPredicate(IIndex index);
protected abstract void resetSearchPredicate(int tupleIndex);
// Assigns any index-type specific related accessor parameters
protected abstract void addAdditionalIndexAccessorParams(IIndexAccessParameters iap) throws HyracksDataException;
- protected IIndexCursor createCursor() throws HyracksDataException {
- return indexAccessor.createSearchCursor(false);
+ protected IIndexCursor createCursor(IIndex idx, IIndexAccessor idxAccessor) throws HyracksDataException {
+ return idxAccessor.createSearchCursor(false);
}
- protected abstract int getFieldCount();
+ protected abstract int getFieldCount(IIndex index);
@Override
public void open() throws HyracksDataException {
writer.open();
- indexHelper.open();
- index = indexHelper.getIndexInstance();
- subscribeForStats(index);
+ ISearchOperationCallback[] searchCallbacks = new ISearchOperationCallback[partitions.length];
+ IIndexAccessParameters[] iaps = new IndexAccessParameters[partitions.length];
+
+ for (int i = 0; i < partitions.length; i++) {
+ indexHelpers[i].open();
+ indexes[i] = indexHelpers[i].getIndexInstance();
+ searchCallbacks[i] = searchCallbackFactory
+ .createSearchOperationCallback(indexHelpers[i].getResource().getId(), ctx, null);
+ iaps[i] = new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallbacks[i]);
+ addAdditionalIndexAccessorParams(iaps[i]);
+ indexAccessors[i] = indexes[i].createAccessor(iaps[i]);
+ cursors[i] = createCursor(indexes[i], indexAccessors[i]);
+ }
+
+ subscribeForStats(indexes[0]);
accessor = new FrameTupleAccessor(inputRecDesc);
if (retainMissing) {
- int fieldCount = getFieldCount();
+ int fieldCount = getFieldCount(indexes[0]);
// Field count in case searchCallback.proceed() result is needed.
int finalFieldCount = appendSearchCallbackProceedResult ? fieldCount + 1 : fieldCount;
nonMatchTupleBuild = new ArrayTupleBuilder(finalFieldCount);
@@ -206,7 +225,7 @@
nonMatchTupleBuild = null;
}
if (appendIndexFilter) {
- int numIndexFilterFields = index.getNumOfFilterFields();
+ int numIndexFilterFields = indexes[0].getNumOfFilterFields();
nonFilterTupleBuild = new ArrayTupleBuilder(numIndexFilterFields);
buildMissingTuple(numIndexFilterFields, nonFilterTupleBuild, nonFilterWriter);
}
@@ -219,16 +238,9 @@
outputCount = 0;
try {
- searchPred = createSearchPredicate();
+ searchPred = createSearchPredicate(indexes[0]);
tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
- ISearchOperationCallback searchCallback =
- searchCallbackFactory.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, null);
- IIndexAccessParameters iap = new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallback);
- addAdditionalIndexAccessorParams(iap);
- indexAccessor = index.createAccessor(iap);
- cursor = createCursor();
if (retainInput) {
frameTuple = new FrameTupleReference();
}
@@ -237,7 +249,7 @@
}
}
- protected void writeSearchResults(int tupleIndex) throws Exception {
+ protected void writeSearchResults(int tupleIndex, IIndexCursor cursor) throws Exception {
long matchingTupleCount = 0;
while (cursor.hasNext()) {
cursor.next();
@@ -291,11 +303,10 @@
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
try {
- for (int i = 0; i < tupleCount && !finished; i++) {
- resetSearchPredicate(i);
- cursor.close();
- indexAccessor.search(cursor, searchPred);
- writeSearchResults(i);
+ if (tuplePartitioner != null) {
+ searchPartition(tupleCount);
+ } else {
+ searchAllPartitions(tupleCount);
}
} catch (Exception e) {
throw HyracksDataException.create(e);
@@ -309,40 +320,52 @@
@Override
public void close() throws HyracksDataException {
- Throwable failure = releaseResources();
+ Throwable failure = flushFrame();
+ failure = releaseResources(failure);
failure = CleanupUtils.close(writer, failure);
if (failure != null) {
throw HyracksDataException.create(failure);
}
}
- private Throwable releaseResources() {
+ private Throwable flushFrame() {
Throwable failure = null;
- if (index != null) {
- // if index == null, then the index open was not successful
- if (!failed) {
- try {
- if (appender.getTupleCount() > 0) {
- appender.write(writer, true);
- }
- stats.getPageReads().update(ctx.getThreadStats().getPinnedPagesCount());
- stats.coldReadCounter().update(ctx.getThreadStats().getColdReadCount());
- } catch (Throwable th) { // NOSONAR Must ensure writer.fail is called.
- // subsequently, the failure will be thrown
- failure = th;
+ if (!failed) {
+ try {
+ if (appender.getTupleCount() > 0) {
+ appender.write(writer, true);
}
- if (failure != null) {
- try {
- writer.fail();
- } catch (Throwable th) {// NOSONAR Must cursor.close is called.
- // subsequently, the failure will be thrown
- failure = ExceptionUtils.suppress(failure, th);
- }
+ stats.getPageReads().update(ctx.getThreadStats().getPinnedPagesCount());
+ stats.coldReadCounter().update(ctx.getThreadStats().getColdReadCount());
+ } catch (Throwable th) { // NOSONAR Must ensure writer.fail is called.
+ // subsequently, the failure will be thrown
+ failure = th;
+ }
+ if (failure != null) {
+ try {
+ writer.fail();
+ } catch (Throwable th) {
+ // subsequently, the failure will be thrown
+ failure = ExceptionUtils.suppress(failure, th);
}
}
- failure = ResourceReleaseUtils.close(cursor, failure);
- failure = CleanupUtils.destroy(failure, cursor, indexAccessor);
- failure = ResourceReleaseUtils.close(indexHelper, failure);
+ }
+ return failure;
+ }
+
+ private Throwable releaseResources(Throwable failure) {
+ for (int i = 0; i < indexes.length; i++) {
+ // if index == null, then the index open was not successful
+ try {
+ if (indexes[i] != null) {
+ failure = ResourceReleaseUtils.close(cursors[i], failure);
+ failure = CleanupUtils.destroy(failure, cursors[i], indexAccessors[i]);
+ failure = ResourceReleaseUtils.close(indexHelpers[i], failure);
+ }
+ } catch (Throwable th) {// NOSONAR ensure closing other indexes
+ // subsequently, the failure will be thrown
+ failure = ExceptionUtils.suppress(failure, th);
+ }
}
return failure;
}
@@ -413,4 +436,25 @@
this.stats = stats;
}
+ private void searchPartition(int tupleCount) throws Exception {
+ for (int i = 0; i < tupleCount && !finished; i++) {
+ int storagePartition = tuplePartitioner.partition(accessor, i);
+ int pIdx = storagePartitionId2Index.get(storagePartition);
+ resetSearchPredicate(i);
+ cursors[pIdx].close();
+ indexAccessors[pIdx].search(cursors[pIdx], searchPred);
+ writeSearchResults(i, cursors[pIdx]);
+ }
+ }
+
+ private void searchAllPartitions(int tupleCount) throws Exception {
+ for (int p = 0; p < partitions.length; p++) {
+ for (int i = 0; i < tupleCount && !finished; i++) {
+ resetSearchPredicate(i);
+ cursors[p].close();
+ indexAccessors[p].search(cursors[p], searchPred);
+ writeSearchResults(i, cursors[p]);
+ }
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
index 3e03e5c..9ed0782 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -39,11 +40,12 @@
IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory,
- long outputLimit, ITupleProjectorFactory tupleProjectorFactory) {
+ long outputLimit, ITupleProjectorFactory tupleProjectorFactory,
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes, false, null, tupleFilterFactory, outputLimit, false, null, null,
- tupleProjectorFactory);
+ tupleProjectorFactory, tuplePartitionerFactory, map);
}
@Override
@@ -53,7 +55,7 @@
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, tupleFilterFactory,
- outputLimit, tupleProjectorFactory);
+ outputLimit, tupleProjectorFactory, tuplePartitionerFactory, map);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index f6f97b7..47d515a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -37,6 +38,8 @@
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeBatchPointSearchCursor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
@@ -50,23 +53,23 @@
boolean highKeyInclusive, int[] minFilterKeyFields, int[] maxFilterKeyFields,
IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
- ITupleFilterFactory tupleFilterFactory, long outputLimit, ITupleProjectorFactory tupleProjectorFactory)
- throws HyracksDataException {
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, ITupleProjectorFactory tupleProjectorFactory,
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) throws HyracksDataException {
super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
minFilterKeyFields, maxFilterKeyFields, indexHelperFactory, retainInput, retainMissing,
missingWriterFactory, searchCallbackFactory, false, null, tupleFilterFactory, outputLimit, false, null,
- null, tupleProjectorFactory);
+ null, tupleProjectorFactory, tuplePartitionerFactory, map);
this.keyFields = lowKeyFields;
}
@Override
- protected IIndexCursor createCursor() throws HyracksDataException {
- ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
- return ((LSMBTree) index).createBatchPointSearchCursor(lsmAccessor.getOpContext());
+ protected IIndexCursor createCursor(IIndex idx, IIndexAccessor idxAccessor) throws HyracksDataException {
+ ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) idxAccessor;
+ return ((LSMBTree) idx).createBatchPointSearchCursor(lsmAccessor.getOpContext());
}
@Override
- protected ISearchPredicate createSearchPredicate() {
+ protected ISearchPredicate createSearchPredicate(IIndex index) {
ITreeIndex treeIndex = (ITreeIndex) index;
lowKeySearchCmp =
highKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), lowKey);
@@ -78,19 +81,21 @@
accessor.reset(buffer);
if (accessor.getTupleCount() > 0) {
BatchPredicate batchPred = (BatchPredicate) searchPred;
- batchPred.reset(accessor);
- try {
- indexAccessor.search(cursor, batchPred);
- writeSearchResults();
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- } finally {
- cursor.close();
+ for (int p = 0; p < partitions.length; p++) {
+ batchPred.reset(accessor);
+ try {
+ indexAccessors[p].search(cursors[p], batchPred);
+ writeSearchResults(cursors[p]);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ } finally {
+ cursors[p].close();
+ }
}
}
}
- protected void writeSearchResults() throws IOException {
+ protected void writeSearchResults(IIndexCursor cursor) throws IOException {
long matchingTupleCount = 0;
LSMBTreeBatchPointSearchCursor batchCursor = (LSMBTreeBatchPointSearchCursor) cursor;
int tupleIndex = 0;
@@ -127,7 +132,6 @@
}
}
stats.getInputTupleCounter().update(matchingTupleCount);
-
}
private void appendMissingTuple(int start, int end) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java
index 4aa094b..1bf229b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java
@@ -27,7 +27,9 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.ISearchPredicate;
@@ -37,21 +39,23 @@
RecordDescriptor inputRecDesc, IIndexDataflowHelperFactory indexHelperFactory,
ISearchOperationCallbackFactory searchCallbackFactory) throws HyracksDataException {
super(ctx, inputRecDesc, partition, null, null, indexHelperFactory, false, false, null, searchCallbackFactory,
- false, null);
+ false, null, null, -1, false, null, null, DefaultTupleProjectorFactory.INSTANCE, null, null);
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
try {
- ((ILSMIndexAccessor) indexAccessor).scanDiskComponents(cursor);
- writeSearchResults(0);
+ for (int p = 0; p < partitions.length; p++) {
+ ((ILSMIndexAccessor) indexAccessors[p]).scanDiskComponents(cursors[p]);
+ writeSearchResults(0, cursors[p]);
+ }
} catch (Exception e) {
throw HyracksDataException.create(e);
}
}
@Override
- protected ISearchPredicate createSearchPredicate() {
+ protected ISearchPredicate createSearchPredicate(IIndex index) {
// do nothing
// no need to create search predicate for disk component scan operation
return null;
@@ -63,7 +67,7 @@
}
@Override
- protected int getFieldCount() {
+ protected int getFieldCount(IIndex index) {
return ((ITreeIndex) index).getFieldCount() + 2;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
index 571ae5c..996241d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
@@ -33,12 +33,14 @@
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
import org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.IFullTextConfigEvaluator;
import org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.IFullTextConfigEvaluatorFactory;
import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.ISearchPredicate;
@@ -66,7 +68,7 @@
IMissingWriterFactory nonFilterWriterFactory, int frameLimit) throws HyracksDataException {
super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
- nonFilterWriterFactory);
+ nonFilterWriterFactory, null, -1, false, null, null, DefaultTupleProjectorFactory.INSTANCE, null, null);
this.searchModifier = searchModifier;
this.binaryTokenizerFactory = binaryTokenizerFactory;
this.fullTextConfigEvaluatorFactory = fullTextConfigEvaluatorFactory;
@@ -85,7 +87,7 @@
}
@Override
- protected ISearchPredicate createSearchPredicate() {
+ protected ISearchPredicate createSearchPredicate(IIndex index) {
IBinaryTokenizer tokenizer = binaryTokenizerFactory.createTokenizer();
IFullTextConfigEvaluator fullTextConfigEvaluator =
fullTextConfigEvaluatorFactory.createFullTextConfigEvaluator();
@@ -110,7 +112,7 @@
}
@Override
- protected int getFieldCount() {
+ protected int getFieldCount(IIndex index) {
return numOfFields;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index e1c6f5b..aef624f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -31,11 +31,13 @@
import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate;
import org.apache.hyracks.storage.am.rtree.util.RTreeUtils;
+import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
public class RTreeSearchOperatorNodePushable extends IndexSearchOperatorNodePushable {
+
protected PermutingFrameTupleReference searchKey;
protected MultiComparator cmp;
@@ -43,16 +45,6 @@
int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
- boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory) throws HyracksDataException {
- this(ctx, partition, inputRecDesc, keyFields, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
- retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
- nonFilterWriterFactory, false, null, null);
- }
-
- public RTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc,
- int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
- IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
- IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory,
boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
byte[] searchCallbackProceedResultTrueValue) throws HyracksDataException {
@@ -60,7 +52,7 @@
super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, null, -1, appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
- searchCallbackProceedResultTrueValue, DefaultTupleProjectorFactory.INSTANCE);
+ searchCallbackProceedResultTrueValue, DefaultTupleProjectorFactory.INSTANCE, null, null);
if (keyFields != null && keyFields.length > 0) {
searchKey = new PermutingFrameTupleReference();
searchKey.setFieldPermutation(keyFields);
@@ -68,7 +60,7 @@
}
@Override
- protected ISearchPredicate createSearchPredicate() {
+ protected ISearchPredicate createSearchPredicate(IIndex index) {
ITreeIndex treeIndex = (ITreeIndex) index;
cmp = RTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), searchKey);
return new SearchPredicate(searchKey, cmp, minFilterKey, maxFilterKey);
@@ -88,7 +80,7 @@
}
@Override
- protected int getFieldCount() {
+ protected int getFieldCount(IIndex index) {
return ((ITreeIndex) index).getFieldCount();
}