[ASTERIXDB-3144][HYR][RT] Make other operators support multiple partitions
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
This patch changes the following operators to support
operating on multiple partitions. This is a step towards
achieving compute/storage separation:
- compact dataset/index
- drop dataverse
- remove feed storage
- RTree search
Remove unused getWriteResultRuntime from IMetadataProvider.
Change-Id: I2b1223c0d1c3248305014396dec4c1d0bdef13f6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17505
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
index d5743cb..bfba414 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
@@ -34,8 +34,8 @@
public static JobSpecification dropDataverseJobSpec(Dataverse dataverse, MetadataProvider metadata) {
JobSpecification jobSpec = RuntimeUtils.createJobSpecification(metadata.getApplicationContext());
PartitioningProperties partitioningProperties = metadata.splitAndConstraints(dataverse.getDataverseName());
- FileRemoveOperatorDescriptor frod =
- new FileRemoveOperatorDescriptor(jobSpec, partitioningProperties.getSpiltsProvider(), false);
+ FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec,
+ partitioningProperties.getSpiltsProvider(), false, partitioningProperties.getComputeStorageMap());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod,
partitioningProperties.getConstraints());
jobSpec.addRoot(frod);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 07500f7..72961b8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -26,21 +26,17 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
import org.apache.asterix.app.result.ResponsePrinter;
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.transactions.TxnId;
-import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -50,7 +46,6 @@
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.lang.common.base.Expression;
@@ -114,13 +109,11 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
@@ -155,23 +148,14 @@
}
public static JobSpecification buildRemoveFeedStorageJob(MetadataProvider metadataProvider, Feed feed)
- throws AsterixException {
+ throws AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
JobSpecification spec = RuntimeUtils.createJobSpecification(appCtx);
- IClusterStateManager csm = appCtx.getClusterStateManager();
- AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations();
- Set<String> nodes = new TreeSet<>();
- for (String node : allCluster.getLocations()) {
- nodes.add(node);
- }
- AlgebricksAbsolutePartitionConstraint locations =
- new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()]));
- FileSplit[] feedLogFileSplits =
- FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations);
- org.apache.hyracks.algebricks.common.utils.Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
- StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
- FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, spC.first, true);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, spC.second);
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(feed);
+ FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec,
+ partitioningProperties.getSpiltsProvider(), true, partitioningProperties.getComputeStorageMap());
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod,
+ partitioningProperties.getConstraints());
spec.addRoot(frod);
return spec;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index c648b33..a8df92a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -683,10 +683,12 @@
IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), partitioningProperties.getSpiltsProvider());
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+ int[][] partitionsMap = partitioningProperties.getComputeStorageMap();
rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true,
indexDataflowHelperFactory, retainInput, retainMissing, nonMatchWriterFactory,
searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, propagateFilter,
- nonFilterWriterFactory, isIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan);
+ nonFilterWriterFactory, isIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
+ partitionsMap);
} else {
// Create the operator
rtreeSearchOp = null;
@@ -732,52 +734,6 @@
}
@Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
- IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
- JobSpecification spec) throws AlgebricksException {
- DataverseName dataverseName = dataSource.getId().getDataverseName();
- String datasetName = dataSource.getId().getDatasourceName();
- Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
- int numKeys = keys.size();
- int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
-
- // move key fields to front
- int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
- int[] pkFields = new int[numKeys];
- int i = 0;
- for (LogicalVariable varKey : keys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- pkFields[i] = idx;
- i++;
- }
- fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
- if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
- fieldPermutation[numKeys + 1] = idx;
- }
-
- PartitioningProperties partitioningProperties = getPartitioningProperties(dataset);
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- // TODO
- // figure out the right behavior of the bulkload and then give the
- // right callback
- // (ex. what's the expected behavior when there is an error during
- // bulkload?)
- IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
- partitioningProperties.getNumberOfPartitions());
- IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
- storageComponentProvider.getStorageManager(), partitioningProperties.getSpiltsProvider());
- LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null,
- fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
- indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
- partitioningProperties.getComputeStorageMap());
- return new Pair<>(btreeBulkLoad, partitioningProperties.getConstraints());
- }
-
- @Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
@@ -972,7 +928,7 @@
}
public PartitioningProperties splitAndConstraints(DataverseName dataverseName) {
- return dataPartitioningProvider.splitAndConstraints(dataverseName);
+ return dataPartitioningProvider.getPartitioningProperties(dataverseName);
}
public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName)
@@ -1837,6 +1793,10 @@
return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, ds, indexName);
}
+ public PartitioningProperties getPartitioningProperties(Feed feed) throws AlgebricksException {
+ return dataPartitioningProvider.getPartitioningProperties(feed);
+ }
+
public List<Pair<IFileSplitProvider, String>> getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException {
List<Index> dsIndexes = getDatasetIndexes(ds.getDataverseName(), ds.getDatasetName()).stream()
.filter(idx -> idx.getIndexType() != IndexType.SAMPLE && idx.isSecondaryIndex())
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
index f5e96b1..ec4c985 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -21,14 +21,22 @@
import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC;
import static org.apache.asterix.common.utils.PartitioningScheme.STATIC;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.utils.PartitioningScheme;
import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -48,7 +56,7 @@
scheme = appCtx.getStorageProperties().getPartitioningScheme();
}
- public PartitioningProperties splitAndConstraints(DataverseName dataverseName) {
+ public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) {
if (scheme == DYNAMIC) {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil
.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName);
@@ -75,6 +83,25 @@
throw new IllegalStateException();
}
+ public PartitioningProperties getPartitioningProperties(Feed feed) throws AsterixException {
+ if (scheme == DYNAMIC) {
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations();
+ Set<String> nodes = new TreeSet<>(Arrays.asList(allCluster.getLocations()));
+ AlgebricksAbsolutePartitionConstraint locations =
+ new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0]));
+ FileSplit[] feedLogFileSplits =
+ FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
+ StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
+ int[][] partitionsMap = getPartitionsMap(getNumPartitions(spC.second));
+ return PartitioningProperties.of(spC.first, spC.second, partitionsMap);
+ } else if (scheme == STATIC) {
+ throw new NotImplementedException();
+ }
+ throw new IllegalStateException();
+ }
+
private static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
return ((AlgebricksCountPartitionConstraint) constraint).getCount();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 27e06eb..130e39c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -370,8 +370,8 @@
IIndexDataflowHelperFactory indexHelperFactory =
new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
partitioningProperties.getSpiltsProvider());
- LSMTreeIndexCompactOperatorDescriptor compactOp =
- new LSMTreeIndexCompactOperatorDescriptor(spec, indexHelperFactory);
+ LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+ indexHelperFactory, partitioningProperties.getComputeStorageMap());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
partitioningProperties.getConstraints());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 498f3d4..8dc0d96 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -100,8 +100,8 @@
IIndexDataflowHelperFactory dataflowHelperFactory =
new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
partitioningProperties.getSpiltsProvider());
- LSMTreeIndexCompactOperatorDescriptor compactOp =
- new LSMTreeIndexCompactOperatorDescriptor(spec, dataflowHelperFactory);
+ LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+ dataflowHelperFactory, partitioningProperties.getComputeStorageMap());
compactOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
secondaryPartitionConstraint);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
index ef9e75b..dce8638 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@ -59,6 +59,7 @@
public DatasetStreamStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
String operatorName, IIndexDataflowHelperFactory[] indexes, String[] indexesNames) {
super(spec, 1, 1);
+ //TODO(partitioning)
outRecDescs[0] = rDesc;
this.operatorName = operatorName;
this.indexes = indexes;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
index 4f1dfd7..50fe998 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
@@ -47,6 +47,7 @@
IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields,
int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) {
super(spec, 1, 1);
+ //TODO(partitioning) correlated
this.outRecDescs[0] = outRecDesc;
this.primaryIndexHelperFactory = primaryIndexHelperFactory;
this.secondaryIndexHelperFactory = secondaryIndexHelperFactory;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
index f788d23..488b73b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
@@ -48,6 +48,7 @@
int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree, boolean excludeUnknownKeys,
boolean forAnyUnknownKey) {
super(spec, 1, 1);
+ //TODO(partitioning) correlated
this.outRecDescs[0] = outRecDesc;
this.missingWriterFactory = missingWriterFactory;
this.numTagFields = numTagFields;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4a6e76e..e34ff21 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -66,11 +66,6 @@
IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
IResultMetadata metadata, JobSpecification spec) throws AlgebricksException;
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
- IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
- List<LogicalVariable> additionalNonKeyFields, JobGenContext context, JobSpecification jobSpec)
- throws AlgebricksException;
-
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterKeyFields,
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
index ed03cc7..49110b9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -82,8 +82,8 @@
*
* @param constraintAcceptor
* - Constraint Acceptor
- * @param plan
- * - Job Plan
+ * @param ccServiceCtx
+ * - CC Service Context
*/
void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCServiceContext ccServiceCtx);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
index 57a5f69..76cd48a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
@@ -20,7 +20,6 @@
package org.apache.hyracks.dataflow.std.file;
import java.io.File;
-import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -32,36 +31,31 @@
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
public class FileRemoveOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 2L;
+
private final IFileSplitProvider fileSplitProvider;
private final boolean quietly;
+ private final int[][] partitionsMap;
public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder,
- boolean quietly) {
+ boolean quietly, int[][] partitionsMap) {
super(spec, 0, 0);
this.fileSplitProvider = fileSplitProvder;
this.quietly = quietly;
+ this.partitionsMap = partitionsMap;
}
- /**
- *
- * @deprecated use {@link #FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder, boolean quietly)} instead.
- */
- @Deprecated
- public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder) {
- this(spec, fileSplitProvder, false);
- }
-
- private static final long serialVersionUID = 1L;
-
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- final FileSplit split = fileSplitProvider.getFileSplits()[partition];
+ final FileSplit[] splits = fileSplitProvider.getFileSplits();
+ final int[] splitsIndexes = partitionsMap[partition];
final IIOManager ioManager = ctx.getIoManager();
return new AbstractOperatorNodePushable() {
@@ -73,16 +67,7 @@
@Override
public void initialize() throws HyracksDataException {
// will only work for files inside the io devices
- File f = split.getFile(ioManager);
- if (quietly) {
- FileUtils.deleteQuietly(f);
- } else {
- try {
- FileUtils.deleteDirectory(f);
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- }
+ deleteFiles();
}
@Override
@@ -98,6 +83,25 @@
@Override
public void deinitialize() throws HyracksDataException {
}
+
+ private void deleteFiles() throws HyracksDataException {
+ Throwable failure = null;
+ for (int splitsIndex : splitsIndexes) {
+ try {
+ File file = splits[splitsIndex].getFile(ioManager);
+ if (quietly) {
+ FileUtils.deleteQuietly(file);
+ } else {
+ FileUtils.deleteDirectory(file);
+ }
+ } catch (Throwable th) {
+ failure = ExceptionUtils.suppress(failure, th);
+ }
+ }
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
+ }
+ }
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
index 7b687c4..1302bce 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
@@ -30,9 +30,9 @@
private static final long serialVersionUID = 1L;
- private int[] fieldSlots;
- private byte[] tupleData;
- private int tupleSize;
+ private final int[] fieldSlots;
+ private final byte[] tupleData;
+ private final int tupleSize;
public ConstantTupleSourceOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
int[] fieldSlots, byte[] tupleData, int tupleSize) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 9bd0c59..785a330 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -26,11 +26,11 @@
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class ConstantTupleSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
- private IHyracksTaskContext ctx;
- private int[] fieldSlots;
- private byte[] tupleData;
- private int tupleSize;
+ private final IHyracksTaskContext ctx;
+ private final int[] fieldSlots;
+ private final byte[] tupleData;
+ private final int tupleSize;
public ConstantTupleSourceOperatorNodePushable(IHyracksTaskContext ctx, int[] fieldSlots, byte[] tupleData,
int tupleSize) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
index 6a31962..68e4ac5 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
@@ -44,6 +44,7 @@
import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
import org.apache.hyracks.tests.am.rtree.RTreeSecondaryIndexSearchOperatorTest;
import org.junit.Test;
@@ -95,7 +96,7 @@
int[] keyFields = { 0, 1, 2, 3 };
RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec,
secondaryWithFilterRecDesc, keyFields, true, true, secondaryHelperFactory, false, false, null,
- NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+ NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { createFile(nc1) });
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
index 0fcf892..f233417 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
@@ -43,6 +43,7 @@
import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -86,9 +87,9 @@
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
int[] keyFields = { 0, 1, 2, 3 };
- RTreeSearchOperatorDescriptor secondarySearchOp =
- new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc, keyFields, true, true, secondaryHelperFactory,
- false, false, null, NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+ RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+ keyFields, true, true, secondaryHelperFactory, false, false, null,
+ NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
// fifth field from the tuples coming from secondary index
int[] primaryLowKeyFields = { 4 };
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
index 3cfef19..2748988 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
@@ -42,6 +42,7 @@
import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -84,9 +85,9 @@
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
int[] keyFields = null;
- RTreeSearchOperatorDescriptor secondarySearchOp =
- new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc, keyFields, true, true, secondaryHelperFactory,
- false, false, null, NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+ RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+ keyFields, true, true, secondaryHelperFactory, false, false, null,
+ NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { createFile(nc1) });
IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
index 8ea0701..da5de77 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
@@ -43,6 +43,7 @@
import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -84,9 +85,9 @@
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
int[] keyFields = { 0, 1, 2, 3 };
- RTreeSearchOperatorDescriptor secondarySearchOp =
- new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc, keyFields, true, true, secondaryHelperFactory,
- false, false, null, NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+ RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+ keyFields, true, true, secondaryHelperFactory, false, false, null,
+ NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
// fifth field from the tuples coming from secondary index
int[] primaryLowKeyFields = { 4 };
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 88c06eb..c3719c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -131,6 +131,7 @@
ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
this.ctx = ctx;
this.appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
+ //TODO(partitioning) partitionsMap should not be null
this.partitions = partitionsMap != null ? partitionsMap[partition] : new int[] { partition };
for (int i = 0; i < partitions.length; i++) {
storagePartitionId2Index.put(partitions[i], i);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
index 099d668..06dad1c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
@@ -37,6 +37,7 @@
public TreeIndexDiskOrderScanOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
IIndexDataflowHelperFactory indexHelperFactory, ISearchOperationCallbackFactory searchCallbackFactory) {
super(spec, 0, 1);
+ //TODO(maybe don't fix)
this.indexHelperFactory = indexHelperFactory;
this.searchCallbackFactory = searchCallbackFactory;
this.outRecDescs[0] = outRecDesc;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
index 6ed1858..104eed4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
@@ -40,6 +40,7 @@
public TreeIndexStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, IStorageManager storageManager,
IIndexDataflowHelperFactory indexHelperFactory) {
super(spec, 0, 1);
+ //TODO(maybe don't fix)
this.indexHelperFactory = indexHelperFactory;
this.storageManager = storageManager;
this.outRecDescs[0] = recDesc;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
index 291363d..da41f7e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
@@ -38,6 +38,7 @@
public LSMBTreeDiskComponentScanOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
IIndexDataflowHelperFactory indexHelperFactory, ISearchOperationCallbackFactory searchCallbackFactory) {
super(spec, 1, 1);
+ //TODO(partitioning) correlated
this.indexHelperFactory = indexHelperFactory;
this.searchCallbackFactory = searchCallbackFactory;
this.outRecDescs[0] = outRecDesc;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
index 682ffef..139a871 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -30,16 +31,25 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
public class LSMIndexCompactOperatorNodePushable extends AbstractOperatorNodePushable {
- private final IIndexDataflowHelper indexHelper;
+
+ private final IIndexDataflowHelper[] indexHelpers;
public LSMIndexCompactOperatorNodePushable(IHyracksTaskContext ctx, int partition,
- IIndexDataflowHelperFactory indexHelperFactory) throws HyracksDataException {
- this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+ IIndexDataflowHelperFactory indexHelperFactory, int[][] partitionsMap) throws HyracksDataException {
+ int[] partitions = partitionsMap[partition];
+ indexHelpers = new IIndexDataflowHelper[partitions.length];
+ for (int i = 0; i < partitions.length; i++) {
+ indexHelpers[i] = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partitions[i]);
+ }
+
}
@Override
public void deinitialize() throws HyracksDataException {
- indexHelper.close();
+ Throwable failure = CleanupUtils.close(indexHelpers, null);
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
+ }
}
@Override
@@ -54,10 +64,12 @@
@Override
public void initialize() throws HyracksDataException {
- indexHelper.open();
- ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
- ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.scheduleFullMerge();
+ for (IIndexDataflowHelper indexHelper : indexHelpers) {
+ indexHelper.open();
+ ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.scheduleFullMerge();
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
index 6da4c8f..aad1cc9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
@@ -29,18 +29,20 @@
public class LSMTreeIndexCompactOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final IIndexDataflowHelperFactory indexHelperFactory;
+ private final int[][] partitionsMap;
public LSMTreeIndexCompactOperatorDescriptor(IOperatorDescriptorRegistry spec,
- IIndexDataflowHelperFactory indexHelperFactory) {
+ IIndexDataflowHelperFactory indexHelperFactory, int[][] partitionsMap) {
super(spec, 0, 0);
this.indexHelperFactory = indexHelperFactory;
+ this.partitionsMap = partitionsMap;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new LSMIndexCompactOperatorNodePushable(ctx, partition, indexHelperFactory);
+ return new LSMIndexCompactOperatorNodePushable(ctx, partition, indexHelperFactory, partitionsMap);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
index cc6d76d..8e7a7fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
@@ -32,7 +32,7 @@
public class RTreeSearchOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
protected final int[] keyFields;
protected final boolean lowKeyInclusive;
protected final boolean highKeyInclusive;
@@ -45,6 +45,7 @@
protected final boolean retainMissing;
protected final IMissingWriterFactory missingWriterFactory;
protected final ISearchOperationCallbackFactory searchCallbackFactory;
+ protected final int[][] partitionsMap;
protected boolean appendOpCallbackProceedResult;
protected byte[] searchCallbackProceedResultFalseValue;
protected byte[] searchCallbackProceedResultTrueValue;
@@ -53,10 +54,11 @@
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory indexHelperFactory,
boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory, int[] minFilterFieldIndexes,
- int[] maxFilterFieldIndexes, boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory) {
+ int[] maxFilterFieldIndexes, boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory,
+ int[][] partitionsMap) {
this(spec, outRecDesc, keyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
- maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, false, null, null);
+ maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, false, null, null, partitionsMap);
}
public RTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] keyFields,
@@ -65,7 +67,7 @@
ISearchOperationCallbackFactory searchCallbackFactory, int[] minFilterFieldIndexes,
int[] maxFilterFieldIndexes, boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory,
boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
- byte[] searchCallbackProceedResultTrueValue) {
+ byte[] searchCallbackProceedResultTrueValue, int[][] partitionsMap) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.retainInput = retainInput;
@@ -79,6 +81,7 @@
this.maxFilterFieldIndexes = maxFilterFieldIndexes;
this.appendIndexFilter = appendIndexFilter;
this.nonFilterWriterFactory = nonFilterWriterFactory;
+ this.partitionsMap = partitionsMap;
this.outRecDescs[0] = outRecDesc;
this.appendOpCallbackProceedResult = appendOpCallbackProceedResult;
this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue;
@@ -92,6 +95,6 @@
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), keyFields, minFilterFieldIndexes,
maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing, missingWriterFactory,
searchCallbackFactory, appendIndexFilter, nonFilterWriterFactory, appendOpCallbackProceedResult,
- searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue);
+ searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, partitionsMap);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index aef624f..7a7dc0e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -47,12 +47,12 @@
IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory,
boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
- byte[] searchCallbackProceedResultTrueValue) throws HyracksDataException {
+ byte[] searchCallbackProceedResultTrueValue, int[][] partitionsMap) throws HyracksDataException {
// TODO: predicate & limit pushdown not enabled for RTree yet
super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, null, -1, appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
- searchCallbackProceedResultTrueValue, DefaultTupleProjectorFactory.INSTANCE, null, null);
+ searchCallbackProceedResultTrueValue, DefaultTupleProjectorFactory.INSTANCE, null, partitionsMap);
if (keyFields != null && keyFields.length > 0) {
searchKey = new PermutingFrameTupleReference();
searchKey.setFieldPermutation(keyFields);