Changed B-Tree operators to take an IFileSplitProvider instead of a B-Tree file name.
git-svn-id: https://hyracks.googlecode.com/svn/trunk@175 123451ca-8445-de46-9d55-352943316053
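For reviewers, the gist of the API change: callers now build an IFileSplitProvider (one FileSplit per node controller) and pass it to the B-Tree operator descriptors in place of the old btreeFileName string; each operator node pushable then resolves its local file from the split that matches its partition. A minimal sketch of the new calling convention, assuming the provider/factory variables are set up as in the example clients (the node name "nc1" and the /tmp path are illustrative placeholders, not values from this commit):

    IFileSplitProvider btreeSplitProvider = new ConstantFileSplitProvider(
            new FileSplit[] { new FileSplit("nc1", new File("/tmp/nc1/btree.bin")) });
    BTreeBulkLoadOperatorDescriptor btreeBulkLoad = new BTreeBulkLoadOperatorDescriptor(spec,
            bufferCacheProvider, btreeRegistryProvider, btreeSplitProvider, fileMappingProviderProvider,
            interiorFrameFactory, leafFrameFactory, fieldCount, comparatorFactories, fieldPermutation, 0.7f);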
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
index 2e57487..7a27c77 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
import edu.uci.ics.hyracks.examples.btree.helper.BTreeRegistryProvider;
import edu.uci.ics.hyracks.examples.btree.helper.BufferCacheProvider;
@@ -120,9 +121,10 @@
// comparator factories for primary index
IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[1];
primaryComparatorFactories[0] = IntegerBinaryComparatorFactory.INSTANCE;
+ IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBtreeName);
// create operator descriptor
- BTreeInsertUpdateDeleteOperatorDescriptor primaryInsert = new BTreeInsertUpdateDeleteOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, options.primaryBtreeName, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, primaryFieldCount, primaryComparatorFactories, primaryFieldPermutation, BTreeOp.BTO_INSERT);
- PartitionConstraint primaryInsertConstraint = createPartitionConstraint(splitNCs);
+ BTreeInsertUpdateDeleteOperatorDescriptor primaryInsert = new BTreeInsertUpdateDeleteOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, primarySplitProvider, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, primaryFieldCount, primaryComparatorFactories, primaryFieldPermutation, BTreeOp.BTO_INSERT);
+ PartitionConstraint primaryInsertConstraint = JobHelper.createPartitionConstraint(splitNCs);
primaryInsert.setPartitionConstraint(primaryInsertConstraint);
// prepare insertion into secondary index
@@ -134,14 +136,15 @@
IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[2];
secondaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
secondaryComparatorFactories[1] = IntegerBinaryComparatorFactory.INSTANCE;
+ IFileSplitProvider secondarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBtreeName);
// create operator descriptor
- BTreeInsertUpdateDeleteOperatorDescriptor secondaryInsert = new BTreeInsertUpdateDeleteOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, options.secondaryBtreeName, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, secondaryFieldCount, secondaryComparatorFactories, secondaryFieldPermutation, BTreeOp.BTO_INSERT);
- PartitionConstraint secondaryInsertConstraint = createPartitionConstraint(splitNCs);
+ BTreeInsertUpdateDeleteOperatorDescriptor secondaryInsert = new BTreeInsertUpdateDeleteOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, secondarySplitProvider, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, secondaryFieldCount, secondaryComparatorFactories, secondaryFieldPermutation, BTreeOp.BTO_INSERT);
+ PartitionConstraint secondaryInsertConstraint = JobHelper.createPartitionConstraint(splitNCs);
secondaryInsert.setPartitionConstraint(secondaryInsertConstraint);
// end the insert pipeline at this sink operator
NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
- PartitionConstraint nullSinkPartitionConstraint = createPartitionConstraint(splitNCs);
+ PartitionConstraint nullSinkPartitionConstraint = JobHelper.createPartitionConstraint(splitNCs);
nullSink.setPartitionConstraint(nullSinkPartitionConstraint);
// distribute the records from the datagen via hashing to the insert ops
@@ -161,13 +164,5 @@
spec.addRoot(nullSink);
return spec;
- }
-
- private static PartitionConstraint createPartitionConstraint(String[] splitNCs) {
- LocationConstraint[] lConstraints = new LocationConstraint[splitNCs.length];
- for (int i = 0; i < splitNCs.length; ++i) {
- lConstraints[i] = new AbsoluteLocationConstraint(splitNCs[i]);
- }
- return new ExplicitPartitionConstraint(lConstraints);
- }
+ }
}
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/JobHelper.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/JobHelper.java
new file mode 100644
index 0000000..6799128
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/JobHelper.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.hyracks.examples.btree.client;
+
+import java.io.File;
+
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class JobHelper {
+ public static IFileSplitProvider createFileSplitProvider(String[] splitNCs, String btreeFileName) {
+ FileSplit[] fileSplits = new FileSplit[splitNCs.length];
+ for (int i = 0; i < splitNCs.length; ++i) {
+ String fileName = btreeFileName + "." + splitNCs[i];
+ fileSplits[i] = new FileSplit(splitNCs[i], new File(fileName));
+ }
+ IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
+ return splitProvider;
+ }
+
+ public static PartitionConstraint createPartitionConstraint(String[] splitNCs) {
+ LocationConstraint[] lConstraints = new LocationConstraint[splitNCs.length];
+ for (int i = 0; i < splitNCs.length; ++i) {
+ lConstraints[i] = new AbsoluteLocationConstraint(splitNCs[i]);
+ }
+ return new ExplicitPartitionConstraint(lConstraints);
+ }
+}
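A typical call site, mirroring the updated example clients (splitNCs and options.btreeName come from the clients' command-line options; the descriptor variable is whichever B-Tree operator is being configured):

    IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
    PartitionConstraint btreeConstraint = JobHelper.createPartitionConstraint(splitNCs);
    btreeBulkLoad.setPartitionConstraint(btreeConstraint);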
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 0725929..dd0d11b 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.examples.btree.helper.BTreeRegistryProvider;
import edu.uci.ics.hyracks.examples.btree.helper.BufferCacheProvider;
@@ -124,7 +125,7 @@
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = IntegerBinaryComparatorFactory.INSTANCE;
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, options.sbSize, sortFields, comparatorFactories, recDesc);
- PartitionConstraint sorterConstraint = createPartitionConstraint(splitNCs);
+ PartitionConstraint sorterConstraint = JobHelper.createPartitionConstraint(splitNCs);
sorter.setPartitionConstraint(sorterConstraint);
// create factories and providers for B-Tree
@@ -138,10 +139,11 @@
int fieldCount = 4;
// the B-Tree expects its key fields to be at the front of its input tuple
int[] fieldPermutation = { 2, 1, 3, 4 }; // map field 2 of input tuple to field 0 of B-Tree tuple, etc.
+ IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
BTreeBulkLoadOperatorDescriptor btreeBulkLoad = new BTreeBulkLoadOperatorDescriptor(spec,
- bufferCacheProvider, btreeRegistryProvider, options.btreeName, fileMappingProviderProvider, interiorFrameFactory,
+ bufferCacheProvider, btreeRegistryProvider, btreeSplitProvider, fileMappingProviderProvider, interiorFrameFactory,
leafFrameFactory, fieldCount, comparatorFactories, fieldPermutation, 0.7f);
- PartitionConstraint bulkLoadConstraint = createPartitionConstraint(splitNCs);
+ PartitionConstraint bulkLoadConstraint = JobHelper.createPartitionConstraint(splitNCs);
btreeBulkLoad.setPartitionConstraint(bulkLoadConstraint);
// distribute the records from the datagen via hashing to the bulk load ops
@@ -157,13 +159,5 @@
spec.addRoot(btreeBulkLoad);
return spec;
- }
-
- private static PartitionConstraint createPartitionConstraint(String[] splitNCs) {
- LocationConstraint[] lConstraints = new LocationConstraint[splitNCs.length];
- for (int i = 0; i < splitNCs.length; ++i) {
- lConstraints[i] = new AbsoluteLocationConstraint(splitNCs[i]);
- }
- return new ExplicitPartitionConstraint(lConstraints);
- }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index 0d419e6..1d72cc3 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -22,9 +22,6 @@
import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
-import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -36,6 +33,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
import edu.uci.ics.hyracks.examples.btree.helper.BTreeRegistryProvider;
import edu.uci.ics.hyracks.examples.btree.helper.BufferCacheProvider;
@@ -127,20 +125,21 @@
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- PartitionConstraint keyProviderPartitionConstraint = createPartitionConstraint(splitNCs);
+ PartitionConstraint keyProviderPartitionConstraint = JobHelper.createPartitionConstraint(splitNCs);
keyProviderOp.setPartitionConstraint(keyProviderPartitionConstraint);
int[] lowKeyFields = { 0 }; // low key is in field 0 of tuples going into search op
int[] highKeyFields = { 1 }; // high key is in field 1 of tuples going into search op
- BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, options.btreeName, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, recDesc.getFields().length, comparatorFactories, true, lowKeyFields, highKeyFields);
- PartitionConstraint btreeSearchConstraint = createPartitionConstraint(splitNCs);
+ IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
+ BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, btreeSplitProvider, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, recDesc.getFields().length, comparatorFactories, true, lowKeyFields, highKeyFields);
+ PartitionConstraint btreeSearchConstraint = JobHelper.createPartitionConstraint(splitNCs);
btreeSearchOp.setPartitionConstraint(btreeSearchConstraint);
// have each node print the results of its respective B-Tree
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerConstraint = createPartitionConstraint(splitNCs);
+ PartitionConstraint printerConstraint = JobHelper.createPartitionConstraint(splitNCs);
printer.setPartitionConstraint(printerConstraint);
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, btreeSearchOp, 0);
@@ -150,13 +149,5 @@
spec.addRoot(printer);
return spec;
- }
-
- private static PartitionConstraint createPartitionConstraint(String[] splitNCs) {
- LocationConstraint[] lConstraints = new LocationConstraint[splitNCs.length];
- for (int i = 0; i < splitNCs.length; ++i) {
- lConstraints[i] = new AbsoluteLocationConstraint(splitNCs[i]);
- }
- return new ExplicitPartitionConstraint(lConstraints);
- }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index a2efc18..dc6f420 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -21,9 +21,6 @@
import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
-import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -34,6 +31,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.examples.btree.helper.BTreeRegistryProvider;
import edu.uci.ics.hyracks.examples.btree.helper.BufferCacheProvider;
@@ -114,8 +112,9 @@
IFileMappingProviderProvider fileMappingProviderProvider = FileMappingProviderProvider.INSTANCE;
// use a disk-order scan to read primary index
- BTreeDiskOrderScanOperatorDescriptor btreeScanOp = new BTreeDiskOrderScanOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, options.primaryBtreeName, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, recDesc.getFields().length);
- PartitionConstraint scanPartitionConstraint = createPartitionConstraint(splitNCs);
+ IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBtreeName);
+ BTreeDiskOrderScanOperatorDescriptor btreeScanOp = new BTreeDiskOrderScanOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, primarySplitProvider, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, recDesc.getFields().length);
+ PartitionConstraint scanPartitionConstraint = JobHelper.createPartitionConstraint(splitNCs);
btreeScanOp.setPartitionConstraint(scanPartitionConstraint);
// sort the tuples as preparation for bulk load into secondary index
@@ -126,17 +125,18 @@
comparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
comparatorFactories[1] = IntegerBinaryComparatorFactory.INSTANCE;
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, options.sbSize, sortFields, comparatorFactories, recDesc);
- PartitionConstraint sorterConstraint = createPartitionConstraint(splitNCs);
+ PartitionConstraint sorterConstraint = JobHelper.createPartitionConstraint(splitNCs);
sorter.setPartitionConstraint(sorterConstraint);
// tuples to be put into B-Tree shall have 2 fields
int fieldCount = 2;
// the B-Tree expects its key fields to be at the front of its input tuple
int[] fieldPermutation = { 1, 0 };
+ IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
BTreeBulkLoadOperatorDescriptor btreeBulkLoad = new BTreeBulkLoadOperatorDescriptor(spec,
- bufferCacheProvider, btreeRegistryProvider, options.btreeName, fileMappingProviderProvider, interiorFrameFactory,
+ bufferCacheProvider, btreeRegistryProvider, btreeSplitProvider, fileMappingProviderProvider, interiorFrameFactory,
leafFrameFactory, fieldCount, comparatorFactories, fieldPermutation, 0.7f);
- PartitionConstraint bulkLoadConstraint = createPartitionConstraint(splitNCs);
+ PartitionConstraint bulkLoadConstraint = JobHelper.createPartitionConstraint(splitNCs);
btreeBulkLoad.setPartitionConstraint(bulkLoadConstraint);
// connect the ops
@@ -149,13 +149,5 @@
spec.addRoot(btreeBulkLoad);
return spec;
- }
-
- private static PartitionConstraint createPartitionConstraint(String[] splitNCs) {
- LocationConstraint[] lConstraints = new LocationConstraint[splitNCs.length];
- for (int i = 0; i < splitNCs.length; ++i) {
- lConstraints[i] = new AbsoluteLocationConstraint(splitNCs[i]);
- }
- return new ExplicitPartitionConstraint(lConstraints);
- }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index 3302674..e69641a 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -22,9 +22,6 @@
import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
-import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -36,6 +33,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
import edu.uci.ics.hyracks.examples.btree.helper.BTreeRegistryProvider;
import edu.uci.ics.hyracks.examples.btree.helper.BufferCacheProvider;
@@ -137,14 +135,15 @@
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- PartitionConstraint keyProviderPartitionConstraint = createPartitionConstraint(splitNCs);
+ PartitionConstraint keyProviderPartitionConstraint = JobHelper.createPartitionConstraint(splitNCs);
keyProviderOp.setPartitionConstraint(keyProviderPartitionConstraint);
int[] secondaryLowKeyFields = { 0 }; // low key is in field 0 of tuples going into secondary index search op
int[] secondaryHighKeyFields = { 1 }; // high key is in field 1 of tuples going into secondary index search op
- BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc, bufferCacheProvider, btreeRegistryProvider, options.secondaryBTreeName, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, secondaryRecDesc.getFields().length, comparatorFactories, true, secondaryLowKeyFields, secondaryHighKeyFields);
- PartitionConstraint secondarySearchConstraint = createPartitionConstraint(splitNCs);
+ IFileSplitProvider secondarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
+ BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc, bufferCacheProvider, btreeRegistryProvider, secondarySplitProvider, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, secondaryRecDesc.getFields().length, comparatorFactories, true, secondaryLowKeyFields, secondaryHighKeyFields);
+ PartitionConstraint secondarySearchConstraint = JobHelper.createPartitionConstraint(splitNCs);
secondarySearchOp.setPartitionConstraint(secondarySearchConstraint);
// secondary index will output tuples with [UTF8String, Integer]
@@ -152,13 +151,14 @@
int[] primaryLowKeyFields = { 1 }; // low key is in field 1 of tuples going into primary index search op
int[] primaryHighKeyFields = { 1 }; // high key is in field 1 of tuples going into primary index search op
- BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc, bufferCacheProvider, btreeRegistryProvider, options.primaryBTreeName, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, primaryRecDesc.getFields().length, comparatorFactories, true, primaryLowKeyFields, primaryHighKeyFields);
- PartitionConstraint primarySearchConstraint = createPartitionConstraint(splitNCs);
+ IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
+ BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc, bufferCacheProvider, btreeRegistryProvider, primarySplitProvider, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, primaryRecDesc.getFields().length, comparatorFactories, true, primaryLowKeyFields, primaryHighKeyFields);
+ PartitionConstraint primarySearchConstraint = JobHelper.createPartitionConstraint(splitNCs);
primarySearchOp.setPartitionConstraint(primarySearchConstraint);
// have each node print the results of its respective B-Tree
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerConstraint = createPartitionConstraint(splitNCs);
+ PartitionConstraint printerConstraint = JobHelper.createPartitionConstraint(splitNCs);
printer.setPartitionConstraint(printerConstraint);
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondarySearchOp, 0);
@@ -170,13 +170,5 @@
spec.addRoot(printer);
return spec;
- }
-
- private static PartitionConstraint createPartitionConstraint(String[] splitNCs) {
- LocationConstraint[] lConstraints = new LocationConstraint[splitNCs.length];
- for (int i = 0; i < splitNCs.length; ++i) {
- lConstraints[i] = new AbsoluteLocationConstraint(splitNCs[i]);
- }
- return new ExplicitPartitionConstraint(lConstraints);
- }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java
index cf7c847..7ec3883 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java
@@ -77,7 +77,7 @@
@Test
public void bulkLoadTest() throws Exception {
// relies on the fact that NCs are run from same process
- System.setProperty("NodeControllerDataPath", System.getProperty("java.io.tmpdir") + "/");
+ //System.setProperty("NodeControllerDataPath", System.getProperty("java.io.tmpdir") + "/");
JobSpecification spec = new JobSpecification();
@@ -121,7 +121,12 @@
int[] fieldPermutation = { 0, 4, 5 };
- BTreeBulkLoadOperatorDescriptor btreeBulkLoad = new BTreeBulkLoadOperatorDescriptor(spec, bufferCacheProvider, btreeRegistryProvider, "btreetest.bin", fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, fieldCount, comparatorFactories, fieldPermutation, 0.7f);
+ String btreeName = "btree.bin";
+ String nc1FileName = System.getProperty("java.io.tmpdir") + "/nc1/" + btreeName;
+ IFileSplitProvider btreeSplitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC1_ID, new File(nc1FileName)) } );
+
+ BTreeBulkLoadOperatorDescriptor btreeBulkLoad = new BTreeBulkLoadOperatorDescriptor(spec, bufferCacheProvider, btreeRegistryProvider, btreeSplitProvider, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, fieldCount, comparatorFactories, fieldPermutation, 0.7f);
PartitionConstraint btreePartitionConstraintA = new ExplicitPartitionConstraint(new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
btreeBulkLoad.setPartitionConstraint(btreePartitionConstraintA);
@@ -139,7 +144,6 @@
}
MultiComparator cmp = new MultiComparator(fieldCount, comparators);
-
// try an ordered scan on the bulk-loaded btree
int btreeFileId = 0; // TODO: this relies on the way FileMappingProvider assigns ids (in sequence starting from 0)
@@ -158,19 +162,16 @@
e.printStackTrace();
} finally {
scanCursor.close();
- }
+ }
}
-
+
@Test
public void btreeSearchTest() throws Exception {
// relies on the fact that NCs are run from same process
System.setProperty("NodeControllerDataPath", System.getProperty("java.io.tmpdir") + "/");
JobSpecification spec = new JobSpecification();
-
- IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit(NC2_ID, new File("data/words.txt")), new FileSplit(NC1_ID, new File("data/words.txt")) });
-
+
IBTreeInteriorFrameFactory interiorFrameFactory = new NSMInteriorFrameFactory();
IBTreeLeafFrameFactory leafFrameFactory = new NSMLeafFrameFactory();
@@ -210,7 +211,12 @@
RecordDescriptor recDesc = new RecordDescriptor(
new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, "btreetest.bin", fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, fieldCount, comparatorFactories, true, new int[]{0}, new int[]{1});
+ String btreeName = "btree.bin";
+ String nc1FileName = System.getProperty("java.io.tmpdir") + "/nc1/" + btreeName;
+ IFileSplitProvider btreeSplitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC1_ID, new File(nc1FileName)) } );
+
+ BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, btreeSplitProvider, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, fieldCount, comparatorFactories, true, new int[]{0}, new int[]{1});
//BTreeDiskOrderScanOperatorDescriptor btreeSearchOp = new BTreeDiskOrderScanOperatorDescriptor(spec, splitProvider, recDesc, bufferCacheProvider, btreeRegistryProvider, 0, "btreetest.bin", interiorFrameFactory, leafFrameFactory, cmp);
PartitionConstraint btreePartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
@@ -336,21 +342,27 @@
// create insert operators
- // primary index
+ // primary index
+ IFileSplitProvider btreeSplitProviderA = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC1_ID, new File("/tmp/btreetestA.ix")) } );
int[] fieldPermutationA = { 0,1,2,3,4,5 };
- BTreeInsertUpdateDeleteOperatorDescriptor insertOpA = new BTreeInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, bufferCacheProvider, btreeRegistryProvider, "btreetestA.ix", fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, primaryFieldCount, primaryComparatorFactories, fieldPermutationA, BTreeOp.BTO_INSERT);
+ BTreeInsertUpdateDeleteOperatorDescriptor insertOpA = new BTreeInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, bufferCacheProvider, btreeRegistryProvider, btreeSplitProviderA, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, primaryFieldCount, primaryComparatorFactories, fieldPermutationA, BTreeOp.BTO_INSERT);
PartitionConstraint insertPartitionConstraintA = new ExplicitPartitionConstraint(new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
insertOpA.setPartitionConstraint(insertPartitionConstraintA);
// first secondary index
+ IFileSplitProvider btreeSplitProviderB = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC1_ID, new File("/tmp/btreetestB.ix")) } );
int[] fieldPermutationB = { 3, 0 };
- BTreeInsertUpdateDeleteOperatorDescriptor insertOpB = new BTreeInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, bufferCacheProvider, btreeRegistryProvider, "btreetestB.ix", fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, secondaryFieldCount, secondaryComparatorFactories, fieldPermutationB, BTreeOp.BTO_INSERT);
+ BTreeInsertUpdateDeleteOperatorDescriptor insertOpB = new BTreeInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, bufferCacheProvider, btreeRegistryProvider, btreeSplitProviderB, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, secondaryFieldCount, secondaryComparatorFactories, fieldPermutationB, BTreeOp.BTO_INSERT);
PartitionConstraint insertPartitionConstraintB = new ExplicitPartitionConstraint(new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
insertOpB.setPartitionConstraint(insertPartitionConstraintB);
// second secondary index
+ IFileSplitProvider btreeSplitProviderC = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC1_ID, new File("/tmp/btreetestC.ix")) } );
int[] fieldPermutationC = { 4, 0 };
- BTreeInsertUpdateDeleteOperatorDescriptor insertOpC = new BTreeInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, bufferCacheProvider, btreeRegistryProvider, "btreetestC.ix", fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, secondaryFieldCount, secondaryComparatorFactories, fieldPermutationC, BTreeOp.BTO_INSERT);
+ BTreeInsertUpdateDeleteOperatorDescriptor insertOpC = new BTreeInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, bufferCacheProvider, btreeRegistryProvider, btreeSplitProviderC, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, secondaryFieldCount, secondaryComparatorFactories, fieldPermutationC, BTreeOp.BTO_INSERT);
PartitionConstraint insertPartitionConstraintC = new ExplicitPartitionConstraint(new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
insertOpC.setPartitionConstraint(insertPartitionConstraintC);
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorDescriptor.java
index 03f60a0..083afef 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorDescriptor.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
@@ -26,21 +27,22 @@
private static final long serialVersionUID = 1L;
- protected String btreeFileName;
protected IFileMappingProviderProvider fileMappingProviderProvider;
- protected int fieldCount;
- protected IBinaryComparatorFactory[] comparatorFactories;
+ protected final IFileSplitProvider fileSplitProvider;
- protected IBTreeInteriorFrameFactory interiorFrameFactory;
- protected IBTreeLeafFrameFactory leafFrameFactory;
+ protected final int fieldCount;
+ protected final IBinaryComparatorFactory[] comparatorFactories;
- protected IBufferCacheProvider bufferCacheProvider;
- protected IBTreeRegistryProvider btreeRegistryProvider;
-
- public AbstractBTreeOperatorDescriptor(JobSpecification spec, int inputArity, int outputArity, RecordDescriptor recDesc, IBufferCacheProvider bufferCacheProvider, IBTreeRegistryProvider btreeRegistryProvider, String btreeFileName, IFileMappingProviderProvider fileMappingProviderProvider, IBTreeInteriorFrameFactory interiorFactory, IBTreeLeafFrameFactory leafFactory, int fieldCount, IBinaryComparatorFactory[] comparatorFactories) {
+ protected final IBTreeInteriorFrameFactory interiorFrameFactory;
+ protected final IBTreeLeafFrameFactory leafFrameFactory;
+
+ protected final IBufferCacheProvider bufferCacheProvider;
+ protected final IBTreeRegistryProvider btreeRegistryProvider;
+
+ public AbstractBTreeOperatorDescriptor(JobSpecification spec, int inputArity, int outputArity, RecordDescriptor recDesc, IBufferCacheProvider bufferCacheProvider, IBTreeRegistryProvider btreeRegistryProvider, IFileSplitProvider fileSplitProvider, IFileMappingProviderProvider fileMappingProviderProvider, IBTreeInteriorFrameFactory interiorFactory, IBTreeLeafFrameFactory leafFactory, int fieldCount, IBinaryComparatorFactory[] comparatorFactories) {
super(spec, inputArity, outputArity);
- this.btreeFileName = btreeFileName;
+ this.fileSplitProvider = fileSplitProvider;
this.fileMappingProviderProvider = fileMappingProviderProvider;
this.bufferCacheProvider = bufferCacheProvider;
this.btreeRegistryProvider = btreeRegistryProvider;
@@ -51,8 +53,8 @@
if(outputArity > 0) recordDescriptors[0] = recDesc;
}
- public String getBtreeFileName() {
- return btreeFileName;
+ public IFileSplitProvider getFileSplitProvider() {
+ return fileSplitProvider;
}
public IFileMappingProviderProvider getFileMappingProviderProvider() {
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorDescriptor.java
index 2faca9d..d96ae15 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorDescriptor.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
@@ -28,18 +29,18 @@
private static final long serialVersionUID = 1L;
- private int[] fieldPermutation;
- private float fillFactor;
-
+ private final int[] fieldPermutation;
+ private final float fillFactor;
+
public BTreeBulkLoadOperatorDescriptor(JobSpecification spec,
IBufferCacheProvider bufferCacheProvider,
IBTreeRegistryProvider btreeRegistryProvider,
- String btreeFileName, IFileMappingProviderProvider fileMappingProviderProvider, IBTreeInteriorFrameFactory interiorFactory,
+ IFileSplitProvider fileSplitProvider, IFileMappingProviderProvider fileMappingProviderProvider, IBTreeInteriorFrameFactory interiorFactory,
IBTreeLeafFrameFactory leafFactory, int fieldCount,
IBinaryComparatorFactory[] comparatorFactories,
int[] fieldPermutation, float fillFactor) {
super(spec, 1, 0, null, bufferCacheProvider,
- btreeRegistryProvider, btreeFileName, fileMappingProviderProvider, interiorFactory,
+ btreeRegistryProvider, fileSplitProvider, fileMappingProviderProvider, interiorFactory,
leafFactory, fieldCount, comparatorFactories);
this.fieldPermutation = fieldPermutation;
this.fillFactor = fillFactor;
@@ -49,7 +50,7 @@
public IOperatorNodePushable createPushRuntime(IHyracksContext ctx,
IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) {
- return new BTreeBulkLoadOperatorNodePushable(this, ctx, fieldPermutation, fillFactor, recordDescProvider);
+ int nPartitions) {
+ return new BTreeBulkLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor, recordDescProvider);
}
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
index f348b0b..c2d3a10 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
@@ -31,14 +31,14 @@
private final BTreeOpHelper btreeOpHelper;
private FrameTupleAccessor accessor;
private BTree.BulkLoadContext bulkLoadCtx;
-
+
private IRecordDescriptorProvider recordDescProvider;
private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
public BTreeBulkLoadOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
- int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
- btreeOpHelper = new BTreeOpHelper(opDesc, ctx, true);
+ int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, true);
this.fillFactor = fillFactor;
this.recordDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorDescriptor.java
index 2ee8457..f0dbf9c 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorDescriptor.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
@@ -32,11 +33,11 @@
RecordDescriptor recDesc,
IBufferCacheProvider bufferCacheProvider,
IBTreeRegistryProvider btreeRegistryProvider,
- String btreeFileName, IFileMappingProviderProvider fileMappingProviderProvider, IBTreeInteriorFrameFactory interiorFactory,
+ IFileSplitProvider fileSplitProvider, IFileMappingProviderProvider fileMappingProviderProvider, IBTreeInteriorFrameFactory interiorFactory,
IBTreeLeafFrameFactory leafFactory,
int fieldCount) {
super(spec, 0, 1, recDesc, bufferCacheProvider,
- btreeRegistryProvider, btreeFileName, fileMappingProviderProvider, interiorFactory,
+ btreeRegistryProvider, fileSplitProvider, fileMappingProviderProvider, interiorFactory,
leafFactory, fieldCount, null);
}
@@ -45,6 +46,6 @@
IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition,
int nPartitions) {
- return new BTreeDiskOrderScanOperatorNodePushable(this, ctx);
+ return new BTreeDiskOrderScanOperatorNodePushable(this, ctx, partition);
}
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
index 3ba111e..50b4e94 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
@@ -34,8 +34,8 @@
public class BTreeDiskOrderScanOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
private final BTreeOpHelper btreeOpHelper;
- public BTreeDiskOrderScanOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx) {
- btreeOpHelper = new BTreeOpHelper(opDesc, ctx, false);
+ public BTreeDiskOrderScanOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int partition) {
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, false);
}
@Override
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorDescriptor.java
index fa3a72f..295fd50 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorDescriptor.java
@@ -21,26 +21,26 @@
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.common.file.IFileMappingProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
public class BTreeDropOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private String btreeFileName;
private IBufferCacheProvider bufferCacheProvider;
private IBTreeRegistryProvider btreeRegistryProvider;
private IFileMappingProviderProvider fileMappingProviderProvider;
+ private IFileSplitProvider fileSplitProvider;
public BTreeDropOperatorDescriptor(JobSpecification spec,
IBufferCacheProvider bufferCacheProvider,
IBTreeRegistryProvider btreeRegistryProvider,
- String btreeFileName, IFileMappingProviderProvider fileMappingProviderProvider) {
+ IFileSplitProvider fileSplitProvider, IFileMappingProviderProvider fileMappingProviderProvider) {
super(spec, 0, 0);
- this.btreeFileName = btreeFileName;
this.fileMappingProviderProvider = fileMappingProviderProvider;
this.bufferCacheProvider = bufferCacheProvider;
this.btreeRegistryProvider = btreeRegistryProvider;
+ this.fileSplitProvider = fileSplitProvider;
}
@Override
@@ -48,6 +48,6 @@
IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition,
int nPartitions) {
- return new BTreeDropOperatorNodePushable(bufferCacheProvider, btreeRegistryProvider, btreeFileName, fileMappingProviderProvider);
+ return new BTreeDropOperatorNodePushable(bufferCacheProvider, btreeRegistryProvider, fileSplitProvider, partition, fileMappingProviderProvider);
}
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java
index d2a73fb..3b3c94e 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java
@@ -6,6 +6,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.common.file.FileManager;
public class BTreeDropOperatorNodePushable extends AbstractOperatorNodePushable {
@@ -14,14 +15,17 @@
private IBTreeRegistryProvider btreeRegistryProvider;
private IBufferCacheProvider bufferCacheProvider;
private IFileMappingProviderProvider fileMappingProviderProvider;
-
+ private IFileSplitProvider fileSplitProvider;
+ private int partition;
+
public BTreeDropOperatorNodePushable(IBufferCacheProvider bufferCacheProvider,
- IBTreeRegistryProvider btreeRegistryProvider, String btreeFileName,
+ IBTreeRegistryProvider btreeRegistryProvider, IFileSplitProvider fileSplitProvider, int partition,
IFileMappingProviderProvider fileMappingProviderProvider) {
- this.btreeFileName = btreeFileName;
this.fileMappingProviderProvider = fileMappingProviderProvider;
this.bufferCacheProvider = bufferCacheProvider;
this.btreeRegistryProvider = btreeRegistryProvider;
+ this.fileSplitProvider = fileSplitProvider;
+ this.partition = partition;
}
@Override
@@ -43,10 +47,10 @@
BTreeRegistry btreeRegistry = btreeRegistryProvider.getBTreeRegistry();
FileManager fileManager = bufferCacheProvider.getFileManager();
-
- String ncDataPath = System.getProperty("NodeControllerDataPath");
- String fileName = ncDataPath + btreeFileName;
-
+
+ File f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
+ String fileName = f.getAbsolutePath();
+
int btreeFileId = fileMappingProviderProvider.getFileMappingProvider().mapNameToFileId(fileName, false);
// unregister btree instance
@@ -59,8 +63,7 @@
// unregister file
fileManager.unregisterFile(btreeFileId);
-
- File f = new File(fileName);
+
if (f.exists()) {
f.delete();
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorDescriptor.java
index b516e8a..8a2e439 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorDescriptor.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOp;
@@ -38,13 +39,13 @@
RecordDescriptor recDesc,
IBufferCacheProvider bufferCacheProvider,
IBTreeRegistryProvider btreeRegistryProvider,
- String btreeFileName, IFileMappingProviderProvider fileMappingProviderProvider,
+ IFileSplitProvider fileSplitProvider, IFileMappingProviderProvider fileMappingProviderProvider,
IBTreeInteriorFrameFactory interiorFactory,
IBTreeLeafFrameFactory leafFactory, int fieldCount,
IBinaryComparatorFactory[] comparatorFactories,
int[] fieldPermutation, BTreeOp op) {
super(spec, 1, 1, recDesc, bufferCacheProvider,
- btreeRegistryProvider, btreeFileName, fileMappingProviderProvider, interiorFactory,
+ btreeRegistryProvider, fileSplitProvider, fileMappingProviderProvider, interiorFactory,
leafFactory, fieldCount, comparatorFactories);
this.fieldPermutation = fieldPermutation;
this.op = op;
@@ -55,6 +56,6 @@
IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition,
int nPartitions) {
- return new BTreeInsertUpdateDeleteOperatorNodePushable(this, ctx, fieldPermutation, recordDescProvider, op);
+ return new BTreeInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation, recordDescProvider, op);
}
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
index e1fedda..a272d56 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
@@ -46,8 +46,8 @@
private ByteBuffer writeBuffer;
public BTreeInsertUpdateDeleteOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
- int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, BTreeOp op) {
- btreeOpHelper = new BTreeOpHelper(opDesc, ctx, false);
+ int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, BTreeOp op) {
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, false);
this.recordDescProvider = recordDescProvider;
this.op = op;
tuple.setFieldPermutation(fieldPermutation);
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
index 97b5681..e423487 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
@@ -34,38 +35,40 @@
private BTree btree;
private int btreeFileId = -1;
+ private int partition;
private AbstractBTreeOperatorDescriptor opDesc;
private IHyracksContext ctx;
private boolean createBTree;
- BTreeOpHelper(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx, boolean createBTree) {
+ BTreeOpHelper(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx, int partition, boolean createBTree) {
this.opDesc = opDesc;
this.ctx = ctx;
this.createBTree = createBTree;
+ this.partition = partition;
}
void init() throws Exception {
IBufferCache bufferCache = opDesc.getBufferCacheProvider().getBufferCache();
FileManager fileManager = opDesc.getBufferCacheProvider().getFileManager();
- IFileMappingProviderProvider fileMappingProviderProvider = opDesc.getFileMappingProviderProvider();
-
- String ncDataPath = System.getProperty("NodeControllerDataPath");
- String fileName = ncDataPath + opDesc.getBtreeFileName();
+ IFileMappingProviderProvider fileMappingProviderProvider = opDesc.getFileMappingProviderProvider();
+ IFileSplitProvider fileSplitProvider = opDesc.getFileSplitProvider();
- btreeFileId = fileMappingProviderProvider.getFileMappingProvider().mapNameToFileId(fileName, createBTree);
-
- File f = new File(fileName);
+ //String ncDataPath = System.getProperty("NodeControllerDataPath");
+ File f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
if(!f.exists()) {
File dir = new File(f.getParent());
dir.mkdirs();
}
RandomAccessFile raf = new RandomAccessFile(f, "rw");
+ String fileName = f.getAbsolutePath();
+ btreeFileId = fileMappingProviderProvider.getFileMappingProvider().mapNameToFileId(fileName, createBTree);
+
if (!f.exists() && !createBTree) {
- throw new Exception("Trying to open btree from file " + opDesc.getBtreeFileName() + " but file doesn't exist.");
+ throw new Exception("Trying to open btree from file " + fileName + " but file doesn't exist.");
}
try {
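In short, BTreeOpHelper now derives the backing file from the split provider and its partition index instead of the NodeControllerDataPath system property; a sketch of the resulting lookup, assuming the provider supplies one FileSplit per partition:

    File f = opDesc.getFileSplitProvider().getFileSplits()[partition].getLocalFile();
    String fileName = f.getAbsolutePath();
    btreeFileId = fileMappingProviderProvider.getFileMappingProvider().mapNameToFileId(fileName, createBTree);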
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index b75b3a9..4d84c41 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
@@ -33,8 +34,8 @@
private int[] lowKeyFields; // fields in input tuple to be used as low keys
private int[] highKeyFields; // fields in input tuple to be used as high keys
- public BTreeSearchOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc, IBufferCacheProvider bufferCacheProvider, IBTreeRegistryProvider btreeRegistryProvider, String btreeFileName, IFileMappingProviderProvider fileMappingProviderProvider, IBTreeInteriorFrameFactory interiorFactory, IBTreeLeafFrameFactory leafFactory, int fieldCount, IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields) {
- super(spec, 1, 1, recDesc, bufferCacheProvider, btreeRegistryProvider, btreeFileName, fileMappingProviderProvider, interiorFactory, leafFactory, fieldCount, comparatorFactories);
+ public BTreeSearchOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc, IBufferCacheProvider bufferCacheProvider, IBTreeRegistryProvider btreeRegistryProvider, IFileSplitProvider fileSplitProvider, IFileMappingProviderProvider fileMappingProviderProvider, IBTreeInteriorFrameFactory interiorFactory, IBTreeLeafFrameFactory leafFactory, int fieldCount, IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields) {
+ super(spec, 1, 1, recDesc, bufferCacheProvider, btreeRegistryProvider, fileSplitProvider, fileMappingProviderProvider, interiorFactory, leafFactory, fieldCount, comparatorFactories);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -43,6 +44,6 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new BTreeSearchOperatorNodePushable(this, ctx, recordDescProvider, isForward, lowKeyFields, highKeyFields);
+ return new BTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward, lowKeyFields, highKeyFields);
}
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 80b04ac..391659d 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -58,8 +58,8 @@
private RecordDescriptor recDesc;
- public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, IRecordDescriptorProvider recordDescProvider, boolean isForward, int[] lowKeyFields, int[] highKeyFields) {
- btreeOpHelper = new BTreeOpHelper(opDesc, ctx, false);
+ public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward, int[] lowKeyFields, int[] highKeyFields) {
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, false);
this.isForward = isForward;
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
if(lowKeyFields != null && lowKeyFields.length > 0) {