Merged fullstack_staging branch into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-dataflow-std/pom.xml b/fullstack/pregelix/pregelix-dataflow-std/pom.xml
new file mode 100644
index 0000000..8e8824b
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/pom.xml
@@ -0,0 +1,153 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>pregelix-dataflow-std</artifactId>
+ <packaging>jar</packaging>
+ <name>pregelix-dataflow-std</name>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </parent>
+
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <!-- Single place to change the version of every Hyracks artifact used below. -->
+ <hyracks.version>0.2.2-SNAPSHOT</hyracks.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <!-- Compile sources and emit bytecode at the Java 6 level. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <!-- Run every *TestSuite / *Test class in its own forked JVM. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.7.2</version>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <argLine>-enableassertions -Xmx512m -Dfile.encoding=UTF-8
+ -Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <!-- "mvn clean" also deletes these files from the module directory (presumably leftovers of test runs). -->
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>.</directory>
+ <includes>
+ <include>teststore*</include>
+ <include>edu*</include>
+ <include>actual*</include>
+ <include>build*</include>
+ <include>expect*</include>
+ <include>ClusterController*</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix-dataflow-std-base</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>${hyracks.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>${hyracks.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>${hyracks.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-data-std</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ <version>${hyracks.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-btree</artifactId>
+ <version>${hyracks.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>${hyracks.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>${hyracks.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-ipc</artifactId>
+ <version>${hyracks.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
new file mode 100644
index 0000000..99e55f1
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.std;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+/**
+ * Operator that, for each input tuple, performs a B-tree range search keyed on fields of that tuple and
+ * passes every B-tree tuple found to an update function, which may write to up to {@code outputArity} outputs.
+ */
+public class BTreeSearchFunctionUpdateOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ protected boolean isForward;
+ /** Input-tuple fields forming the low search key; {@code null} or empty means no low key is used. */
+ protected int[] lowKeyFields;
+ /** Input-tuple fields forming the high search key; {@code null} or empty means no high key is used. */
+ protected int[] highKeyFields;
+ protected boolean lowKeyInclusive;
+ protected boolean highKeyInclusive;
+
+ private final IUpdateFunctionFactory functionFactory;
+ private final IRuntimeHookFactory preHookFactory;
+ private final IRuntimeHookFactory postHookFactory;
+ private final IRecordDescriptorFactory inputRdFactory;
+
+ private final int outputArity;
+
+ /**
+ * @param recDesc record descriptor handed to the superclass constructor; {@code rDescs[0]}, if present, is
+ * written to {@code recordDescriptors[0]} afterwards and overwrites whatever is stored there
+ * @param outputArity number of outputs the update function writes to
+ * @param rDescs output record descriptors in output order, at most one per output
+ * @throws IllegalArgumentException if more record descriptors than outputs are supplied
+ */
+ public BTreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+ IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+ boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory,
+ IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
+ IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
+ super(spec, 1, outputArity, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, dataflowHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ this.isForward = isForward;
+ this.lowKeyFields = lowKeyFields;
+ this.highKeyFields = highKeyFields;
+ this.lowKeyInclusive = lowKeyInclusive;
+ this.highKeyInclusive = highKeyInclusive;
+
+ this.functionFactory = functionFactory;
+ this.preHookFactory = preHookFactory;
+ this.postHookFactory = postHookFactory;
+ this.inputRdFactory = inputRdFactory;
+ this.outputArity = outputArity;
+
+ // Reject surplus descriptors up front instead of failing with a bare ArrayIndexOutOfBoundsException.
+ if (rDescs.length > recordDescriptors.length) {
+ throw new IllegalArgumentException(String.format("%d record descriptors supplied for %d outputs",
+ rDescs.length, recordDescriptors.length));
+ }
+ System.arraycopy(rDescs, 0, recordDescriptors, 0, rDescs.length);
+ }
+
+ /** Creates the per-partition runtime, passing this descriptor's search and function settings through. */
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new BTreeSearchFunctionUpdateOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
+ lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, functionFactory, preHookFactory,
+ postHookFactory, inputRdFactory, outputArity);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
new file mode 100644
index 0000000..fb84aa0
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+
+/**
+ * Runtime of {@link BTreeSearchFunctionUpdateOperatorDescriptor}: for every input tuple it sets the range
+ * predicate's keys from that tuple, searches the B-tree and passes each tuple found to the update function
+ * wrapped by a {@link FunctionProxy}.
+ */
+public class BTreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
+ protected TreeIndexDataflowHelper treeIndexHelper;
+ protected FrameTupleAccessor accessor;
+
+ // NOTE(review): these four are allocated in open() but never read in this class; presumably kept for
+ // subclasses, verify before removing.
+ protected ByteBuffer writeBuffer;
+ protected FrameTupleAppender appender;
+ protected ArrayTupleBuilder tb;
+ protected DataOutput dos;
+
+ protected BTree btree;
+ protected boolean isForward;
+ protected PermutingFrameTupleReference lowKey;
+ protected PermutingFrameTupleReference highKey;
+ protected boolean lowKeyInclusive;
+ protected boolean highKeyInclusive;
+ protected RangePredicate rangePred;
+ protected MultiComparator lowKeySearchCmp;
+ protected MultiComparator highKeySearchCmp;
+ protected ITreeIndexCursor cursor;
+ protected ITreeIndexFrame cursorFrame;
+ protected ITreeIndexAccessor indexAccessor;
+
+ protected RecordDescriptor recDesc;
+
+ /** Output writers indexed by output number; filled in by {@link #setOutputFrameWriter}. */
+ private final IFrameWriter[] writers;
+ private final FunctionProxy functionProxy;
+
+ public BTreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+ IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+ int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+ IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
+ IRuntimeHookFactory postHookFactory, IRecordDescriptorFactory inputRdFactory, int outputArity) {
+ treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+ opDesc, ctx, partition);
+ this.isForward = isForward;
+ this.lowKeyInclusive = lowKeyInclusive;
+ this.highKeyInclusive = highKeyInclusive;
+ this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+ // Null or empty key-field arrays leave the key reference null; nextFrame() hands it to the predicate as is.
+ if (lowKeyFields != null && lowKeyFields.length > 0) {
+ lowKey = new PermutingFrameTupleReference();
+ lowKey.setFieldPermutation(lowKeyFields);
+ }
+ if (highKeyFields != null && highKeyFields.length > 0) {
+ highKey = new PermutingFrameTupleReference();
+ highKey.setFieldPermutation(highKeyFields);
+ }
+
+ this.writers = new IFrameWriter[outputArity];
+ this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
+ writers);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ // Open the update function first, then the B-tree and the reusable search state.
+ functionProxy.functionOpen();
+ accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+
+ try {
+ treeIndexHelper.init(false);
+ btree = (BTree) treeIndexHelper.getIndex();
+ cursorFrame = btree.getLeafFrameFactory().createFrame();
+ setCursor();
+
+ // Construct range predicate; its keys are set per input tuple in nextFrame().
+ lowKeySearchCmp = BTreeUtils.getSearchMultiComparator(btree.getComparatorFactories(), lowKey);
+ highKeySearchCmp = BTreeUtils.getSearchMultiComparator(btree.getComparatorFactories(), highKey);
+ rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
+ highKeySearchCmp);
+
+ writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
+ tb = new ArrayTupleBuilder(btree.getFieldCount());
+ dos = tb.getDataOutput();
+ appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
+ appender.reset(writeBuffer, true);
+ indexAccessor = btree.createAccessor();
+ } catch (Exception e) {
+ treeIndexHelper.deinit();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ /** Creates the cursor reused for every search. */
+ protected void setCursor() {
+ cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+ }
+
+ /** Passes every tuple remaining in the cursor to the update function. */
+ protected void writeSearchResults() throws Exception {
+ while (cursor.hasNext()) {
+ cursor.next();
+ ITupleReference tuple = cursor.getTuple();
+ functionProxy.functionCall(tuple);
+ }
+ }
+
+ /** Runs one range search per input tuple, taking the search keys from that tuple's key fields. */
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ try {
+ for (int i = 0; i < tupleCount; i++) {
+ if (lowKey != null) {
+ lowKey.reset(accessor, i);
+ }
+ if (highKey != null) {
+ highKey.reset(accessor, i);
+ }
+ rangePred.setLowKey(lowKey, lowKeyInclusive);
+ rangePred.setHighKey(highKey, highKeyInclusive);
+ cursor.reset();
+ indexAccessor.search(cursor, rangePred);
+ writeSearchResults();
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ /**
+ * Closes the cursor, then the update function, then the index. The update function is closed even if
+ * closing the cursor fails, and the index helper is always de-initialized.
+ */
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ try {
+ // cursor is still null if open() failed before setCursor() ran.
+ if (cursor != null) {
+ cursor.close();
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ functionProxy.functionClose();
+ }
+ } finally {
+ treeIndexHelper.deinit();
+ }
+ }
+
+ /** Forwards the failure to every output writer that has been set. */
+ @Override
+ public void fail() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ // A slot stays null if setOutputFrameWriter() was never called for that output.
+ if (writer != null) {
+ writer.fail();
+ }
+ }
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ writers[index] = writer;
+ }
+
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java
new file mode 100644
index 0000000..356f06c
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+
+/**
+ * Sink operator that writes each input tuple as one text line to the local file of this partition's split,
+ * rendering fields with {@link StringSerializationUtils#toString} and separating them with {@code "|"}.
+ * Optional pre/post runtime hooks are configured in {@code open()} and {@code close()}.
+ */
+public class FileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private final FileSplit[] splits;
+ private final IRuntimeHookFactory preHookFactory;
+ private final IRuntimeHookFactory postHookFactory;
+ private final IRecordDescriptorFactory inputRdFactory;
+
+ /**
+ * @param inputRdFactory factory for the input record descriptor; if {@code null}, the descriptor of input 0
+ * is taken from the record descriptor provider
+ * @param fileSplitProvider provides the output split of each partition
+ * @param preHookFactory hook configured in {@code open()}; may be {@code null}
+ * @param postHookFactory hook configured in {@code close()}; may be {@code null}
+ */
+ public FileWriteOperatorDescriptor(JobSpecification spec, IRecordDescriptorFactory inputRdFactory,
+ IFileSplitProvider fileSplitProvider, IRuntimeHookFactory preHookFactory,
+ IRuntimeHookFactory postHookFactory) {
+ super(spec, 1, 0);
+ this.splits = fileSplitProvider.getFileSplits();
+ this.preHookFactory = preHookFactory;
+ this.postHookFactory = postHookFactory;
+ this.inputRdFactory = inputRdFactory;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ private RecordDescriptor rd0;
+ private FrameDeserializer frameDeserializer;
+ private PrintWriter outputWriter;
+ private final static String separator = "|";
+
+ @Override
+ public void open() throws HyracksDataException {
+ rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+ : inputRdFactory.createRecordDescriptor();
+ frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ try {
+ // Encode explicitly as UTF-8 so the file does not depend on the node's default charset.
+ outputWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(splits[partition]
+ .getLocalFile().getFile()), "UTF-8"));
+ if (preHookFactory != null)
+ preHookFactory.createRuntimeHook().configure(ctx);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+ frameDeserializer.reset(frame);
+ while (!frameDeserializer.done()) {
+ Object[] tuple = frameDeserializer.deserializeRecord();
+ // Separator before every field except the first; a zero-field tuple yields an empty line.
+ for (int i = 0; i < tuple.length; i++) {
+ if (i > 0)
+ outputWriter.print(separator);
+ outputWriter.print(StringSerializationUtils.toString(tuple[i]));
+ }
+ outputWriter.println();
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ // Sink operator: there is no downstream writer to notify.
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ if (postHookFactory != null)
+ postHookFactory.createRuntimeHook().configure(ctx);
+ } finally {
+ // Release the file even if the post-hook throws; outputWriter is null if open() failed before creating it.
+ if (outputWriter != null)
+ outputWriter.close();
+ }
+ // PrintWriter swallows I/O errors; surface them instead of silently leaving a truncated file.
+ if (outputWriter != null && outputWriter.checkError()) {
+ throw new HyracksDataException("Failed to write output file " + splits[partition].getLocalFile().getFile());
+ }
+ }
+
+ };
+ return op;
+ }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
new file mode 100644
index 0000000..ee3ac82
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.pregelix.dataflow.std.base.IFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+
+/**
+ * Operator that deserializes every input tuple and passes it to an {@link IFunction}, which may write to up to
+ * {@code outputArity} outputs. Optional pre/post runtime hooks are configured in {@code open()} and
+ * {@code close()}.
+ */
+public class FunctionCallOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private final IFunctionFactory functionFactory;
+ private final IRuntimeHookFactory preHookFactory;
+ private final IRuntimeHookFactory postHookFactory;
+ private final IRecordDescriptorFactory inputRdFactory;
+
+ /**
+ * @param inputRdFactory factory for the input record descriptor; if {@code null}, the descriptor of input 0
+ * is taken from the record descriptor provider
+ * @param outputArity number of outputs the function writes to
+ * @param rDescs output record descriptors in output order, at most one per output
+ * @throws IllegalArgumentException if more record descriptors than outputs are supplied
+ */
+ public FunctionCallOperatorDescriptor(JobSpecification spec, IRecordDescriptorFactory inputRdFactory,
+ int outputArity, IFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
+ IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
+ super(spec, 1, outputArity);
+ this.functionFactory = functionFactory;
+ this.preHookFactory = preHookFactory;
+ this.postHookFactory = postHookFactory;
+ this.inputRdFactory = inputRdFactory;
+
+ // Reject surplus descriptors up front instead of failing with a bare ArrayIndexOutOfBoundsException.
+ if (rDescs.length > recordDescriptors.length) {
+ throw new IllegalArgumentException(String.format("%d record descriptors supplied for %d outputs",
+ rDescs.length, recordDescriptors.length));
+ }
+ System.arraycopy(rDescs, 0, recordDescriptors, 0, rDescs.length);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryInputOperatorNodePushable() {
+ private RecordDescriptor rd0;
+ private FrameDeserializer frameDeserializer;
+ private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+ private final IFunction function = functionFactory.createFunction();
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ if (postHookFactory != null)
+ postHookFactory.createRuntimeHook().configure(ctx);
+ function.close();
+ } finally {
+ // Close downstream writers even if the hook or the function throws.
+ for (IFrameWriter writer : writers) {
+ writer.close();
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.fail();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+ frameDeserializer.reset(frame);
+ while (!frameDeserializer.done()) {
+ Object[] tuple = frameDeserializer.deserializeRecord();
+ function.process(tuple);
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+ : inputRdFactory.createRecordDescriptor();
+ frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ // NOTE(review): the thread's context class loader is replaced and never restored; presumably so the
+ // function can load job classes; confirm this cannot leak into other work on the same thread.
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ for (IFrameWriter writer : writers) {
+ writer.open();
+ }
+ if (preHookFactory != null)
+ preHookFactory.createRuntimeHook().configure(ctx);
+ function.open(ctx, rd0, writers);
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ writers[index] = writer;
+ }
+ };
+ }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
new file mode 100644
index 0000000..60559e8
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.std;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+public class IndexNestedLoopJoinFunctionUpdateOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private boolean isForward;
+ private int[] lowKeyFields; // fields in input tuple to be used as low keys
+ private int[] highKeyFields; // fields in input tuple to be used as high
+ // keys
+ private boolean lowKeyInclusive;
+ private boolean highKeyInclusive;
+
+ // right outer join
+ private boolean isRightOuter = false;
+ private INullWriterFactory[] nullWriterFactories;
+
+ // set union
+ private boolean isSetUnion = false;
+
+ private final IUpdateFunctionFactory functionFactory;
+ private final IRuntimeHookFactory preHookFactory;
+ private final IRuntimeHookFactory postHookFactory;
+ private final IRecordDescriptorFactory inputRdFactory;
+
+ private final int outputArity;
+
+ public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
+ IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+ boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
+ IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
+ IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
+ super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
+ typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ this.isForward = isForward;
+ this.lowKeyFields = lowKeyFields;
+ this.highKeyFields = highKeyFields;
+ this.lowKeyInclusive = lowKeyInclusive;
+ this.highKeyInclusive = highKeyInclusive;
+
+ this.functionFactory = functionFactory;
+ this.preHookFactory = preHookFactory;
+ this.postHookFactory = postHookFactory;
+ this.inputRdFactory = inputRdFactory;
+
+ for (int i = 0; i < rDescs.length; i++) {
+ this.recordDescriptors[i] = rDescs[i];
+ }
+
+ this.outputArity = outputArity;
+ }
+
+ /**
+  * Creates the right-outer-join variant. The frame factories are accepted for signature compatibility
+  * but are not used; all shared configuration is delegated to the plain-join constructor.
+  *
+  * @param isRightOuter selects the right-outer-join runtime in createPushRuntime
+  * @param nullWriterFactories one null writer is created from each factory when the runtime is built
+  */
+ public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
+         IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+         IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
+         ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+         IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+         boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
+         boolean isRightOuter, INullWriterFactory[] nullWriterFactories, IRecordDescriptorFactory inputRdFactory,
+         int outputArity, IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
+         IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
+     this(spec, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+             isForward, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, opHelperFactory,
+             inputRdFactory, outputArity, functionFactory, preHookFactory, postHookFactory, rDescs);
+     this.isRightOuter = isRightOuter;
+     this.nullWriterFactories = nullWriterFactories;
+ }
+
+ /**
+  * Creates the set-union variant. The frame factories are accepted for signature compatibility but
+  * are not used; all shared configuration is delegated to the plain-join constructor.
+  *
+  * @param isSetUnion selects the set-union runtime in createPushRuntime
+  */
+ public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
+         IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+         IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
+         ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+         IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+         boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
+         boolean isSetUnion, IRecordDescriptorFactory inputRdFactory, int outputArity,
+         IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
+         IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
+     this(spec, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+             isForward, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, opHelperFactory,
+             inputRdFactory, outputArity, functionFactory, preHookFactory, postHookFactory, rDescs);
+     this.isSetUnion = isSetUnion;
+ }
+
+ /**
+  * Builds the node runtime for this operator: the right-outer-join runtime when isRightOuter is set,
+  * otherwise the set-union runtime when isSetUnion is set, otherwise the plain join runtime. Only the
+  * plain join runtime receives the key inclusiveness flags.
+  */
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+         IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+     if (isRightOuter) {
+         // instantiate one fresh null writer per configured factory
+         INullWriter[] writersForNulls = new INullWriter[nullWriterFactories.length];
+         int slot = 0;
+         for (INullWriterFactory factory : nullWriterFactories) {
+             writersForNulls[slot++] = factory.createNullWriter();
+         }
+         return new IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable(this, ctx, partition,
+                 recordDescProvider, isForward, lowKeyFields, highKeyFields, writersForNulls, functionFactory,
+                 preHookFactory, postHookFactory, inputRdFactory, outputArity);
+     }
+     if (isSetUnion) {
+         return new IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable(this, ctx, partition,
+                 recordDescProvider, isForward, lowKeyFields, highKeyFields, functionFactory, preHookFactory,
+                 postHookFactory, inputRdFactory, outputArity);
+     }
+     return new IndexNestedLoopJoinFunctionUpdateOperatorNodePushable(this, ctx, partition, recordDescProvider,
+             isForward, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, functionFactory,
+             preHookFactory, postHookFactory, inputRdFactory, outputArity);
+ }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
new file mode 100644
index 0000000..75a8087
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+
+ /**
+  * Runtime of the index nested-loop join that hands every joined tuple to an update function.
+  * <p>
+  * For each input tuple, the configured low/high key fields form a range predicate that is used to
+  * search the B-tree. Each matching index tuple is concatenated after the input tuple's fields and
+  * passed, together with the index tuple, to {@link FunctionProxy#functionCall}. This operator does
+  * not write frames itself; the writers registered through {@link #setOutputFrameWriter} are handed
+  * to the {@link FunctionProxy}.
+  */
+ public class IndexNestedLoopJoinFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
+     private TreeIndexDataflowHelper treeIndexOpHelper;
+     private FrameTupleAccessor accessor;
+
+     private ArrayTupleBuilder tb;
+     private DataOutput dos;
+
+     private BTree btree;
+     private PermutingFrameTupleReference lowKey;
+     private PermutingFrameTupleReference highKey;
+     private boolean lowKeyInclusive;
+     private boolean highKeyInclusive;
+     private RangePredicate rangePred;
+     private MultiComparator lowKeySearchCmp;
+     private MultiComparator highKeySearchCmp;
+     private ITreeIndexCursor cursor;
+     private ITreeIndexFrame cursorFrame;
+     protected ITreeIndexAccessor indexAccessor;
+
+     private RecordDescriptor recDesc;
+     private final RecordDescriptor inputRecDesc;
+
+     private final IFrameWriter[] writers;
+     private final FunctionProxy functionProxy;
+
+     /**
+      * @param isForward currently unused by this runtime
+      * @param lowKeyFields input fields forming the low search key; when null or empty, the low key of
+      *            the range predicate stays null
+      * @param highKeyFields input fields forming the high search key; same null/empty handling
+      * @param outputArity number of output writers handed to the update function
+      */
+     public IndexNestedLoopJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+             IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
+             IRuntimeHookFactory postHookFactory, IRecordDescriptorFactory inputRdFactory, int outputArity) {
+         inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+         treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                 opDesc, ctx, partition);
+         this.lowKeyInclusive = lowKeyInclusive;
+         this.highKeyInclusive = highKeyInclusive;
+         // same lookup as inputRecDesc; reuse it instead of querying the provider twice
+         this.recDesc = inputRecDesc;
+         if (lowKeyFields != null && lowKeyFields.length > 0) {
+             lowKey = new PermutingFrameTupleReference();
+             lowKey.setFieldPermutation(lowKeyFields);
+         }
+         if (highKeyFields != null && highKeyFields.length > 0) {
+             highKey = new PermutingFrameTupleReference();
+             highKey.setFieldPermutation(highKeyFields);
+         }
+
+         this.writers = new IFrameWriter[outputArity];
+         this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
+                 writers);
+     }
+
+     protected void setCursor() {
+         cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+     }
+
+     @Override
+     public void open() throws HyracksDataException {
+         // open the update function before any tuple can reach it
+         functionProxy.functionOpen();
+         accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+
+         try {
+             treeIndexOpHelper.init(false);
+             btree = (BTree) treeIndexOpHelper.getIndex();
+             btree.open(treeIndexOpHelper.getIndexFileId());
+             cursorFrame = btree.getLeafFrameFactory().createFrame();
+             setCursor();
+
+             // without an explicit key, the comparator spans all of the B-tree's key fields
+             int lowKeySearchFields = lowKey != null ? lowKey.getFieldCount() : btree.getComparatorFactories().length;
+             int highKeySearchFields = highKey != null ? highKey.getFieldCount()
+                     : btree.getComparatorFactories().length;
+             lowKeySearchCmp = createSearchComparator(lowKeySearchFields);
+             highKeySearchCmp = lowKeySearchFields == highKeySearchFields ? lowKeySearchCmp
+                     : createSearchComparator(highKeySearchFields);
+
+             rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
+                     highKeySearchCmp);
+             tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+             dos = tb.getDataOutput();
+
+             indexAccessor = btree.createAccessor();
+         } catch (Exception e) {
+             treeIndexOpHelper.deinit();
+             throw new HyracksDataException(e);
+         }
+     }
+
+     /** Builds a comparator over the first {@code fieldCount} key fields of the B-tree. */
+     private MultiComparator createSearchComparator(int fieldCount) {
+         IBinaryComparator[] comparators = new IBinaryComparator[fieldCount];
+         for (int i = 0; i < fieldCount; i++) {
+             comparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+         }
+         return new MultiComparator(comparators);
+     }
+
+     /**
+      * Drains the cursor. For each match, builds (left tuple fields ++ index tuple fields) in {@code tb}
+      * and passes it to the update function.
+      */
+     private void writeSearchResults(IFrameTupleAccessor leftAccessor, int tIndex) throws Exception {
+         int leftFieldCount = inputRecDesc.getFields().length;
+         // the left tuple does not move while the cursor is drained, so locate its data once
+         int leftFieldBase = leftAccessor.getFieldSlotsLength() + leftAccessor.getTupleStartOffset(tIndex);
+         while (cursor.hasNext()) {
+             tb.reset();
+             cursor.next();
+
+             ITupleReference tupleRef = cursor.getTuple();
+             byte[] leftData = leftAccessor.getBuffer().array();
+             for (int i = 0; i < leftFieldCount; i++) {
+                 int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
+                 int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
+                 dos.write(leftData, leftFieldBase + fieldStart, len);
+                 tb.addFieldEndOffset();
+             }
+             for (int i = 0; i < tupleRef.getFieldCount(); i++) {
+                 dos.write(tupleRef.getFieldData(i), tupleRef.getFieldStart(i), tupleRef.getFieldLength(i));
+                 tb.addFieldEndOffset();
+             }
+
+             // call the update function with the joined tuple and the matching index tuple
+             functionProxy.functionCall(tb, tupleRef);
+         }
+     }
+
+     @Override
+     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+         accessor.reset(buffer);
+
+         int tupleCount = accessor.getTupleCount();
+         try {
+             for (int i = 0; i < tupleCount; i++) {
+                 if (lowKey != null)
+                     lowKey.reset(accessor, i);
+                 if (highKey != null)
+                     highKey.reset(accessor, i);
+                 rangePred.setLowKey(lowKey, lowKeyInclusive);
+                 rangePred.setHighKey(highKey, highKeyInclusive);
+
+                 cursor.reset();
+                 indexAccessor.search(cursor, rangePred);
+                 writeSearchResults(accessor, i);
+             }
+         } catch (Exception e) {
+             throw new HyracksDataException(e);
+         }
+     }
+
+     @Override
+     public void close() throws HyracksDataException {
+         try {
+             try {
+                 // cursor is still null if open() failed before setCursor()
+                 if (cursor != null) {
+                     cursor.close();
+                 }
+             } catch (Exception e) {
+                 throw new HyracksDataException(e);
+             } finally {
+                 // close the update function even when closing the cursor failed
+                 functionProxy.functionClose();
+             }
+         } finally {
+             treeIndexOpHelper.deinit();
+         }
+     }
+
+     /**
+      * Propagates the failure to every connected writer. Each writer is attempted even if an earlier
+      * one throws; the first failure is rethrown afterwards.
+      */
+     @Override
+     public void fail() throws HyracksDataException {
+         HyracksDataException firstFailure = null;
+         for (IFrameWriter writer : writers) {
+             if (writer == null) {
+                 continue; // output slot never set through setOutputFrameWriter
+             }
+             try {
+                 writer.fail();
+             } catch (HyracksDataException e) {
+                 if (firstFailure == null) {
+                     firstFailure = e;
+                 }
+             }
+         }
+         if (firstFailure != null) {
+             throw firstFailure;
+         }
+     }
+
+     @Override
+     public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+         writers[index] = writer;
+     }
+
+ }
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
new file mode 100644
index 0000000..ed177e3
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+ /**
+  * Operator descriptor for an index nested-loop join between an input stream and a B-tree.
+  * <p>
+  * The public constructor used selects the runtime:
+  * <ul>
+  * <li>a plain join;</li>
+  * <li>a right outer join, which additionally needs null writer factories;</li>
+  * <li>a set union.</li>
+  * </ul>
+  * All public constructors delegate to one private constructor, so the configuration is immutable
+  * once built.
+  */
+ public class IndexNestedLoopJoinOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+     private static final long serialVersionUID = 1L;
+
+     private final boolean isForward;
+     private final int[] lowKeyFields; // fields in input tuple to be used as low keys
+     private final int[] highKeyFields; // fields in input tuple to be used as high keys
+     private final boolean lowKeyInclusive;
+     private final boolean highKeyInclusive;
+
+     // right outer join
+     private final boolean isRightOuter;
+     private final INullWriterFactory[] nullWriterFactories;
+
+     // set union
+     private final boolean isSetUnion;
+
+     /** Creates a plain index nested-loop join. */
+     public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+             IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory) {
+         this(spec, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
+                 comparatorFactories, isForward, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+                 opHelperFactory, false, null, false);
+     }
+
+     /**
+      * Creates a right-outer-join variant. The frame factories are accepted for signature
+      * compatibility but are not used.
+      *
+      * @throws IllegalArgumentException if {@code isRightOuter} is set but {@code nullWriterFactories}
+      *             is null
+      */
+     public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+             IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+             IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
+             ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
+             boolean isRightOuter, INullWriterFactory[] nullWriterFactories) {
+         this(spec, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
+                 comparatorFactories, isForward, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+                 opHelperFactory, isRightOuter, nullWriterFactories, false);
+     }
+
+     /**
+      * Creates a set-union variant. The frame factories are accepted for signature compatibility but
+      * are not used.
+      */
+     public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+             IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+             IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
+             ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
+             boolean isSetUnion) {
+         this(spec, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
+                 comparatorFactories, isForward, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+                 opHelperFactory, false, null, isSetUnion);
+     }
+
+     /** Single place where the superclass is initialized and every field is assigned. */
+     private IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+             IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
+             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
+             boolean isRightOuter, INullWriterFactory[] nullWriterFactories, boolean isSetUnion) {
+         super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
+                 comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+         // fail at job-construction time rather than with an NPE in createPushRuntime on a node
+         if (isRightOuter && nullWriterFactories == null) {
+             throw new IllegalArgumentException("nullWriterFactories must be provided for a right outer join");
+         }
+         this.isForward = isForward;
+         this.lowKeyFields = lowKeyFields;
+         this.highKeyFields = highKeyFields;
+         this.lowKeyInclusive = lowKeyInclusive;
+         this.highKeyInclusive = highKeyInclusive;
+         this.isRightOuter = isRightOuter;
+         this.nullWriterFactories = nullWriterFactories;
+         this.isSetUnion = isSetUnion;
+     }
+
+     /**
+      * Builds the node runtime: right outer join if configured, else set union if configured, else the
+      * plain join (the only variant that receives the key inclusiveness flags).
+      */
+     @Override
+     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+         if (isRightOuter) {
+             return new IndexNestedLoopRightOuterJoinOperatorNodePushable(this, ctx, partition, recordDescProvider,
+                     isForward, lowKeyFields, highKeyFields, createNullWriters());
+         } else if (isSetUnion) {
+             return new IndexNestedLoopSetUnionOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
+                     lowKeyFields, highKeyFields);
+         } else {
+             return new IndexNestedLoopJoinOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
+                     lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive);
+         }
+     }
+
+     /** Instantiates one null writer per configured factory. */
+     private INullWriter[] createNullWriters() {
+         INullWriter[] nullWriters = new INullWriter[nullWriterFactories.length];
+         for (int i = 0; i < nullWriters.length; i++) {
+             nullWriters[i] = nullWriterFactories[i].createNullWriter();
+         }
+         return nullWriters;
+     }
+ }
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
new file mode 100644
index 0000000..bd076d3
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+ /**
+  * Runtime of an index nested-loop join.
+  * <p>
+  * Each input tuple probes a B-tree with a range predicate built from its low/high key fields. Every
+  * matching index tuple is appended after the input tuple's fields, and the joined tuple is written
+  * to the single downstream writer.
+  */
+ public class IndexNestedLoopJoinOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+     private TreeIndexDataflowHelper treeIndexOpHelper;
+     private FrameTupleAccessor accessor;
+
+     private ByteBuffer writeBuffer;
+     private FrameTupleAppender appender;
+     private ArrayTupleBuilder tb;
+     private DataOutput dos;
+
+     private BTree btree;
+     private PermutingFrameTupleReference lowKey;
+     private PermutingFrameTupleReference highKey;
+     private boolean lowKeyInclusive;
+     private boolean highKeyInclusive;
+     private RangePredicate rangePred;
+     private MultiComparator lowKeySearchCmp;
+     private MultiComparator highKeySearchCmp;
+     private IIndexCursor cursor;
+     protected ITreeIndexAccessor indexAccessor;
+
+     private RecordDescriptor recDesc;
+     private final RecordDescriptor inputRecDesc;
+
+     // true once writer.open() has succeeded, so close() only closes a writer it actually opened
+     private boolean writerOpen = false;
+
+     /**
+      * @param isForward currently unused by this runtime
+      * @param lowKeyFields input fields forming the low search key; when null or empty, the low key of
+      *            the range predicate stays null
+      * @param highKeyFields input fields forming the high search key; same null/empty handling
+      */
+     public IndexNestedLoopJoinOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+             int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward, int[] lowKeyFields,
+             int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) {
+         inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+         treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                 opDesc, ctx, partition);
+         this.lowKeyInclusive = lowKeyInclusive;
+         this.highKeyInclusive = highKeyInclusive;
+         // same lookup as inputRecDesc; reuse it instead of querying the provider twice
+         this.recDesc = inputRecDesc;
+         if (lowKeyFields != null && lowKeyFields.length > 0) {
+             lowKey = new PermutingFrameTupleReference();
+             lowKey.setFieldPermutation(lowKeyFields);
+         }
+         if (highKeyFields != null && highKeyFields.length > 0) {
+             highKey = new PermutingFrameTupleReference();
+             highKey.setFieldPermutation(highKeyFields);
+         }
+     }
+
+     protected void setCursor() {
+         cursor = indexAccessor.createSearchCursor();
+     }
+
+     @Override
+     public void open() throws HyracksDataException {
+         accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+
+         try {
+             treeIndexOpHelper.init(false);
+             btree = (BTree) treeIndexOpHelper.getIndex();
+             writer.open();
+             writerOpen = true;
+
+             // without an explicit key, the comparator spans all of the B-tree's key fields
+             int lowKeySearchFields = lowKey != null ? lowKey.getFieldCount() : btree.getComparatorFactories().length;
+             int highKeySearchFields = highKey != null ? highKey.getFieldCount()
+                     : btree.getComparatorFactories().length;
+             lowKeySearchCmp = createSearchComparator(lowKeySearchFields);
+             highKeySearchCmp = lowKeySearchFields == highKeySearchFields ? lowKeySearchCmp
+                     : createSearchComparator(highKeySearchFields);
+
+             rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
+                     highKeySearchCmp);
+             writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+             tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+             dos = tb.getDataOutput();
+             appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+             appender.reset(writeBuffer, true);
+             indexAccessor = btree.createAccessor();
+             setCursor();
+         } catch (Exception e) {
+             treeIndexOpHelper.deinit();
+             throw new HyracksDataException(e);
+         }
+     }
+
+     /** Builds a comparator over the first {@code fieldCount} key fields of the B-tree. */
+     private MultiComparator createSearchComparator(int fieldCount) {
+         IBinaryComparator[] comparators = new IBinaryComparator[fieldCount];
+         for (int i = 0; i < fieldCount; i++) {
+             comparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+         }
+         return new MultiComparator(comparators);
+     }
+
+     /**
+      * Drains the cursor. For each match, writes (left tuple fields ++ index tuple fields) to the
+      * output frame.
+      */
+     private void writeSearchResults(IFrameTupleAccessor leftAccessor, int tIndex) throws Exception {
+         int leftFieldCount = inputRecDesc.getFields().length;
+         // the left tuple does not move while the cursor is drained, so locate its data once
+         int leftFieldBase = leftAccessor.getFieldSlotsLength() + leftAccessor.getTupleStartOffset(tIndex);
+         while (cursor.hasNext()) {
+             tb.reset();
+             cursor.next();
+
+             ITupleReference frameTuple = cursor.getTuple();
+             byte[] leftData = leftAccessor.getBuffer().array();
+             for (int i = 0; i < leftFieldCount; i++) {
+                 int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
+                 int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
+                 dos.write(leftData, leftFieldBase + fieldStart, len);
+                 tb.addFieldEndOffset();
+             }
+             for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+                 dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+                 tb.addFieldEndOffset();
+             }
+
+             appendToFrame();
+         }
+     }
+
+     /**
+      * Appends the tuple currently in {@code tb} to the output frame. If the frame is full, it is
+      * flushed downstream first.
+      */
+     private void appendToFrame() throws HyracksDataException {
+         if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+             FrameUtils.flushFrame(writeBuffer, writer);
+             appender.reset(writeBuffer, true);
+             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                 throw new IllegalStateException("joined tuple of " + tb.getSize()
+                         + " bytes does not fit in an empty frame");
+             }
+         }
+     }
+
+     @Override
+     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+         accessor.reset(buffer);
+
+         int tupleCount = accessor.getTupleCount();
+         try {
+             for (int i = 0; i < tupleCount; i++) {
+                 if (lowKey != null)
+                     lowKey.reset(accessor, i);
+                 if (highKey != null)
+                     highKey.reset(accessor, i);
+                 rangePred.setLowKey(lowKey, lowKeyInclusive);
+                 rangePred.setHighKey(highKey, highKeyInclusive);
+
+                 cursor.reset();
+                 indexAccessor.search(cursor, rangePred);
+                 writeSearchResults(accessor, i);
+             }
+         } catch (Exception e) {
+             throw new HyracksDataException(e);
+         }
+     }
+
+     /**
+      * Flushes the last partial frame and closes the downstream writer, the cursor and the index
+      * helper. Each step runs even when an earlier one fails.
+      */
+     @Override
+     public void close() throws HyracksDataException {
+         try {
+             if (writerOpen) {
+                 try {
+                     // appender is still null if open() failed after writer.open()
+                     if (appender != null && appender.getTupleCount() > 0) {
+                         FrameUtils.flushFrame(writeBuffer, writer);
+                     }
+                 } finally {
+                     // close the downstream writer even if the final flush failed
+                     writerOpen = false;
+                     writer.close();
+                 }
+             }
+         } finally {
+             try {
+                 // cursor is still null if open() failed before setCursor()
+                 if (cursor != null) {
+                     cursor.close();
+                 }
+             } catch (Exception e) {
+                 throw new HyracksDataException(e);
+             } finally {
+                 treeIndexOpHelper.deinit();
+             }
+         }
+     }
+
+     @Override
+     public void fail() throws HyracksDataException {
+         writer.fail();
+     }
+ }
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
new file mode 100644
index 0000000..c31ebd4
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -0,0 +1,368 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+
+/**
+ * Right outer join between the operator input and a B-tree index, evaluated as a merge over a
+ * single full scan of the index. Every join result is handed to an update function through
+ * {@link FunctionProxy} instead of being written to a frame.
+ * <p>
+ * Input tuples are compared with the current index tuple on the low-key fields only. An index
+ * tuple equal to one or more input tuples is passed to the function once per matching input tuple
+ * (input fields followed by index fields); an index tuple that matches nothing is passed with every
+ * input field null-padded. Input tuples without a matching index tuple are dropped.
+ * <p>
+ * NOTE(review): the merge assumes the input is sorted on the low key in the same order the index
+ * is scanned; confirm with the job that builds this operator.
+ */
+public class IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable extends
+        AbstractUnaryInputOperatorNodePushable {
+    private TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+
+    private ByteBuffer writeBuffer;
+    private FrameTupleAppender appender;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+
+    private BTree btree;
+    private RangePredicate rangePred;
+    private MultiComparator lowKeySearchCmp;
+    private MultiComparator highKeySearchCmp;
+    private ITreeIndexCursor cursor;
+    private ITreeIndexFrame cursorFrame;
+    protected ITreeIndexAccessor indexAccessor;
+
+    private RecordDescriptor recDesc;
+    private final RecordDescriptor inputRecDesc;
+
+    private PermutingFrameTupleReference lowKey;
+    private PermutingFrameTupleReference highKey;
+
+    /** one null writer per input field, used to pad index tuples that have no input partner */
+    private INullWriter[] nullWriter;
+    /** index tuple the merge is positioned on; null once the index scan is exhausted */
+    private ITupleReference currentTopTuple;
+    /** whether currentTopTuple has already been joined with at least one input tuple */
+    private boolean match;
+
+    private final IFrameWriter[] writers;
+    private final FunctionProxy functionProxy;
+
+    /**
+     * Creates the operator; the index itself is opened in {@link #open()}.
+     *
+     * @param isForward
+     *            NOTE(review): accepted for signature parity with the other index join operators
+     *            but ignored here; nextFrame does not consult it
+     * @param lowKeyFields
+     *            input fields compared against the index key; expected to be non-empty, since
+     *            nextFrame compares on the low key
+     * @param highKeyFields
+     *            input fields bound as the high key; currently not used in any comparison
+     * @param nullWriter
+     *            one null writer per input field, used for unmatched index tuples
+     * @param outputArity
+     *            number of output frame writers handed to the function proxy
+     */
+    public IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+            IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+            int[] lowKeyFields, int[] highKeyFields, INullWriter[] nullWriter, IUpdateFunctionFactory functionFactory,
+            IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
+            IRecordDescriptorFactory inputRdFactory, int outputArity) {
+        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+
+        if (lowKeyFields != null && lowKeyFields.length > 0) {
+            lowKey = new PermutingFrameTupleReference();
+            lowKey.setFieldPermutation(lowKeyFields);
+        }
+        if (highKeyFields != null && highKeyFields.length > 0) {
+            highKey = new PermutingFrameTupleReference();
+            highKey.setFieldPermutation(highKeyFields);
+        }
+        this.nullWriter = nullWriter;
+
+        this.writers = new IFrameWriter[outputArity];
+        this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
+                writers);
+    }
+
+    /**
+     * Creates the B-tree range cursor used for the full index scan. The boolean argument is
+     * presumably BTreeRangeSearchCursor's exclusive-latch flag; TODO confirm.
+     */
+    protected void setCursor() {
+        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+    }
+
+    /**
+     * Opens the update function and the index, then positions the cursor on the first index tuple of
+     * an unbounded range search.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        /**
+         * function open
+         */
+        functionProxy.functionOpen();
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+
+        try {
+            treeIndexOpHelper.init(false);
+            btree = (BTree) treeIndexOpHelper.getIndex();
+            cursorFrame = btree.getLeafFrameFactory().createFrame();
+            setCursor();
+
+            // One comparator per B-tree key field. Both ends of the range use the same key fields,
+            // so the high-key comparator is simply the low-key one.
+            int keyFieldCount = btree.getComparatorFactories().length;
+            IBinaryComparator[] searchComparators = new IBinaryComparator[keyFieldCount];
+            for (int i = 0; i < keyFieldCount; i++) {
+                searchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+            }
+            lowKeySearchCmp = new MultiComparator(searchComparators);
+            highKeySearchCmp = lowKeySearchCmp;
+
+            // null low and high keys: the search covers the whole index
+            rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
+
+            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender.reset(writeBuffer, true);
+
+            indexAccessor = btree.createAccessor();
+
+            cursor.reset();
+            indexAccessor.search(cursor, rangePred);
+
+            // position the merge on the first index tuple, if there is one
+            if (cursor.hasNext()) {
+                cursor.next();
+                currentTopTuple = cursor.getTuple();
+                match = false;
+            }
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw toHyracksDataException(e);
+        }
+    }
+
+    /**
+     * Builds "input tuple tIndex of leftAccessor followed by every field of frameTuple" in tb and
+     * passes it, together with frameTuple, to the update function.
+     */
+    private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
+            throws Exception {
+        tb.reset();
+        int fieldDataStart = leftAccessor.getFieldSlotsLength() + leftAccessor.getTupleStartOffset(tIndex);
+        byte[] leftBytes = leftAccessor.getBuffer().array();
+        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+            int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
+            int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
+            dos.write(leftBytes, fieldDataStart + fieldStart, len);
+            tb.addFieldEndOffset();
+        }
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+
+        /**
+         * function call
+         */
+        functionProxy.functionCall(tb, frameTuple);
+    }
+
+    /**
+     * Merges one input frame against the index scan. Stops early once the index is exhausted;
+     * the remaining input tuples of the frame have no partner and are dropped.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            int i = 0;
+            while (i < tupleCount && currentTopTuple != null) {
+                if (lowKey != null) {
+                    lowKey.reset(accessor, i);
+                }
+                if (highKey != null) {
+                    highKey.reset(accessor, i);
+                }
+                // TODO: currently use low key only, check what they mean
+                int cmp = compare(lowKey, currentTopTuple);
+                if (cmp <= 0) {
+                    // input key <= index key: join on equality, then consume the input tuple
+                    if (cmp == 0) {
+                        outputMatch(i);
+                    }
+                    i++;
+                } else {
+                    // input key > index key: advance the index scan
+                    moveTreeCursor();
+                }
+            }
+        } catch (Exception e) {
+            throw toHyracksDataException(e);
+        }
+    }
+
+    /** Joins input tuple i with the current index tuple and marks that index tuple as matched. */
+    private void outputMatch(int i) throws Exception {
+        writeResults(accessor, i, currentTopTuple);
+        match = true;
+    }
+
+    /**
+     * Advances the index scan by one tuple. If the tuple being left behind never matched, it is first
+     * passed to the update function with null-padded input fields. Sets currentTopTuple to null at the end.
+     */
+    private void moveTreeCursor() throws Exception {
+        if (!match) {
+            writeResults(currentTopTuple);
+        }
+        if (cursor.hasNext()) {
+            cursor.next();
+            currentTopTuple = cursor.getTuple();
+            match = false;
+        } else {
+            currentTopTuple = null;
+        }
+    }
+
+    /**
+     * Drains the rest of the index scan, then closes the cursor and the update function and releases
+     * the index. The cursor is closed and the index helper deinitialized even if draining fails.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            try {
+                while (currentTopTuple != null) {
+                    moveTreeCursor();
+                }
+            } finally {
+                closeCursor();
+            }
+
+            /**
+             * function close
+             */
+            functionProxy.functionClose();
+        } catch (Exception e) {
+            throw toHyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    /** Propagates the failure to every output writer. */
+    @Override
+    public void fail() throws HyracksDataException {
+        for (IFrameWriter writer : writers) {
+            writer.fail();
+        }
+    }
+
+    /** Compares two tuples on the B-tree key fields (low-key comparator). */
+    private int compare(ITupleReference left, ITupleReference right) throws Exception {
+        return lowKeySearchCmp.compare(left, right);
+    }
+
+    /**
+     * Builds "null for every input field followed by every field of frameTuple" in tb and passes it,
+     * together with frameTuple, to the update function (outer-join case).
+     */
+    private void writeResults(ITupleReference frameTuple) throws Exception {
+        tb.reset();
+        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+            nullWriter[i].writeNull(dos);
+            tb.addFieldEndOffset();
+        }
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+
+        /**
+         * function call
+         */
+        functionProxy.functionCall(tb, frameTuple);
+    }
+
+    /** Closes the search cursor if one was created, converting failures to HyracksDataException. */
+    private void closeCursor() throws HyracksDataException {
+        if (cursor == null) {
+            return;
+        }
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw toHyracksDataException(e);
+        }
+    }
+
+    /** Returns e unchanged if it already is a HyracksDataException, otherwise wraps it as the cause. */
+    private static HyracksDataException toHyracksDataException(Exception e) {
+        if (e instanceof HyracksDataException) {
+            return (HyracksDataException) e;
+        }
+        return new HyracksDataException(e);
+    }
+
+    /** Records the writer for output index; the array is shared with the function proxy. */
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        writers[index] = writer;
+    }
+}
\ No newline at end of file
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
new file mode 100644
index 0000000..9f1e1ad
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -0,0 +1,343 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+/**
+ * Right outer join between the operator input and a B-tree index, evaluated as a merge over a
+ * single full scan of the index; results are written to the single output frame writer.
+ * <p>
+ * Input tuples are compared with the current index tuple on the low-key fields only. An index
+ * tuple equal to one or more input tuples is emitted once per matching input tuple (input fields
+ * followed by index fields); an index tuple that matches nothing is emitted with every input field
+ * null-padded. Input tuples without a matching index tuple are dropped.
+ * <p>
+ * NOTE(review): the merge assumes the input is sorted on the low key consistently with the index
+ * scan order and the isForward flag; confirm with the job that builds this operator.
+ */
+public class IndexNestedLoopRightOuterJoinOperatorNodePushable extends
+        AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+
+    private ByteBuffer writeBuffer;
+    private FrameTupleAppender appender;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+
+    private BTree btree;
+    /** selects which comparison outcome consumes the input rather than the index (see nextFrame) */
+    private boolean isForward;
+    private RangePredicate rangePred;
+    private MultiComparator lowKeySearchCmp;
+    private MultiComparator highKeySearchCmp;
+    private ITreeIndexCursor cursor;
+    private ITreeIndexFrame cursorFrame;
+    protected ITreeIndexAccessor indexAccessor;
+
+    private RecordDescriptor recDesc;
+    private final RecordDescriptor inputRecDesc;
+
+    private PermutingFrameTupleReference lowKey;
+    private PermutingFrameTupleReference highKey;
+
+    /** one null writer per input field, used to pad index tuples that have no input partner */
+    private INullWriter[] nullWriter;
+    /** index tuple the merge is positioned on; null once the index scan is exhausted */
+    private ITupleReference currentTopTuple;
+    /** whether currentTopTuple has already been joined with at least one input tuple */
+    private boolean match;
+
+    /**
+     * Creates the operator; the index itself is opened in {@link #open()}.
+     *
+     * @param isForward
+     *            when true, input tuples with key <= the index key are consumed; when false,
+     *            input tuples with key >= the index key are consumed
+     * @param lowKeyFields
+     *            input fields compared against the index key; expected to be non-empty, since
+     *            nextFrame compares on the low key
+     * @param highKeyFields
+     *            input fields bound as the high key; currently not used in any comparison
+     * @param nullWriter
+     *            one null writer per input field, used for unmatched index tuples
+     */
+    public IndexNestedLoopRightOuterJoinOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+            IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+            int[] lowKeyFields, int[] highKeyFields, INullWriter[] nullWriter) {
+        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.isForward = isForward;
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+
+        if (lowKeyFields != null && lowKeyFields.length > 0) {
+            lowKey = new PermutingFrameTupleReference();
+            lowKey.setFieldPermutation(lowKeyFields);
+        }
+        if (highKeyFields != null && highKeyFields.length > 0) {
+            highKey = new PermutingFrameTupleReference();
+            highKey.setFieldPermutation(highKeyFields);
+        }
+        this.nullWriter = nullWriter;
+    }
+
+    /**
+     * Creates the B-tree range cursor used for the full index scan. The boolean argument is
+     * presumably BTreeRangeSearchCursor's exclusive-latch flag; TODO confirm.
+     */
+    protected void setCursor() {
+        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+    }
+
+    /**
+     * Opens the index and the output writer, then positions the cursor on the first index tuple of
+     * an unbounded range search.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        try {
+            treeIndexOpHelper.init(false);
+            btree = (BTree) treeIndexOpHelper.getIndex();
+            cursorFrame = btree.getLeafFrameFactory().createFrame();
+            setCursor();
+            writer.open();
+
+            // One comparator per B-tree key field. Both ends of the range use the same key fields,
+            // so the high-key comparator is simply the low-key one.
+            int keyFieldCount = btree.getComparatorFactories().length;
+            IBinaryComparator[] searchComparators = new IBinaryComparator[keyFieldCount];
+            for (int i = 0; i < keyFieldCount; i++) {
+                searchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+            }
+            lowKeySearchCmp = new MultiComparator(searchComparators);
+            highKeySearchCmp = lowKeySearchCmp;
+
+            // null low and high keys: the search covers the whole index
+            rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
+
+            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender.reset(writeBuffer, true);
+
+            indexAccessor = btree.createAccessor();
+
+            cursor.reset();
+            indexAccessor.search(cursor, rangePred);
+
+            // position the merge on the first index tuple, if there is one
+            if (cursor.hasNext()) {
+                cursor.next();
+                currentTopTuple = cursor.getTuple();
+                match = false;
+            }
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw toHyracksDataException(e);
+        }
+    }
+
+    /** Appends "input tuple tIndex of leftAccessor followed by every field of frameTuple" to the output. */
+    private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
+            throws Exception {
+        tb.reset();
+        int fieldDataStart = leftAccessor.getFieldSlotsLength() + leftAccessor.getTupleStartOffset(tIndex);
+        byte[] leftBytes = leftAccessor.getBuffer().array();
+        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+            int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
+            int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
+            dos.write(leftBytes, fieldDataStart + fieldStart, len);
+            tb.addFieldEndOffset();
+        }
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+        appendToFrame();
+    }
+
+    /**
+     * Merges one input frame against the index scan. Stops early once the index is exhausted;
+     * the remaining input tuples of the frame have no partner and are dropped.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            int i = 0;
+            while (i < tupleCount && currentTopTuple != null) {
+                if (lowKey != null) {
+                    lowKey.reset(accessor, i);
+                }
+                if (highKey != null) {
+                    highKey.reset(accessor, i);
+                }
+                // TODO: currently use low key only, check what they mean
+                int cmp = compare(lowKey, currentTopTuple);
+                if ((cmp <= 0 && isForward) || (cmp >= 0 && !isForward)) {
+                    // consume the input tuple, joining it on equality
+                    if (cmp == 0) {
+                        outputMatch(i);
+                    }
+                    i++;
+                } else {
+                    // advance the index scan
+                    moveTreeCursor();
+                }
+            }
+        } catch (Exception e) {
+            throw toHyracksDataException(e);
+        }
+    }
+
+    /** Joins input tuple i with the current index tuple and marks that index tuple as matched. */
+    private void outputMatch(int i) throws Exception {
+        writeResults(accessor, i, currentTopTuple);
+        match = true;
+    }
+
+    /**
+     * Advances the index scan by one tuple. If the tuple being left behind never matched, it is
+     * first emitted with null-padded input fields. Sets currentTopTuple to null at the end of the scan.
+     */
+    private void moveTreeCursor() throws Exception {
+        if (!match) {
+            writeResults(currentTopTuple);
+        }
+        if (cursor.hasNext()) {
+            cursor.next();
+            currentTopTuple = cursor.getTuple();
+            match = false;
+        } else {
+            currentTopTuple = null;
+        }
+    }
+
+    /**
+     * Drains the rest of the index scan, flushes the last partial frame and closes the output writer.
+     * The writer and the cursor are closed, and the index helper deinitialized, even if draining or
+     * flushing fails.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            try {
+                while (currentTopTuple != null) {
+                    moveTreeCursor();
+                }
+                if (appender.getTupleCount() > 0) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                }
+            } finally {
+                try {
+                    writer.close();
+                } finally {
+                    closeCursor();
+                }
+            }
+        } catch (Exception e) {
+            throw toHyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    /** Propagates the failure to the output writer. */
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    /** Compares two tuples on the B-tree key fields (low-key comparator). */
+    private int compare(ITupleReference left, ITupleReference right) throws Exception {
+        return lowKeySearchCmp.compare(left, right);
+    }
+
+    /** Appends "null for every input field followed by every field of frameTuple" to the output. */
+    private void writeResults(ITupleReference frameTuple) throws Exception {
+        tb.reset();
+        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+            nullWriter[i].writeNull(dos);
+            tb.addFieldEndOffset();
+        }
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+        appendToFrame();
+    }
+
+    /** Appends the tuple held in tb to the output frame, flushing the frame first if it is full. */
+    private void appendToFrame() throws Exception {
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            FrameUtils.flushFrame(writeBuffer, writer);
+            appender.reset(writeBuffer, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException("result tuple does not fit into an empty frame");
+            }
+        }
+    }
+
+    /** Closes the search cursor if one was created, converting failures to HyracksDataException. */
+    private void closeCursor() throws HyracksDataException {
+        if (cursor == null) {
+            return;
+        }
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw toHyracksDataException(e);
+        }
+    }
+
+    /** Returns e unchanged if it already is a HyracksDataException, otherwise wraps it as the cause. */
+    private static HyracksDataException toHyracksDataException(Exception e) {
+        if (e instanceof HyracksDataException) {
+            return (HyracksDataException) e;
+        }
+        return new HyracksDataException(e);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
new file mode 100644
index 0000000..0a966b5
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -0,0 +1,342 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+
+/**
+ * Set union of the operator input and a B-tree index, evaluated as a merge over a single full
+ * scan of the index; every resulting tuple is handed to an update function through
+ * {@link FunctionProxy} instead of being written to a frame.
+ * <p>
+ * Input tuples are compared with the current index tuple on the low-key fields only. Every input
+ * tuple is passed to the function (its own fields only), together with the current index tuple if
+ * the keys are equal or with null otherwise. An index tuple that equals no input tuple is passed
+ * with its own fields, and itself as the second argument.
+ * <p>
+ * NOTE(review): the merge assumes the input is sorted on the low key consistently with the index
+ * scan order and the isForward flag; confirm with the job that builds this operator.
+ */
+public class IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
+    private TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+
+    private ByteBuffer writeBuffer;
+    private FrameTupleAppender appender;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+
+    private BTree btree;
+    /** selects which comparison outcome advances the index rather than the input (see nextFrame) */
+    private boolean isForward;
+    private RangePredicate rangePred;
+    private MultiComparator lowKeySearchCmp;
+    private ITreeIndexCursor cursor;
+    private ITreeIndexFrame cursorFrame;
+    protected ITreeIndexAccessor indexAccessor;
+
+    private RecordDescriptor recDesc;
+    private final RecordDescriptor inputRecDesc;
+
+    private PermutingFrameTupleReference lowKey;
+    private PermutingFrameTupleReference highKey;
+
+    /** index tuple the merge is positioned on; null once the index scan is exhausted */
+    private ITupleReference currentTopTuple;
+    /** whether currentTopTuple has already been paired with at least one input tuple */
+    private boolean match;
+
+    private final IFrameWriter[] writers;
+    private final FunctionProxy functionProxy;
+
+    /**
+     * Creates the operator; the index itself is opened in {@link #open()}.
+     *
+     * @param isForward
+     *            when true, the index scan is advanced while the input key is greater than the
+     *            index key; when false, while it is smaller
+     * @param lowKeyFields
+     *            input fields compared against the index key; expected to be non-empty, since
+     *            nextFrame compares on the low key whenever the index is not exhausted
+     * @param highKeyFields
+     *            input fields bound as the high key; currently not used in any comparison
+     * @param outputArity
+     *            number of output frame writers handed to the function proxy
+     */
+    public IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+            IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+            int[] lowKeyFields, int[] highKeyFields, IUpdateFunctionFactory functionFactory,
+            IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
+            IRecordDescriptorFactory inputRdFactory, int outputArity) {
+        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.isForward = isForward;
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+
+        if (lowKeyFields != null && lowKeyFields.length > 0) {
+            lowKey = new PermutingFrameTupleReference();
+            lowKey.setFieldPermutation(lowKeyFields);
+        }
+        if (highKeyFields != null && highKeyFields.length > 0) {
+            highKey = new PermutingFrameTupleReference();
+            highKey.setFieldPermutation(highKeyFields);
+        }
+
+        this.writers = new IFrameWriter[outputArity];
+        this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
+                writers);
+    }
+
+    /**
+     * Creates the B-tree range cursor used for the full index scan. The boolean argument is
+     * presumably BTreeRangeSearchCursor's exclusive-latch flag; TODO confirm.
+     */
+    protected void setCursor() {
+        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+    }
+
+    /**
+     * Opens the update function and the index, then positions the cursor on the first index tuple of
+     * an unbounded range search.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        functionProxy.functionOpen();
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+
+        try {
+            treeIndexOpHelper.init(false);
+            btree = (BTree) treeIndexOpHelper.getIndex();
+            cursorFrame = btree.getLeafFrameFactory().createFrame();
+            setCursor();
+
+            // null keys and comparators: the search covers the whole index
+            rangePred = new RangePredicate(null, null, true, true, null, null);
+            int lowKeySearchFields = btree.getComparatorFactories().length;
+            IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
+            for (int i = 0; i < lowKeySearchFields; i++) {
+                lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+            }
+            lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
+
+            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            // NOTE(review): sized by the index field count, yet writeLeftResults adds one field per
+            // input field; this assumes the input has no more fields than the index. Confirm.
+            tb = new ArrayTupleBuilder(btree.getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender.reset(writeBuffer, true);
+
+            indexAccessor = btree.createAccessor();
+
+            cursor.reset();
+            indexAccessor.search(cursor, rangePred);
+
+            // position the merge on the first index tuple, if there is one
+            if (cursor.hasNext()) {
+                cursor.next();
+                currentTopTuple = cursor.getTuple();
+                match = false;
+            }
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw toHyracksDataException(e);
+        }
+    }
+
+    /**
+     * Merges one input frame against the index scan. Every input tuple of the frame is passed to the
+     * update function exactly once; index tuples passed over without a match are passed on their own.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            int i = 0;
+            while (i < tupleCount) {
+                if (lowKey != null) {
+                    lowKey.reset(accessor, i);
+                }
+                if (highKey != null) {
+                    highKey.reset(accessor, i);
+                }
+                // TODO: currently use low key only, check what they mean
+                if (currentTopTuple == null) {
+                    // index exhausted: the input tuple has no partner
+                    writeLeftResults(accessor, i, null);
+                    i++;
+                    continue;
+                }
+                int cmp = compare(lowKey, currentTopTuple);
+                if (cmp == 0) {
+                    outputMatch(i);
+                    i++;
+                } else if ((cmp > 0 && isForward) || (cmp < 0 && !isForward)) {
+                    moveTreeCursor();
+                } else {
+                    writeLeftResults(accessor, i, null);
+                    i++;
+                }
+            }
+        } catch (Exception e) {
+            throw toHyracksDataException(e);
+        }
+    }
+
+    /** Passes input tuple i together with the current index tuple and marks the latter as matched. */
+    private void outputMatch(int i) throws Exception {
+        writeLeftResults(accessor, i, currentTopTuple);
+        match = true;
+    }
+
+    /**
+     * Advances the index scan by one tuple. If the tuple being left behind never matched, it is first
+     * passed to the update function on its own. Sets currentTopTuple to null at the end of the scan.
+     */
+    private void moveTreeCursor() throws Exception {
+        if (!match) {
+            writeRightResults(currentTopTuple);
+        }
+        if (cursor.hasNext()) {
+            cursor.next();
+            currentTopTuple = cursor.getTuple();
+            match = false;
+        } else {
+            currentTopTuple = null;
+        }
+    }
+
+    /**
+     * Drains the rest of the index scan, then closes the cursor and the update function and releases
+     * the index. The cursor is closed and the index helper deinitialized even if draining fails.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            try {
+                while (currentTopTuple != null) {
+                    moveTreeCursor();
+                }
+            } finally {
+                closeCursor();
+            }
+            functionProxy.functionClose();
+        } catch (Exception e) {
+            throw toHyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    /** Propagates the failure to every output writer. */
+    @Override
+    public void fail() throws HyracksDataException {
+        for (IFrameWriter writer : writers) {
+            writer.fail();
+        }
+    }
+
+    /** Compares two tuples on the B-tree key fields (low-key comparator). */
+    private int compare(ITupleReference left, ITupleReference right) throws Exception {
+        return lowKeySearchCmp.compare(left, right);
+    }
+
+    /** Builds the fields of frameTuple in tb and passes them, with frameTuple itself, to the update function. */
+    private void writeRightResults(ITupleReference frameTuple) throws Exception {
+        tb.reset();
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+
+        functionProxy.functionCall(tb, frameTuple);
+    }
+
+    /**
+     * Builds the fields of input tuple tIndex of leftAccessor in tb and passes them to the update
+     * function together with frameTuple (the matching index tuple, or null when there is none).
+     */
+    private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
+            throws Exception {
+        tb.reset();
+        int fieldDataStart = leftAccessor.getFieldSlotsLength() + leftAccessor.getTupleStartOffset(tIndex);
+        byte[] leftBytes = leftAccessor.getBuffer().array();
+        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+            int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
+            int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
+            dos.write(leftBytes, fieldDataStart + fieldStart, len);
+            tb.addFieldEndOffset();
+        }
+
+        functionProxy.functionCall(tb, frameTuple);
+    }
+
+    /** Closes the search cursor if one was created, converting failures to HyracksDataException. */
+    private void closeCursor() throws HyracksDataException {
+        if (cursor == null) {
+            return;
+        }
+        try {
+            cursor.close();
+        } catch (Exception e) {
+            throw toHyracksDataException(e);
+        }
+    }
+
+    /** Returns e unchanged if it already is a HyracksDataException, otherwise wraps it as the cause. */
+    private static HyracksDataException toHyracksDataException(Exception e) {
+        if (e instanceof HyracksDataException) {
+            return (HyracksDataException) e;
+        }
+        return new HyracksDataException(e);
+    }
+
+    /** Records the writer for output index; the array is shared with the function proxy. */
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        writers[index] = writer;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
new file mode 100644
index 0000000..615a25b
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -0,0 +1,289 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+/**
+ * Computes the set union of the operator's input and the full contents of a B-tree.
+ * <p>
+ * The B-tree is scanned from start to end in step with the input: every input tuple is emitted, and a B-tree tuple
+ * is emitted only if no input tuple compared equal to it. The merge assumes the input arrives sorted in the
+ * B-tree's key order (NOTE(review): confirm with the job that feeds this operator).
+ */
+public class IndexNestedLoopSetUnionOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+
+    private ByteBuffer writeBuffer;
+    private FrameTupleAppender appender;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+
+    private BTree btree;
+    private RangePredicate rangePred;
+    private MultiComparator lowKeySearchCmp;
+    private ITreeIndexCursor cursor;
+    private ITreeIndexFrame cursorFrame;
+    protected ITreeIndexAccessor indexAccessor;
+
+    private RecordDescriptor recDesc;
+    private final RecordDescriptor inputRecDesc;
+
+    private PermutingFrameTupleReference lowKey;
+    private PermutingFrameTupleReference highKey;
+
+    /** B-tree tuple the cursor currently points at; null once the cursor is exhausted. */
+    private ITupleReference currentTopTuple;
+    /** True once an input tuple equal to currentTopTuple has been emitted, so that tree tuple is not emitted. */
+    private boolean match;
+
+    /**
+     * @param lowKeyFields
+     *            input fields compared with the B-tree key; presumably must be non-empty, because nextFrame
+     *            passes lowKey to compare() whenever the tree is not exhausted
+     * @param highKeyFields
+     *            input fields of the high key; reset for every tuple but not used in any comparison yet
+     * @param isForward
+     *            not used
+     */
+    public IndexNestedLoopSetUnionOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+            IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
+            int[] lowKeyFields, int[] highKeyFields) {
+        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+
+        if (lowKeyFields != null && lowKeyFields.length > 0) {
+            lowKey = new PermutingFrameTupleReference();
+            lowKey.setFieldPermutation(lowKeyFields);
+        }
+        if (highKeyFields != null && highKeyFields.length > 0) {
+            highKey = new PermutingFrameTupleReference();
+            highKey.setFieldPermutation(highKeyFields);
+        }
+    }
+
+    /** Creates the B-tree range-search cursor over cursorFrame. */
+    protected void setCursor() {
+        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+    }
+
+    /** Opens the B-tree and the downstream writer and positions the cursor on the first tree tuple, if any. */
+    @Override
+    public void open() throws HyracksDataException {
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+
+        try {
+            treeIndexOpHelper.init(false);
+            btree = (BTree) treeIndexOpHelper.getIndex();
+            cursorFrame = btree.getLeafFrameFactory().createFrame();
+            setCursor();
+            writer.open();
+
+            rangePred = new RangePredicate(null, null, true, true, null, null);
+            int lowKeySearchFields = btree.getComparatorFactories().length;
+            IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
+            for (int i = 0; i < lowKeySearchFields; i++) {
+                lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+            }
+            lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
+
+            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            tb = new ArrayTupleBuilder(btree.getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender.reset(writeBuffer, true);
+
+            indexAccessor = btree.createAccessor();
+
+            // null low and high keys: presumably an unbounded scan over the whole B-tree
+            rangePred.setLowKey(null, true);
+            rangePred.setHighKey(null, true);
+            cursor.reset();
+            indexAccessor.search(cursor, rangePred);
+
+            // position on the first tree tuple, if there is one
+            if (cursor.hasNext()) {
+                cursor.next();
+                currentTopTuple = cursor.getTuple();
+                match = false;
+            }
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            // merge step: i advances only once input tuple i has been emitted
+            for (int i = 0; i < tupleCount;) {
+                if (lowKey != null)
+                    lowKey.reset(accessor, i);
+                if (highKey != null)
+                    highKey.reset(accessor, i);
+                // TODO: currently use low key only, check what they mean
+                if (currentTopTuple != null) {
+                    int cmp = compare(lowKey, currentTopTuple);
+                    if (cmp == 0) {
+                        outputMatch(i);
+                        i++;
+                    } else if (cmp > 0) {
+                        // the tree tuple sorts first: emit it unless matched, then advance the cursor
+                        moveTreeCursor();
+                    } else {
+                        writeLeftResults(accessor, i);
+                        i++;
+                    }
+                } else {
+                    // tree exhausted: the remaining input tuples pass straight through
+                    writeLeftResults(accessor, i);
+                    i++;
+                }
+            }
+        } catch (HyracksDataException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Emits input tuple i and marks the current tree tuple as matched so it is not emitted later. */
+    private void outputMatch(int i) throws Exception {
+        writeLeftResults(accessor, i);
+        match = true;
+    }
+
+    /** Emits the current tree tuple if no input tuple matched it, then advances the cursor. */
+    private void moveTreeCursor() throws Exception {
+        if (!match) {
+            writeRightResults(currentTopTuple);
+        }
+        if (cursor.hasNext()) {
+            cursor.next();
+            currentTopTuple = cursor.getTuple();
+            match = false;
+        } else {
+            currentTopTuple = null;
+        }
+    }
+
+    /**
+     * Emits the unmatched tree tuples left after the input is exhausted and flushes the output frame. The downstream
+     * writer, the cursor and the index helper are released even when draining or flushing fails.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            try {
+                while (currentTopTuple != null) {
+                    moveTreeCursor();
+                }
+                if (appender.getTupleCount() > 0) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                }
+            } finally {
+                try {
+                    writer.close();
+                } finally {
+                    cursor.close();
+                }
+            }
+        } catch (HyracksDataException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    /** Compares an input key with a tree tuple using the B-tree's key comparators. */
+    private int compare(ITupleReference left, ITupleReference right) throws Exception {
+        return lowKeySearchCmp.compare(left, right);
+    }
+
+    /** Emits a B-tree tuple, copying it field by field. */
+    private void writeRightResults(ITupleReference frameTuple) throws Exception {
+        tb.reset();
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+        appendTuple();
+    }
+
+    /** Emits input tuple tIndex of leftAccessor, copying it field by field. */
+    private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex) throws Exception {
+        tb.reset();
+        // the tuple's data region and the backing array do not depend on the field, so look them up once
+        int tupleDataStart = leftAccessor.getTupleStartOffset(tIndex) + leftAccessor.getFieldSlotsLength();
+        byte[] frameBytes = leftAccessor.getBuffer().array();
+        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+            int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
+            int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
+            dos.write(frameBytes, tupleDataStart + fieldStart, len);
+            tb.addFieldEndOffset();
+        }
+        appendTuple();
+    }
+
+    /** Appends the tuple held in tb to the output frame, flushing the frame to the writer first if it is full. */
+    private void appendTuple() throws Exception {
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            FrameUtils.flushFrame(writeBuffer, writer);
+            appender.reset(writeBuffer, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                // the tuple does not fit even into an empty frame
+                throw new IllegalStateException();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/ProjectOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/ProjectOperatorDescriptor.java
new file mode 100644
index 0000000..9f35e25
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/ProjectOperatorDescriptor.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+/**
+ * Projects every input tuple onto the fields listed in {@code projectFields}, in that order.
+ */
+public class ProjectOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    /** Indexes of the input fields to emit, in output order. */
+    private final int[] projectFields;
+
+    /**
+     * @param rDesc
+     *            record descriptor of the projected output tuples
+     * @param projectFields
+     *            indexes of the input fields to emit, in output order
+     */
+    public ProjectOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, int projectFields[]) {
+        super(spec, 1, 1);
+        this.recordDescriptors[0] = rDesc;
+        this.projectFields = projectFields;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            private final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            private final FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+            private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+            private final ArrayTupleBuilder tb = new ArrayTupleBuilder(projectFields.length);
+            private final DataOutput dos = tb.getDataOutput();
+            private final ByteBuffer writeBuffer = ctx.allocateFrame();
+
+            @Override
+            public void open() throws HyracksDataException {
+                writer.open();
+                appender.reset(writeBuffer, true);
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                fta.reset(frame);
+                int tupleCount = fta.getTupleCount();
+                byte[] frameBytes = fta.getBuffer().array();
+                try {
+                    for (int tIndex = 0; tIndex < tupleCount; tIndex++) {
+                        // the tuple's data region is the same for every projected field
+                        int tupleDataStart = fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength();
+                        tb.reset();
+                        for (int j = 0; j < projectFields.length; j++) {
+                            int fIndex = projectFields[j];
+                            int fieldStart = fta.getFieldStartOffset(tIndex, fIndex);
+                            int len = fta.getFieldEndOffset(tIndex, fIndex) - fieldStart;
+                            dos.write(frameBytes, tupleDataStart + fieldStart, len);
+                            tb.addFieldEndOffset();
+                        }
+                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                            FrameUtils.flushFrame(writeBuffer, writer);
+                            appender.reset(writeBuffer, true);
+                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                // a single projected tuple does not fit into an empty frame
+                                throw new IllegalStateException();
+                            }
+                        }
+                    }
+                } catch (HyracksDataException e) {
+                    throw e;
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            /** Flushes the partially filled output frame, then closes the writer even if the flush fails. */
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    if (appender.getTupleCount() > 0) {
+                        FrameUtils.flushFrame(writeBuffer, writer);
+                    }
+                } finally {
+                    writer.close();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+        };
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/RuntimeHookOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/RuntimeHookOperatorDescriptor.java
new file mode 100644
index 0000000..20f7aaa
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/RuntimeHookOperatorDescriptor.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+
+/**
+ * Pass-through operator: each runtime configures a freshly created runtime hook when it is opened and then
+ * forwards every incoming frame unchanged to its single output.
+ */
+public class RuntimeHookOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Factory for the hook created by each runtime instance. */
+    private final IRuntimeHookFactory hookFactory;
+
+    public RuntimeHookOperatorDescriptor(JobSpecification spec, IRuntimeHookFactory hookFactory) {
+        super(spec, 1, 1);
+        this.hookFactory = hookFactory;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        final IRuntimeHook runtimeHook = hookFactory.createRuntimeHook();
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+            /** Configures the hook with the task context, then opens the downstream writer. */
+            @Override
+            public void open() throws HyracksDataException {
+                runtimeHook.configure(ctx);
+                writer.open();
+            }
+
+            /** Forwards the frame to the downstream writer untouched. */
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                FrameUtils.flushFrame(buffer, writer);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                writer.close();
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+        };
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
new file mode 100644
index 0000000..126fcb8
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.std;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+/**
+ * Descriptor for reloading a tree index: each partition runs a {@link TreeIndexBulkReLoadOperatorNodePushable},
+ * which rebuilds that partition's index by bulk loading the operator's single input. The operator has no outputs.
+ */
+public class TreeIndexBulkReLoadOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Input field indexes that make up each tuple loaded into the index. */
+    private final int[] fieldPermutation;
+    private final float fillFactor;
+    private final IStorageManagerInterface storageManager;
+    private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
+    private final IFileSplitProvider fileSplitProvider;
+
+    public TreeIndexBulkReLoadOperatorDescriptor(JobSpecification spec, IStorageManagerInterface storageManager,
+            IIndexRegistryProvider<IIndex> treeIndexRegistryProvider, IFileSplitProvider fileSplitProvider,
+            ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation,
+            float fillFactor, IIndexDataflowHelperFactory opHelperFactory) {
+        // one input, no outputs
+        super(spec, 1, 0, null, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        this.storageManager = storageManager;
+        this.treeIndexRegistryProvider = treeIndexRegistryProvider;
+        this.fileSplitProvider = fileSplitProvider;
+        this.fieldPermutation = fieldPermutation;
+        this.fillFactor = fillFactor;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new TreeIndexBulkReLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor,
+                recordDescProvider, storageManager, treeIndexRegistryProvider, fileSplitProvider);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
new file mode 100644
index 0000000..883fef4
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * Rebuilds one partition of a tree index from the operator input. {@link #open()} deletes the partition's index
+ * file if it is currently mapped, initializes the index again through the dataflow helper and begins a bulk
+ * load; {@link #nextFrame(ByteBuffer)} adds every input tuple and {@link #close()} ends the load.
+ */
+public class TreeIndexBulkReLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+    private final TreeIndexDataflowHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+    private IIndexBulkLoadContext bulkLoadCtx;
+
+    private final IRecordDescriptorProvider recordDescProvider;
+    /** The current input tuple seen through the field permutation. */
+    private final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+
+    private final IStorageManagerInterface storageManager;
+    private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
+    private final IFileSplitProvider fileSplitProvider;
+    private final int partition;
+    private final float fillFactor;
+    private final IHyracksTaskContext ctx;
+    private ITreeIndex index;
+
+    public TreeIndexBulkReLoadOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IFileSplitProvider fileSplitProvider) {
+        treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
+                opDesc, ctx, partition);
+        this.recordDescProvider = recordDescProvider;
+        tuple.setFieldPermutation(fieldPermutation);
+
+        this.storageManager = storageManager;
+        this.treeIndexRegistryProvider = treeIndexRegistryProvider;
+        this.fileSplitProvider = fileSplitProvider;
+        this.partition = partition;
+        this.ctx = ctx;
+        this.fillFactor = fillFactor;
+    }
+
+    /** Drops the partition's existing index file, then initializes the index and begins the bulk load. */
+    @Override
+    public void open() throws HyracksDataException {
+        initDrop();
+        init();
+    }
+
+    /**
+     * If the partition's index file is currently mapped, unregisters the index instance and deletes the file
+     * through the buffer cache. Any failure is rethrown as a {@link HyracksDataException}.
+     */
+    private void initDrop() throws HyracksDataException {
+        try {
+            IndexRegistry<IIndex> treeIndexRegistry = treeIndexRegistryProvider.getRegistry(ctx);
+            IBufferCache bufferCache = storageManager.getBufferCache(ctx);
+            IFileMapProvider fileMapProvider = storageManager.getFileMapProvider(ctx);
+
+            FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
+            int indexFileId = -1;
+            boolean fileIsMapped = false;
+            // look up the file id in the same critical section as the isMapped() check
+            synchronized (fileMapProvider) {
+                fileIsMapped = fileMapProvider.isMapped(f);
+                if (fileIsMapped)
+                    indexFileId = fileMapProvider.lookupFileId(f);
+            }
+
+            if (fileIsMapped) {
+                // unregister the tree instance
+                synchronized (treeIndexRegistry) {
+                    treeIndexRegistry.unregister(indexFileId);
+                }
+                // remove the name-to-id mapping
+                bufferCache.deleteFile(indexFileId, false);
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Initializes the index through the dataflow helper (init(true)), opens it once and begins the bulk load. The
+     * helper is de-initialized if any step fails.
+     */
+    private void init() throws HyracksDataException {
+        AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexOpHelper
+                .getOperatorDescriptor();
+        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        try {
+            treeIndexOpHelper.init(true);
+            index = (ITreeIndex) treeIndexOpHelper.getIndex();
+            index.open(treeIndexOpHelper.getIndexFileId());
+            bulkLoadCtx = index.beginBulkLoad(fillFactor);
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Adds every tuple of the frame, seen through the field permutation, to the bulk load. */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            tuple.reset(accessor, i);
+            index.bulkLoadAddTuple(tuple, bulkLoadCtx);
+        }
+    }
+
+    /** Ends the bulk load; the dataflow helper is de-initialized even if that fails. */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            index.endBulkLoad(bulkLoadCtx);
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        // NOTE(review): nothing is released on failure; confirm that close() is still called afterwards
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
new file mode 100644
index 0000000..4b0f4a5
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.util;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+/**
+ * Drives an {@link IUpdateFunction}: opens the output writers and the function (after an optional pre-hook),
+ * deserializes tuples and forwards them to it, and closes everything (after an optional post-hook).
+ */
+public class FunctionProxy {
+
+    private final IHyracksTaskContext ctx;
+    private final IUpdateFunction function;
+    private final IRuntimeHookFactory preHookFactory;
+    private final IRuntimeHookFactory postHookFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+    private final IFrameWriter[] writers;
+    /** Created in functionOpen() from the input record descriptor. */
+    private TupleDeserializer tupleDe;
+    private RecordDescriptor inputRd;
+
+    public FunctionProxy(IHyracksTaskContext ctx, IUpdateFunctionFactory functionFactory,
+            IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
+            IRecordDescriptorFactory inputRdFactory, IFrameWriter[] writers) {
+        this.ctx = ctx;
+        this.function = functionFactory.createFunction();
+        this.preHookFactory = preHookFactory;
+        this.postHookFactory = postHookFactory;
+        this.inputRdFactory = inputRdFactory;
+        this.writers = writers;
+    }
+
+    /**
+     * Opens the function: builds the input record descriptor and tuple deserializer, installs this class's class
+     * loader as the thread context class loader, opens every writer, configures the pre-hook (if any) and finally
+     * opens the update function.
+     *
+     * @throws HyracksDataException
+     *             if a writer, the pre-hook or the function fails to open
+     */
+    public void functionOpen() throws HyracksDataException {
+        inputRd = inputRdFactory.createRecordDescriptor();
+        tupleDe = new TupleDeserializer(inputRd);
+        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+        for (int i = 0; i < writers.length; i++) {
+            writers[i].open();
+        }
+        if (preHookFactory != null) {
+            preHookFactory.createRuntimeHook().configure(ctx);
+        }
+        function.open(ctx, inputRd, writers);
+    }
+
+    /**
+     * Deserializes the tuple held in {@code tb}, hands it to the function's process step, then calls the
+     * function's update step with {@code updateRef}.
+     *
+     * @param tb
+     *            builder holding the serialized input tuple
+     * @param updateRef
+     *            tuple reference handed to the update step
+     * @throws HyracksDataException
+     *             if deserialization or the function fails
+     */
+    public void functionCall(ArrayTupleBuilder tb, ITupleReference updateRef) throws HyracksDataException {
+        function.process(tupleDe.deserializeRecord(tb));
+        function.update(updateRef);
+    }
+
+    /**
+     * Deserializes {@code updateRef} itself, hands it to the process step and then to the update step.
+     *
+     * @param updateRef
+     *            tuple (the B-tree tuple) that is both the input and the update target
+     * @throws HyracksDataException
+     *             if deserialization or the function fails
+     */
+    public void functionCall(ITupleReference updateRef) throws HyracksDataException {
+        function.process(tupleDe.deserializeRecord(updateRef));
+        function.update(updateRef);
+    }
+
+    /**
+     * Closes the function: configures the post-hook (if any), closes the update function and then every writer.
+     *
+     * @throws HyracksDataException
+     *             if the post-hook, the function or a writer fails
+     */
+    public void functionClose() throws HyracksDataException {
+        if (postHookFactory != null) {
+            postHookFactory.createRuntimeHook().configure(ctx);
+        }
+        function.close();
+        for (int i = 0; i < writers.length; i++) {
+            writers[i].close();
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
new file mode 100644
index 0000000..b6fd98a
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * {@link InputStream} over a caller-supplied byte array.
+ * <p>
+ * The stream can be repositioned with {@link #setByteArray(byte[], int)}, so a single instance
+ * (and any stream wrapping it) can be reused across buffers. Reads run from the current position
+ * to the end of the array, and the array is never copied.
+ * <p>
+ * {@link #setByteArray(byte[], int)} must be called before the first read. This class performs no
+ * synchronization.
+ */
+public class ResetableByteArrayInputStream extends InputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
+
+    /** Array currently being read; {@code null} until {@link #setByteArray(byte[], int)} is called. */
+    private byte[] data;
+    /** Index of the next byte to return. */
+    private int position;
+
+    public ResetableByteArrayInputStream() {
+    }
+
+    /**
+     * Points the stream at {@code data}, so that the next read starts at index {@code position}.
+     *
+     * @param data
+     *            array to read from (not copied)
+     * @param position
+     *            index of the first byte to read
+     */
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    /**
+     * @return the next byte as an unsigned value in 0..255, or -1 if the position is at or past the
+     *         end of the array
+     */
+    @Override
+    public int read() {
+        int remaining = data.length - position;
+        int value = remaining > 0 ? (data[position++] & 0xff) : -1;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
+        }
+        return value;
+    }
+
+    /**
+     * Copies up to {@code length} bytes into {@code bytes} starting at {@code offset}, following the
+     * {@link InputStream#read(byte[], int, int)} contract.
+     *
+     * @return the number of bytes copied; 0 when {@code length} is 0; -1 when the position is at or
+     *         past the end of the array
+     * @throws IndexOutOfBoundsException
+     *             if {@code offset} or {@code length} is negative, or {@code length} is larger than
+     *             {@code bytes.length - offset}
+     */
+    @Override
+    public int read(byte[] bytes, int offset, int length) {
+        int remaining = data.length - position;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
+                    + length + " position: " + position);
+        }
+        if (offset < 0 || length < 0 || length > bytes.length - offset) {
+            throw new IndexOutOfBoundsException("offset: " + offset + " length: " + length + " buffer length: "
+                    + bytes.length);
+        }
+        // The InputStream contract requires 0 (not EOF) for an empty request.
+        if (length == 0) {
+            return 0;
+        }
+        // If the position was set past the end, remaining is negative; report EOF instead of
+        // handing a negative length to System.arraycopy.
+        if (remaining <= 0) {
+            return -1;
+        }
+        int l = Math.min(length, remaining);
+        System.arraycopy(data, position, bytes, offset, l);
+        position += l;
+        return l;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayOutputStream.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayOutputStream.java
new file mode 100644
index 0000000..88521a4
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayOutputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * {@link OutputStream} that writes into a caller-supplied byte array.
+ * <p>
+ * Writes begin at a caller-supplied position. {@link #setByteArray(byte[], int)} swaps the target,
+ * so a single instance can be reused.
+ * <p>
+ * The array is never grown. A write that would run past its end throws
+ * {@link IndexOutOfBoundsException} and leaves the array unchanged. This class performs no
+ * synchronization.
+ */
+public class ResetableByteArrayOutputStream extends OutputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayOutputStream.class.getName());
+
+    /** Target array; {@code null} until {@link #setByteArray(byte[], int)} is called. */
+    private byte[] data;
+    /** Index at which the next byte will be written. */
+    private int position;
+
+    public ResetableByteArrayOutputStream() {
+    }
+
+    /**
+     * Redirects subsequent writes to {@code data}, beginning at index {@code position}.
+     *
+     * @param data
+     *            array to write into (not copied)
+     * @param position
+     *            index of the first byte to write
+     */
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    /**
+     * Writes the low-order byte of {@code b} at the current position.
+     *
+     * @throws IndexOutOfBoundsException
+     *             if the array is already full
+     */
+    @Override
+    public void write(int b) {
+        int remaining = data.length - position;
+        // Every index up to data.length - 1 is writable; only a full buffer is rejected.
+        if (remaining < 1) {
+            throw new IndexOutOfBoundsException("position: " + position + " capacity: " + data.length);
+        }
+        data[position] = (byte) b;
+        position++;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(): value: " + b + " remaining: " + remaining + " position: " + position);
+        }
+    }
+
+    /**
+     * Copies {@code length} bytes from {@code bytes[offset..]} to the current position.
+     *
+     * @throws IndexOutOfBoundsException
+     *             if the bytes do not fit between the current position and the end of the array,
+     *             or if {@code offset}/{@code length} are invalid for {@code bytes}
+     */
+    @Override
+    public void write(byte[] bytes, int offset, int length) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(bytes[], int, int) offset: " + offset + " length: " + length + " position: "
+                    + position);
+        }
+        // Compare against the remaining space rather than computing position + length, which could overflow.
+        if (length > data.length - position) {
+            throw new IndexOutOfBoundsException("position: " + position + " length: " + length + " capacity: "
+                    + data.length);
+        }
+        System.arraycopy(bytes, offset, data, position, length);
+        position += length;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
new file mode 100644
index 0000000..5ae1d81
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameConstants;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Deserializes Hyracks tuples into an {@code Object[]}, one element per field, using the
+ * serializer/deserializers of a {@link RecordDescriptor}.
+ * <p>
+ * The returned array is reused: each call to a {@code deserializeRecord} method overwrites the
+ * array returned by the previous call. This class performs no synchronization.
+ */
+public class TupleDeserializer {
+    private static final Logger LOGGER = Logger.getLogger(TupleDeserializer.class.getName());
+
+    /** Scratch array holding the most recently deserialized record; returned to callers. */
+    private final Object[] record;
+    private final RecordDescriptor recordDescriptor;
+    /** Repositionable stream, pointed at the start of each field before it is deserialized. */
+    private final ResetableByteArrayInputStream bbis;
+    /** {@link DataInputStream} view over {@link #bbis}, handed to the field deserializers. */
+    private final DataInputStream di;
+
+    public TupleDeserializer(RecordDescriptor recordDescriptor) {
+        this.recordDescriptor = recordDescriptor;
+        this.bbis = new ResetableByteArrayInputStream();
+        this.di = new DataInputStream(bbis);
+        this.record = new Object[recordDescriptor.getFields().length];
+    }
+
+    /**
+     * Deserializes every field of a tuple reference. Field {@code i} is read from
+     * {@code tupleRef.getFieldData(i)} starting at {@code tupleRef.getFieldStart(i)}.
+     *
+     * @param tupleRef
+     *            tuple to deserialize
+     * @return the shared record array, overwritten by the next call
+     * @throws HyracksDataException
+     *             propagated from a field deserializer, or, when {@code FrameConstants.DEBUG_FRAME_IO}
+     *             is on, if a field magic cannot be read or does not match
+     */
+    public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException {
+        for (int i = 0; i < record.length; ++i) {
+            bbis.setByteArray(tupleRef.getFieldData(i), tupleRef.getFieldStart(i));
+            record[i] = deserializeField(i);
+        }
+        return record;
+    }
+
+    /**
+     * Deserializes every field held by a tuple builder. Fields are laid out back to back in
+     * {@code tb.getByteArray()}, and {@code tb.getFieldEndOffsets()} gives each field's end.
+     *
+     * @param tb
+     *            builder holding the tuple to deserialize
+     * @return the shared record array, overwritten by the next call
+     * @throws HyracksDataException
+     *             propagated from a field deserializer, or, when {@code FrameConstants.DEBUG_FRAME_IO}
+     *             is on, if a field magic cannot be read or does not match
+     */
+    public Object[] deserializeRecord(ArrayTupleBuilder tb) throws HyracksDataException {
+        byte[] data = tb.getByteArray();
+        int[] fieldEndOffsets = tb.getFieldEndOffsets();
+        int start = 0;
+        for (int i = 0; i < record.length; ++i) {
+            // field i begins where field i - 1 ended; the first field begins at 0
+            bbis.setByteArray(data, start);
+            start = fieldEndOffsets[i];
+            record[i] = deserializeField(i);
+        }
+        return record;
+    }
+
+    /** Deserializes field {@code fieldIndex} from the current position of {@link #di}. */
+    private Object deserializeField(int fieldIndex) throws HyracksDataException {
+        Object instance = recordDescriptor.getFields()[fieldIndex].deserialize(di);
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest(fieldIndex + " " + instance);
+        }
+        if (FrameConstants.DEBUG_FRAME_IO) {
+            checkFieldMagic();
+        }
+        return instance;
+    }
+
+    /**
+     * Reads the debug magic that follows a field and verifies it. Read failures are rethrown with
+     * their cause rather than swallowed. The mismatch exception is thrown outside the try block, so
+     * it cannot be caught by the IOException handler.
+     */
+    private void checkFieldMagic() throws HyracksDataException {
+        int magic;
+        try {
+            magic = di.readInt();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        if (magic != FrameConstants.FRAME_FIELD_MAGIC) {
+            throw new HyracksDataException("Field magic mismatch");
+        }
+    }
+}