merge master
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 7c91c02..4175078 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -420,7 +420,7 @@
private M allocateMessage() {
M message;
if (usedMessage < msgPool.size()) {
- message = msgPool.get(usedEdge);
+ message = msgPool.get(usedMessage);
usedMessage++;
} else {
message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexPartitioner.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexPartitioner.java
new file mode 100644
index 0000000..f51ad88
--- /dev/null
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexPartitioner.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.graph;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Users can extend this class to implement the desired vertex partitioning behavior.
+ *
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexPartitioner<I extends WritableComparable> {
+
+ /**
+ * @param vertexId
+ * The input vertex id.
+ * @param nPartitions
+ * The total number of partitions.
+ * @return The partition id.
+ */
+ public abstract int getPartitionId(I vertexId, int nPartitions);
+
+}
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 81472aa..4cddaf0 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -24,6 +24,7 @@
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
@@ -59,6 +60,8 @@
public static final String FINAL_AGGREGATE_VALUE_CLASS = "pregelix.finalAggregateValueClass";
/** The normalized key computer class */
public static final String NMK_COMPUTER_CLASS = "pregelix.nmkComputerClass";
+ /** The partitioner class */
+ public static final String PARTITIONER_CLASS = "pregelix.partitionerClass";
/** num of vertices */
public static final String NUM_VERTICE = "pregelix.numVertices";
/** num of edges */
@@ -69,6 +72,8 @@
public static final String JOB_ID = "pregelix.jobid";
/** frame size */
public static final String FRAME_SIZE = "pregelix.framesize";
+ /** update intensive */
+ public static final String UPDATE_INTENSIVE = "pregelix.updateIntensive";
/**
* Constructor that will instantiate the configuration
@@ -178,4 +183,22 @@
final public void setNoramlizedKeyComputerClass(Class<?> nkcClass) {
getConfiguration().setClass(NMK_COMPUTER_CLASS, nkcClass, NormalizedKeyComputer.class);
}
+
+ /**
+ * Set the vertex partitioner class
+ *
+ * @param partitionerClass
+ */
+ final public void setVertexPartitionerClass(Class<?> partitionerClass) {
+ getConfiguration().setClass(PARTITIONER_CLASS, partitionerClass, VertexPartitioner.class);
+ }
+
+ /**
+ * Indicate if the job needs to do a lot of graph mutations or variable size updates
+ *
+ * @param updateHeavyFlag
+ */
+ final public void setMutationOrVariableSizedUpdateHeavy(boolean variableSizedUpdateHeavyFlag) {
+ getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
+ }
}
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 6bac923..03c37dc 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -25,6 +25,7 @@
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -156,7 +157,7 @@
}
/**
- * Create a global aggregator class
+ * Create a global aggregator object
*
* @param conf
* Configuration to check
@@ -391,9 +392,9 @@
try {
return aggregateValueClass.newInstance();
} catch (InstantiationException e) {
- throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+ throw new IllegalArgumentException("createPartialAggregateValue: Failed to instantiate", e);
} catch (IllegalAccessException e) {
- throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+ throw new IllegalArgumentException("createPartialAggregateValue: Illegally accessed", e);
}
}
@@ -415,9 +416,9 @@
}
return instance;
} catch (InstantiationException e) {
- throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+ throw new IllegalArgumentException("createPartialCombineValue: Failed to instantiate", e);
} catch (IllegalAccessException e) {
- throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+ throw new IllegalArgumentException("createPartialCombineValue: Illegally accessed", e);
}
}
@@ -433,13 +434,46 @@
try {
return aggregateValueClass.newInstance();
} catch (InstantiationException e) {
- throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+ throw new IllegalArgumentException("createAggregateValue: Failed to instantiate", e);
} catch (IllegalAccessException e) {
- throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+ throw new IllegalArgumentException("createAggregateValue: Illegally accessed", e);
}
}
/**
+ * Create a user aggregate value
+ *
+ * @param conf
+ * Configuration to check
+ * @return Instantiated user aggregate value
+ */
+ @SuppressWarnings("rawtypes")
+ public static VertexPartitioner createVertexPartitioner(Configuration conf) {
+ Class<? extends VertexPartitioner> vertexPartitionerClass = getVertexPartitionerClass(conf);
+ try {
+ return vertexPartitionerClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("createVertexPartitioner: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("createVertexPartitioner: Illegally accessed", e);
+ }
+ }
+
+ /**
+ * Get the user's subclassed vertex partitioner class.
+ *
+ * @param conf
+ * Configuration to check
+ * @return The user defined vertex partitioner class
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public static <V extends VertexPartitioner> Class<V> getVertexPartitionerClass(Configuration conf) {
+ if (conf == null)
+ conf = defaultConf;
+ return (Class<V>) conf.getClass(PregelixJob.PARTITIONER_CLASS, null, VertexPartitioner.class);
+ }
+
+ /**
* Get the job configuration parameter whether the vertex states will increase dynamically
*
* @param conf
@@ -460,4 +494,14 @@
public static int getFrameSize(Configuration conf) {
return conf.getInt(PregelixJob.FRAME_SIZE, -1);
}
+
+ /**
+ * Should the job use LSM or B-tree to store vertices
+ *
+ * @param conf
+ * @return
+ */
+ public static boolean useLSM(Configuration conf) {
+ return conf.getBoolean(PregelixJob.UPDATE_INTENSIVE, false);
+ }
}
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultVertexPartitioner.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultVertexPartitioner.java
new file mode 100644
index 0000000..263ec65
--- /dev/null
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultVertexPartitioner.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
+
+/**
+ * The deafult vertex partitioner which use the hashcode of the vertex id to determine the partition
+ * of the vertex.
+ *
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class DefaultVertexPartitioner<I extends WritableComparable> extends VertexPartitioner<I> {
+
+ @Override
+ public int getPartitionId(I vertexId, int nPartitions) {
+ return vertexId.hashCode() % nPartitions;
+ }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
index c0bbafd..1600ab5 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
@@ -45,4 +45,15 @@
throw new HyracksDataException(e);
}
}
+
+ @Override
+ public Configuration createConfiguration() throws HyracksDataException{
+ try {
+ Configuration conf = new Configuration();
+ SerDeUtils.deserialize(conf, data);
+ return conf;
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index dacb432..dc1c73f 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -53,15 +53,22 @@
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -80,8 +87,10 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
+import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexPartitionComputerFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
public abstract class JobGen implements IJobGen {
@@ -169,7 +178,7 @@
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
- new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+ getIndexDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeCreate);
spec.setFrameSize(frameSize);
@@ -230,15 +239,14 @@
typeTraits[1] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, false, new BTreeDataflowHelperFactory(),
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, false, getIndexDataflowHelperFactory(),
NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
* connect operator descriptors
*/
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- new WritableSerializerDeserializerFactory(vertexIdClass));
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
spec.setFrameSize(frameSize);
@@ -313,8 +321,7 @@
/**
* connect operator descriptors
*/
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- new WritableSerializerDeserializerFactory(vertexIdClass));
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
comparatorFactories), sorter, 0, writer, 0);
@@ -358,7 +365,7 @@
typeTraits[1] = new TypeTraits(false);
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
+ null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -381,8 +388,7 @@
*/
int[] sortFields = new int[1];
sortFields[0] = 0;
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- new WritableSerializerDeserializerFactory(vertexIdClass));
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
comparatorFactories), scanner, 0, writer, 0);
@@ -428,7 +434,7 @@
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
+ null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -459,7 +465,7 @@
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
- lcManagerProvider, fileSplitProvider, new BTreeDataflowHelperFactory());
+ lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, drop);
spec.addRoot(drop);
@@ -467,6 +473,28 @@
return spec;
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected ITuplePartitionComputerFactory getVertexPartitionComputerFactory() {
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ Class<? extends VertexPartitioner> partitionerClazz = BspUtils.getVertexPartitionerClass(conf);
+ if (partitionerClazz != null) {
+ return new VertexPartitionComputerFactory(confFactory);
+ } else {
+ return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
+ BspUtils.getVertexIndexClass(conf)));
+ }
+ }
+
+ protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
+ if (BspUtils.useLSM(conf)) {
+ return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
+ 3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
+ NoOpIOOperationCallback.INSTANCE, 0.01);
+ } else {
+ return new BTreeDataflowHelperFactory();
+ }
+ }
+
/** generate non-first iteration job */
protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
@@ -476,4 +504,4 @@
/** generate clean-up job */
public abstract JobSpecification[] generateCleanup() throws HyracksException;
-}
\ No newline at end of file
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index d9a24bc..2bab291 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -35,14 +35,9 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -59,11 +54,11 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
@@ -72,8 +67,6 @@
import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
public class JobGenInnerJoin extends JobGen {
@@ -135,12 +128,11 @@
vertexClass.getName());
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
- BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 6,
- new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate, rdInsert, rdDelete, rdFinal);
+ getIndexDataflowHelperFactory(), inputRdFactory, 6, new StartComputeUpdateFunctionFactory(confFactory),
+ preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -168,7 +160,7 @@
WritableComparator.get(vertexIdClass).getClass());
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
@@ -223,7 +215,7 @@
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
@@ -234,7 +226,7 @@
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
- new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+ getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -245,8 +237,7 @@
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink4);
- ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- new WritableSerializerDeserializerFactory(vertexIdClass));
+ ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
/** connect all operators **/
@@ -339,14 +330,9 @@
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
- ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
- typeTraits));
- ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
- typeTraits));
IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
- storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
- leafFrameFactory, typeTraits, comparatorFactories, true, keyFields, keyFields, true, true,
- new BTreeDataflowHelperFactory(), true);
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
+ comparatorFactories, true, keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true);
ClusterConfig.setLocationConstraint(spec, setUnion);
/**
@@ -364,7 +350,7 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
+ getIndexDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
ClusterConfig.setLocationConstraint(spec, join);
@@ -379,7 +365,7 @@
IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
- indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+ indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
@@ -447,7 +433,7 @@
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
@@ -458,7 +444,7 @@
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
- new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+ getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -470,8 +456,7 @@
ClusterConfig.setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
- ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- new WritableSerializerDeserializerFactory(vertexIdClass));
+ ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 5faf122..3af8921 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -35,14 +35,9 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -59,9 +54,9 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
@@ -72,8 +67,6 @@
import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
public class JobGenOuterJoin extends JobGen {
@@ -129,12 +122,11 @@
vertexClass.getName());
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
- BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 5,
- new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate, rdInsert, rdDelete);
+ getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
+ preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -206,8 +198,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -216,8 +208,8 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
- new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -227,8 +219,7 @@
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink4);
- ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- new WritableSerializerDeserializerFactory(vertexIdClass));
+ ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -318,10 +309,6 @@
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
- ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
- typeTraits));
- ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
- typeTraits));
INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
@@ -335,11 +322,10 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
- leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
- new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate, rdInsert, rdDelete);
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
+ nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
+ null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -408,8 +394,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -418,7 +404,7 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
@@ -431,8 +417,7 @@
ClusterConfig.setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
- ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- new WritableSerializerDeserializerFactory(vertexIdClass));
+ ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index fee738e..50949aa 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -34,14 +34,9 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -58,9 +53,9 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
@@ -71,8 +66,6 @@
import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
public class JobGenOuterJoinSingleSort extends JobGen {
@@ -131,12 +124,11 @@
vertexClass.getName());
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
- BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 5,
- new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate, rdInsert, rdDelete);
+ getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
+ preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -198,8 +190,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -208,8 +200,8 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
- new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -221,8 +213,7 @@
ClusterConfig.setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
- ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- new WritableSerializerDeserializerFactory(vertexIdClass));
+ ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -307,10 +298,6 @@
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
- ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
- typeTraits));
- ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
- typeTraits));
INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
@@ -324,11 +311,10 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
- leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
- new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate, rdInsert, rdDelete);
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
+ nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
+ null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -385,8 +371,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -395,8 +381,8 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
- new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -408,8 +394,7 @@
ClusterConfig.setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
- ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- new WritableSerializerDeserializerFactory(vertexIdClass));
+ ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index eef6b7e..362e413 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -34,14 +34,9 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -58,9 +53,9 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
@@ -71,8 +66,6 @@
import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
public class JobGenOuterJoinSort extends JobGen {
@@ -128,12 +121,11 @@
vertexClass.getName());
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
- BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 5,
- new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate, rdInsert, rdDelete);
+ getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
+ preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -212,8 +204,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -222,8 +214,8 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
- new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -234,8 +226,7 @@
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink4);
- ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- new WritableSerializerDeserializerFactory(vertexIdClass));
+ ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -321,10 +312,6 @@
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
- ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
- typeTraits));
- ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
- typeTraits));
INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
@@ -338,11 +325,10 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
- leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
- new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate, rdInsert, rdDelete);
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
+ nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
+ null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -418,8 +404,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -428,8 +414,8 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
- new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -441,8 +427,7 @@
ClusterConfig.setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
- ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- new WritableSerializerDeserializerFactory(vertexIdClass));
+ ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
index 0b3a7fe..e450380 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
@@ -26,7 +26,6 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -91,8 +90,7 @@
public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
- IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
- ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
boolean isRightOuter, INullWriterFactory[] nullWriterFactories, IRecordDescriptorFactory inputRdFactory,
@@ -125,8 +123,7 @@
public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
- IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
- ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
boolean isSetUnion, IRecordDescriptorFactory inputRdFactory, int outputArity,
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index bd45509..238b775 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -28,13 +28,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -54,7 +51,7 @@
private ByteBuffer writeBuffer;
private FrameTupleAppender appender;
- private BTree btree;
+ private ITreeIndex index;
private PermutingFrameTupleReference lowKey;
private PermutingFrameTupleReference highKey;
private boolean lowKeyInclusive;
@@ -62,9 +59,8 @@
private RangePredicate rangePred;
private MultiComparator lowKeySearchCmp;
private MultiComparator highKeySearchCmp;
- private ITreeIndexCursor cursor;
- private ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ private IIndexCursor cursor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private final IFrameWriter[] writers;
@@ -99,7 +95,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+ cursor = indexAccessor.createSearchCursor();
}
@Override
@@ -112,13 +108,11 @@
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
- setCursor();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
// TODO: Can we construct the multicmps using helper methods?
- int lowKeySearchFields = btree.getComparatorFactories().length;
- int highKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
+ int highKeySearchFields = index.getComparatorFactories().length;
if (lowKey != null)
lowKeySearchFields = lowKey.getFieldCount();
if (highKey != null)
@@ -126,7 +120,7 @@
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
@@ -135,10 +129,9 @@
} else {
IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
for (int i = 0; i < highKeySearchFields; i++) {
- highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
highKeySearchCmp = new MultiComparator(highKeySearchComparators);
-
}
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
@@ -147,9 +140,10 @@
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
- updateBuffer.setFieldCount(btree.getFieldCount());
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ setCursor();
+ cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+ updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
treeIndexOpHelper.close();
throw new HyracksDataException(e);
@@ -201,7 +195,7 @@
try {
cursor.close();
//batch update
- updateBuffer.updateBTree(indexAccessor);
+ updateBuffer.updateIndex(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
index 6dc713c..440ae86 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
@@ -87,8 +87,7 @@
public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
- IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
- ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
boolean isSetUnion) {
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
index d3114d2..221a818 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
@@ -29,10 +29,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -48,7 +48,7 @@
private ArrayTupleBuilder tb;
private DataOutput dos;
- private BTree btree;
+ private ITreeIndex index;
private PermutingFrameTupleReference lowKey;
private PermutingFrameTupleReference highKey;
private boolean lowKeyInclusive;
@@ -57,7 +57,7 @@
private MultiComparator lowKeySearchCmp;
private MultiComparator highKeySearchCmp;
private IIndexCursor cursor;
- protected ITreeIndexAccessor indexAccessor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private final RecordDescriptor inputRecDesc;
@@ -91,11 +91,11 @@
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
writer.open();
- int lowKeySearchFields = btree.getComparatorFactories().length;
- int highKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
+ int highKeySearchFields = index.getComparatorFactories().length;
if (lowKey != null)
lowKeySearchFields = lowKey.getFieldCount();
if (highKey != null)
@@ -103,7 +103,7 @@
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
@@ -112,7 +112,7 @@
} else {
IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
for (int i = 0; i < highKeySearchFields; i++) {
- highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
highKeySearchCmp = new MultiComparator(highKeySearchComparators);
}
@@ -120,11 +120,11 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+ tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + index.getFieldCount());
dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
setCursor();
} catch (Exception e) {
treeIndexOpHelper.close();
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index 289128e..06119ea 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -30,13 +30,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -60,13 +57,12 @@
private ArrayTupleBuilder nullTupleBuilder;
private DataOutput dos;
- private BTree btree;
+ private ITreeIndex index;
private RangePredicate rangePred;
private MultiComparator lowKeySearchCmp;
private MultiComparator highKeySearchCmp;
- private ITreeIndexCursor cursor;
- private ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ private IIndexCursor cursor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private final RecordDescriptor inputRecDesc;
@@ -111,7 +107,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+ cursor = indexAccessor.createSearchCursor();
}
@Override
@@ -124,18 +120,16 @@
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
- setCursor();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
// construct range predicate
// TODO: Can we construct the multicmps using helper methods?
- int lowKeySearchFields = btree.getComparatorFactories().length;
- int highKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
+ int highKeySearchFields = index.getComparatorFactories().length;
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
@@ -144,7 +138,7 @@
} else {
IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
for (int i = 0; i < highKeySearchFields; i++) {
- highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
highKeySearchCmp = new MultiComparator(highKeySearchComparators);
}
@@ -164,7 +158,8 @@
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ setCursor();
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -179,8 +174,8 @@
match = false;
}
- cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
- updateBuffer.setFieldCount(btree.getFieldCount());
+ cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+ updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
treeIndexOpHelper.close();
throw new HyracksDataException(e);
@@ -239,7 +234,7 @@
try {
cursor.close();
//batch update
- updateBuffer.updateBTree(indexAccessor);
+ updateBuffer.updateIndex(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index 6f7643e..48812b7 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -30,13 +30,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -53,14 +50,13 @@
private ArrayTupleBuilder tb;
private DataOutput dos;
- private BTree btree;
+ private ITreeIndex index;
private boolean isForward;
private RangePredicate rangePred;
private MultiComparator lowKeySearchCmp;
private MultiComparator highKeySearchCmp;
- private ITreeIndexCursor cursor;
- private ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ private IIndexCursor cursor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private final RecordDescriptor inputRecDesc;
@@ -93,7 +89,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+ cursor = indexAccessor.createSearchCursor();
}
@Override
@@ -101,19 +97,18 @@
accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
setCursor();
writer.open();
// construct range predicate
// TODO: Can we construct the multicmps using helper methods?
- int lowKeySearchFields = btree.getComparatorFactories().length;
- int highKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
+ int highKeySearchFields = index.getComparatorFactories().length;
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
@@ -122,7 +117,7 @@
} else {
IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
for (int i = 0; i < highKeySearchFields; i++) {
- highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
highKeySearchCmp = new MultiComparator(highKeySearchComparators);
@@ -131,12 +126,12 @@
rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+ tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + index.getFieldCount());
dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index c978614..ee821df 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -28,13 +28,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -55,13 +52,12 @@
private ByteBuffer writeBuffer;
private FrameTupleAppender appender;
- private BTree btree;
+ private ITreeIndex index;
private boolean isForward;
private RangePredicate rangePred;
private MultiComparator lowKeySearchCmp;
- private ITreeIndexCursor cursor;
- private ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ private IIndexCursor cursor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private PermutingFrameTupleReference lowKey;
@@ -102,7 +98,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+ cursor = indexAccessor.createSearchCursor();
}
@Override
@@ -112,15 +108,13 @@
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
- setCursor();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
rangePred = new RangePredicate(null, null, true, true, null, null);
- int lowKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
@@ -128,7 +122,8 @@
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ setCursor();
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -142,8 +137,8 @@
currentTopTuple = cursor.getTuple();
match = false;
}
- cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
- updateBuffer.setFieldCount(btree.getFieldCount());
+ cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+ updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
treeIndexOpHelper.close();
throw new HyracksDataException(e);
@@ -210,7 +205,7 @@
cursor.close();
//batch update
- updateBuffer.updateBTree(indexAccessor);
+ updateBuffer.updateIndex(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index eef7c85..d6f95f9 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -29,13 +29,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -51,12 +48,11 @@
private ArrayTupleBuilder tb;
private DataOutput dos;
- private BTree btree;
+ private ITreeIndex index;
private RangePredicate rangePred;
private MultiComparator lowKeySearchCmp;
- private ITreeIndexCursor cursor;
- private ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ private IIndexCursor cursor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private final RecordDescriptor inputRecDesc;
@@ -86,7 +82,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+ cursor = indexAccessor.createSearchCursor();
}
@Override
@@ -95,26 +91,25 @@
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
- setCursor();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
writer.open();
rangePred = new RangePredicate(null, null, true, true, null, null);
- int lowKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(btree.getFieldCount());
+ tb = new ArrayTupleBuilder(index.getFieldCount());
dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ setCursor();
/** set the search cursor */
rangePred.setLowKey(null, true);
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorDescriptor.java
similarity index 93%
rename from pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
rename to pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorDescriptor.java
index 784288f..f651d68 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorDescriptor.java
@@ -33,7 +33,7 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
-public class BTreeSearchFunctionUpdateOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+public class TreeSearchFunctionUpdateOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -52,7 +52,7 @@
private final int outputArity;
- public BTreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+ public TreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
@@ -84,7 +84,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new BTreeSearchFunctionUpdateOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
+ return new TreeSearchFunctionUpdateOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, functionFactory, preHookFactory,
postHookFactory, inputRdFactory, outputArity);
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
similarity index 76%
rename from pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
rename to pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index 0216b52..13aab31 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -27,13 +28,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
@@ -48,7 +46,7 @@
import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
-public class BTreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
+public class TreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
protected TreeIndexDataflowHelper treeIndexHelper;
protected FrameTupleAccessor accessor;
@@ -57,7 +55,7 @@
protected ArrayTupleBuilder tb;
protected DataOutput dos;
- protected BTree btree;
+ protected ITreeIndex index;
protected boolean isForward;
protected PermutingFrameTupleReference lowKey;
protected PermutingFrameTupleReference highKey;
@@ -66,9 +64,11 @@
protected RangePredicate rangePred;
protected MultiComparator lowKeySearchCmp;
protected MultiComparator highKeySearchCmp;
- protected ITreeIndexCursor cursor;
+ protected IIndexCursor cursor;
protected ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ protected IIndexAccessor indexAccessor;
+ protected int[] lowKeyFields;
+ protected int[] highKeyFields;
protected RecordDescriptor recDesc;
@@ -78,7 +78,7 @@
private final UpdateBuffer updateBuffer;
private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
- public BTreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+ public TreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
@@ -88,6 +88,8 @@
this.isForward = isForward;
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
+ this.lowKeyFields = lowKeyFields;
+ this.highKeyFields = highKeyFields;
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
if (lowKeyFields != null && lowKeyFields.length > 0) {
lowKey = new PermutingFrameTupleReference();
@@ -114,25 +116,46 @@
try {
treeIndexHelper.open();
- btree = (BTree) treeIndexHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
- setCursor();
+ index = (ITreeIndex) treeIndexHelper.getIndexInstance();
+ cursorFrame = index.getLeafFrameFactory().createFrame();
// Construct range predicate.
- lowKeySearchCmp = BTreeUtils.getSearchMultiComparator(btree.getComparatorFactories(), lowKey);
- highKeySearchCmp = BTreeUtils.getSearchMultiComparator(btree.getComparatorFactories(), highKey);
+ int lowKeySearchFields = index.getComparatorFactories().length;
+ int highKeySearchFields = index.getComparatorFactories().length;
+ if (lowKey != null)
+ lowKeySearchFields = lowKey.getFieldCount();
+ if (highKey != null)
+ highKeySearchFields = highKey.getFieldCount();
+
+ IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
+ for (int i = 0; i < lowKeySearchFields; i++) {
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
+ }
+ lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
+
+ if (lowKeySearchFields == highKeySearchFields) {
+ highKeySearchCmp = lowKeySearchCmp;
+ } else {
+ IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
+ for (int i = 0; i < highKeySearchFields; i++) {
+ highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
+ }
+ highKeySearchCmp = new MultiComparator(highKeySearchComparators);
+ }
+
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
writeBuffer = treeIndexHelper.getTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(btree.getFieldCount());
+ tb = new ArrayTupleBuilder(index.getFieldCount());
dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ setCursor();
- cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
- updateBuffer.setFieldCount(btree.getFieldCount());
+ cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+ updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
treeIndexHelper.close();
throw new HyracksDataException(e);
@@ -140,7 +163,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+ cursor = indexAccessor.createSearchCursor();
}
protected void writeSearchResults() throws Exception {
@@ -184,7 +207,7 @@
try {
cursor.close();
//batch update
- updateBuffer.updateBTree(indexAccessor);
+ updateBuffer.updateIndex(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index b141eaf..8709301 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -19,15 +19,15 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public class CopyUpdateUtil {
public static void copyUpdate(SearchKeyTupleReference tempTupleReference, ITupleReference frameTuple,
- UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, ITreeIndexAccessor indexAccessor,
- ITreeIndexCursor cursor, RangePredicate rangePred) throws HyracksDataException, IndexException {
+ UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, IIndexAccessor indexAccessor,
+ IIndexCursor cursor, RangePredicate rangePred) throws HyracksDataException, IndexException {
if (cloneUpdateTb.getSize() > 0) {
int[] fieldEndOffsets = cloneUpdateTb.getFieldEndOffsets();
int srcStart = fieldEndOffsets[0];
@@ -46,7 +46,7 @@
//release the cursor/latch
cursor.close();
//batch update
- updateBuffer.updateBTree(indexAccessor);
+ updateBuffer.updateIndex(indexAccessor);
//try append the to-be-updated tuple again
if (!updateBuffer.appendTuple(cloneUpdateTb)) {
throw new HyracksDataException("cannot append tuple builder!");
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
index 327178c..1ff3959 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -25,7 +25,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
/**
@@ -87,7 +87,7 @@
}
}
- public void updateBTree(ITreeIndexAccessor bta) throws HyracksDataException, IndexException {
+ public void updateIndex(IIndexAccessor bta) throws HyracksDataException, IndexException {
// batch update
for (int i = 0; i <= currentInUse; i++) {
ByteBuffer buffer = buffers.get(i);
diff --git a/pregelix-dataflow/pom.xml b/pregelix-dataflow/pom.xml
index 962c9f6..2828451 100644
--- a/pregelix-dataflow/pom.xml
+++ b/pregelix-dataflow/pom.xml
@@ -1,18 +1,14 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License"); ! you may
+ not use this file except in compliance with the License. ! you may obtain
+ a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0
+ ! ! Unless required by applicable law or agreed to in writing, software !
+ distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the
+ License for the specific language governing permissions and ! limitations
+ under the License. ! -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow</artifactId>
<packaging>jar</packaging>
@@ -134,6 +130,13 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-common</artifactId>
+ <version>0.2.7-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-cc</artifactId>
<version>0.2.7-SNAPSHOT</version>
<type>jar</type>
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
index dc9698b..0f41568 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
@@ -25,4 +25,6 @@
public Configuration createConfiguration(IHyracksTaskContext ctx) throws HyracksDataException;
+ public Configuration createConfiguration() throws HyracksDataException;
+
}
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index b86691c..2008cf0 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -52,6 +53,7 @@
private final ILocalResourceRepository localResourceRepository;
private final ResourceIdFactory resourceIdFactory;
private final IBufferCache bufferCache;
+ private final IVirtualBufferCache vBufferCache = null;
private final IFileMapManager fileMapManager;
private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
private final Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
@@ -76,6 +78,8 @@
bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000,
threadFactory);
+ //vBufferCache = new MultitenantVirtualBufferCache(new VirtualBufferCache(new HeapBufferAllocator(), pageSize,
+ // numPages / 2));
ioManager = (IOManager) appCtx.getRootContext().getIOManager();
lcManager = new IndexLifecycleManager();
localResourceRepository = new TransientLocalResourceRepository();
@@ -110,6 +114,10 @@
return bufferCache;
}
+ public IVirtualBufferCache getVirtualBufferCache() {
+ return vBufferCache;
+ }
+
public IFileMapProvider getFileMapManager() {
return fileMapManager;
}
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 69c5612..a3a00f8 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
import edu.uci.ics.pregelix.example.GraphMutationVertex;
@@ -80,6 +81,7 @@
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -132,6 +134,7 @@
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -234,7 +237,7 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
-
+
private static void generateMaximalCliqueJob2(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(MaximalCliqueVertex.class);
@@ -247,7 +250,7 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
-
+
private static void generateMaximalCliqueJob3(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(MaximalCliqueVertex.class);
@@ -256,6 +259,7 @@
job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH5);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
diff --git a/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 46444b3..f510b7f 100644
--- a/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -137,6 +137,7 @@
<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleConnectedComponentsVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
diff --git a/pregelix-example/src/test/resources/jobs/MaximalClique3.xml b/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
index ee13335..43c6dd1 100644
--- a/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
+++ b/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
@@ -135,6 +135,7 @@
<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex$MaximalCliqueVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
diff --git a/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 314ca55..3f74cfb 100644
--- a/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -137,6 +137,7 @@
<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
diff --git a/pregelix-runtime/pom.xml b/pregelix-runtime/pom.xml
index ae9f47e..54e2256 100644
--- a/pregelix-runtime/pom.xml
+++ b/pregelix-runtime/pom.xml
@@ -148,6 +148,13 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-common</artifactId>
+ <version>0.2.7-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-cc</artifactId>
<version>0.2.7-SNAPSHOT</version>
<type>jar</type>
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/VirtualBufferCacheProvider.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/VirtualBufferCacheProvider.java
new file mode 100644
index 0000000..ec51047
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/VirtualBufferCacheProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+/**
+ * The virtual buffer cache provider
+ *
+ * @author yingyib
+ */
+public class VirtualBufferCacheProvider implements IVirtualBufferCacheProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ public VirtualBufferCacheProvider(){
+
+ }
+
+ @Override
+ public synchronized IVirtualBufferCache getVirtualBufferCache(IHyracksTaskContext ctx) {
+ return RuntimeContext.get(ctx).getVirtualBufferCache();
+ }
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
new file mode 100644
index 0000000..88577c2
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+/**
+ * No operation update function factory
+ *
+ * @author yingyib
+ */
+public class NoOpUpdateFunctionFactory implements IUpdateFunctionFactory {
+ private static final long serialVersionUID = 1L;
+ public static NoOpUpdateFunctionFactory INSTANCE = new NoOpUpdateFunctionFactory();
+
+ private NoOpUpdateFunctionFactory() {
+
+ }
+
+ @Override
+ public IUpdateFunction createFunction() {
+ return new IUpdateFunction() {
+
+ @Override
+ public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writer)
+ throws HyracksDataException {
+
+ }
+
+ @Override
+ public void process(Object[] tuple) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+
+ }
+
+ };
+ }
+
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexPartitionComputerFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexPartitionComputerFactory.java
new file mode 100644
index 0000000..acccabb
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexPartitionComputerFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+
+/**
+ * The vertex-based partition computer factory.
+ * It is used to support customized graph partitioning function.
+ *
+ * @author yingyib
+ */
+public class VertexPartitionComputerFactory implements ITuplePartitionComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final IConfigurationFactory confFactory;
+
+ public VertexPartitionComputerFactory(IConfigurationFactory confFactory) {
+ this.confFactory = confFactory;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public ITuplePartitionComputer createPartitioner() {
+ try {
+ final Configuration conf = confFactory.createConfiguration();
+ return new ITuplePartitionComputer() {
+ private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private final DataInputStream dis = new DataInputStream(bbis);
+ private final VertexPartitioner partitioner = BspUtils.createVertexPartitioner(conf);
+ private final WritableComparable vertexId = BspUtils.createVertexIndex(conf);
+
+ @SuppressWarnings("unchecked")
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ try {
+ int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, 0);
+ bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+ vertexId.readFields(dis);
+ return Math.abs(partitioner.getPartitionId(vertexId, nParts) % nParts);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}