Refactored IOperatorNodePushable to be extended for multiple inputs
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@111 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
index f019b13..5487832 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
@@ -16,7 +16,16 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-public interface IOperatorNodePushable extends IFrameWriter {
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+public interface IOperatorNodePushable {
+ public void initialize() throws HyracksDataException;
+
+ public void deinitialize() throws HyracksDataException;
+
+ public int getInputArity();
+
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+ public IFrameWriter getInputFrameWriter(int index);
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index 39d98b1..12670e7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -34,7 +34,7 @@
}
public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- opNode.setFrameWriter(index, writer, recordDesc);
+ opNode.setOutputFrameWriter(index, writer, recordDesc);
}
public void setFrameReader(IFrameReader reader) {
@@ -48,20 +48,23 @@
@Override
public void run() {
try {
- opNode.open();
+ opNode.initialize();
if (reader != null) {
+ IFrameWriter writer = opNode.getInputFrameWriter(0);
+ writer.open();
reader.open();
while (reader.nextFrame(buffer)) {
if (abort) {
break;
}
buffer.flip();
- opNode.nextFrame(buffer);
+ writer.nextFrame(buffer);
buffer.compact();
}
reader.close();
+ writer.close();
}
- opNode.close();
+ opNode.deinitialize();
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/accessors/FrameTupleReference.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
index d9c9fb2..ff45eba 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
@@ -33,7 +33,7 @@
@Override
public int getFieldStart(int fIdx) {
- return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength() + fta.getFieldStartOffset(tIndex, fIdx);
+ return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength() + fta.getFieldStartOffset(tIndex, fIdx);
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputOperatorNodePushable.java
new file mode 100644
index 0000000..182fa2a
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputOperatorNodePushable.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractUnaryInputOperatorNodePushable implements IOperatorNodePushable, IFrameWriter {
+ protected IFrameWriter writer;
+ protected RecordDescriptor recordDesc;
+
+ @Override
+ public final IFrameWriter getInputFrameWriter(int index) {
+ return this;
+ }
+
+ @Override
+ public final int getInputArity() {
+ return 1;
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
new file mode 100644
index 0000000..12e8cb4
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public abstract class AbstractUnaryInputSinkOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
+ @Override
+ public final void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ throw new IllegalStateException();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
new file mode 100644
index 0000000..e3767b3
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+
+public abstract class AbstractUnaryInputUnaryOutputOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable
+ implements IFrameWriter {
+ @Override
+ public final IFrameWriter getInputFrameWriter(int index) {
+ return this;
+ }
+
+ @Override
+ public final int getInputArity() {
+ return 1;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java
index cfe1b65..8b97497e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java
@@ -17,14 +17,26 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public abstract class AbstractUnaryOutputOperatorNodePushable implements IOperatorNodePushable {
protected IFrameWriter writer;
protected RecordDescriptor recordDesc;
@Override
- public final void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ public final void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ if (index != 0) {
+ throw new IllegalStateException();
+ }
this.writer = writer;
this.recordDesc = recordDesc;
}
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
index f5180f8..db09e57 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
@@ -14,17 +14,16 @@
*/
package edu.uci.ics.hyracks.dataflow.std.base;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
public abstract class AbstractUnaryOutputSourceOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable {
@Override
- public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- throw new UnsupportedOperationException();
+ public IFrameWriter getInputFrameWriter(int index) {
+ throw new IllegalStateException();
}
@Override
- public final void flush() throws HyracksDataException {
+ public final int getInputArity() {
+ return 0;
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
index efc63d3..8b63690 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -51,7 +51,7 @@
final ITupleParser tp = tupleParserFactory.createTupleParser(ctx);
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
- public void open() throws HyracksDataException {
+ public void initialize() throws HyracksDataException {
File f = split.getLocalFile();
writer.open();
try {
@@ -66,10 +66,6 @@
writer.close();
}
}
-
- @Override
- public void close() throws HyracksDataException {
- }
};
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index a568781..09c1c6a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -16,7 +16,6 @@
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -31,6 +30,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -77,7 +77,7 @@
final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx,
recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
- return new IOperatorNodePushable() {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
private GroupingHashTable table;
@Override
@@ -102,11 +102,6 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- throw new IllegalArgumentException();
- }
-
- @Override
public void flush() throws HyracksDataException {
}
};
@@ -126,18 +121,13 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
- public void open() throws HyracksDataException {
+ public void initialize() throws HyracksDataException {
GroupingHashTable table = (GroupingHashTable) env.get(HASHTABLE);
writer.open();
table.write(writer);
writer.close();
env.set(HASHTABLE, null);
}
-
- @Override
- public void close() throws HyracksDataException {
- // do nothing
- }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index c124cb3..221c22f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -20,7 +20,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -42,6 +41,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -119,7 +119,7 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- IOperatorNodePushable op = new IOperatorNodePushable() {
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx,
recordDescProvider.getInputRecordDescriptor(getOperatorId(), operatorInputIndex));
@@ -133,11 +133,6 @@
private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- throw new IllegalArgumentException();
- }
-
- @Override
public void close() throws HyracksDataException {
for (int i = 0; i < numPartitions; i++) {
try {
@@ -249,7 +244,7 @@
private int[] maxBufferRi;
@Override
- public void open() throws HyracksDataException {
+ public void initialize() throws HyracksDataException {
channelsR = (FileChannel[]) env.get(SMALLRELATION);
channelsS = (FileChannel[]) env.get(LARGERELATION);
numPartitions = (Integer) env.get(NUM_PARTITION);
@@ -321,7 +316,7 @@
}
@Override
- public void close() throws HyracksDataException {
+ public void deinitialize() throws HyracksDataException {
env.set(LARGERELATION, null);
env.set(SMALLRELATION, null);
env.set(NUM_PARTITION, null);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 1da6b61..4381bd1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -20,7 +20,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -43,6 +42,8 @@
import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final String JOINER0 = "joiner0";
@@ -132,7 +133,7 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- IOperatorNodePushable op = new IOperatorNodePushable() {
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private InMemoryHashJoin joiner0;
private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx, rd0);
ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
@@ -147,11 +148,6 @@
private int B;
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- throw new IllegalArgumentException();
- }
-
- @Override
public void close() throws HyracksDataException {
if (memoryForHashtable != 0)
build(inBuffer);
@@ -345,7 +341,7 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- IOperatorNodePushable op = new IOperatorNodePushable() {
+ IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private InMemoryHashJoin joiner0;
private final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx, rd1);
private ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
@@ -358,7 +354,6 @@
private final FrameTupleAppender ftap = new FrameTupleAppender(ctx);
private final ByteBuffer inBuffer = ctx.getResourceManager().allocateFrame();
private final ByteBuffer outBuffer = ctx.getResourceManager().allocateFrame();
- private IFrameWriter writer;
private FileChannel[] channelsR;
private FileChannel[] channelsS;
private File filesS[];
@@ -550,14 +545,6 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- if (index != 0) {
- throw new IllegalStateException();
- }
- this.writer = writer;
- }
-
- @Override
public void flush() throws HyracksDataException {
writer.flush();
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 7e64312..a987e68 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -16,7 +16,6 @@
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -36,6 +35,8 @@
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final String JOINER = "joiner";
@@ -86,7 +87,7 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- IOperatorNodePushable op = new IOperatorNodePushable() {
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private InMemoryHashJoin joiner;
@Override
@@ -113,11 +114,6 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- throw new IllegalArgumentException();
- }
-
- @Override
public void flush() throws HyracksDataException {
}
};
@@ -136,8 +132,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- IOperatorNodePushable op = new IOperatorNodePushable() {
- private IFrameWriter writer;
+ IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private InMemoryHashJoin joiner;
@Override
@@ -159,14 +154,6 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- if (index != 0) {
- throw new IllegalStateException();
- }
- this.writer = writer;
- }
-
- @Override
public void flush() throws HyracksDataException {
writer.flush();
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 0bfe83d..2869f13 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -20,7 +20,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -32,6 +31,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -64,7 +64,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new IOperatorNodePushable() {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
private FileChannel out;
private int frameCount;
@@ -108,11 +108,6 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- throw new IllegalArgumentException();
- }
-
- @Override
public void flush() throws HyracksDataException {
}
};
@@ -132,7 +127,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
- public void open() throws HyracksDataException {
+ public void initialize() throws HyracksDataException {
try {
File inFile = new File((String) env.get(MATERIALIZED_FILE));
int frameCount = (Integer) env.get(FRAME_COUNT);
@@ -157,7 +152,7 @@
}
@Override
- public void close() throws HyracksDataException {
+ public void deinitialize() throws HyracksDataException {
env.set(MATERIALIZED_FILE, null);
}
};
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index b35954f..c5149a4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -1,16 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.hyracks.dataflow.std.misc;
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -22,7 +35,7 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new IOperatorNodePushable() {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
@Override
public void open() throws HyracksDataException {
}
@@ -36,10 +49,6 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- }
-
- @Override
public void flush() throws HyracksDataException {
}
};
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 4594685..1012098 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -43,6 +43,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
@@ -98,7 +99,7 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- IOperatorNodePushable op = new IOperatorNodePushable() {
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private final FrameTupleAccessor fta1 = new FrameTupleAccessor(ctx, recordDescriptors[0]);
private final FrameTupleAccessor fta2 = new FrameTupleAccessor(ctx, recordDescriptors[0]);
private List<ByteBuffer> inFrames;
@@ -273,11 +274,6 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- throw new IllegalArgumentException();
- }
-
- @Override
public void flush() throws HyracksDataException {
}
};
@@ -307,7 +303,7 @@
private FrameTupleAppender outFrameAppender;
@Override
- public void open() throws HyracksDataException {
+ public void initialize() throws HyracksDataException {
inFrames = (List<ByteBuffer>) env.get(IN_FRAMES);
outFrame = ctx.getResourceManager().allocateFrame();
runs = (LinkedList<File>) env.get(RUNS);
@@ -339,11 +335,6 @@
env.set(RUNS, null);
}
- @Override
- public void close() throws HyracksDataException {
- // do nothing
- }
-
// creates a new run from runs that can fit in memory.
private void doPass(LinkedList<File> runs, int passCount) throws ClassNotFoundException, Exception {
File newRun = null;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index be81ecd..8664fdd 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -18,7 +18,6 @@
import java.util.ArrayList;
import java.util.List;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -35,6 +34,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -82,7 +82,7 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- IOperatorNodePushable op = new IOperatorNodePushable() {
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private List<ByteBuffer> buffers;
private final FrameTupleAccessor fta1 = new FrameTupleAccessor(ctx, recordDescriptors[0]);
@@ -206,11 +206,6 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- throw new IllegalArgumentException();
- }
-
- @Override
public void flush() throws HyracksDataException {
}
};
@@ -231,7 +226,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
- public void open() throws HyracksDataException {
+ public void initialize() throws HyracksDataException {
List<ByteBuffer> buffers = (List<ByteBuffer>) env.get(BUFFERS);
long[] tPointers = (long[]) env.get(TPOINTERS);
FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptors[0]);
@@ -266,11 +261,6 @@
frame.limit(frame.capacity());
writer.nextFrame(frame);
}
-
- @Override
- public void close() throws HyracksDataException {
- // do nothing
- }
};
return op;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
index 037e546..b348677 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
@@ -18,14 +18,14 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-public final class DeserializedOperatorNodePushable implements IOperatorNodePushable {
+public final class DeserializedOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
private final IHyracksContext ctx;
private final IOpenableDataWriterOperator delegate;
@@ -40,7 +40,7 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
delegate.setDataWriter(index, new SerializingDataWriter(ctx, recordDesc, writer));
}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java
deleted file mode 100644
index 9dd430c..0000000
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.btree.dataflow;
-
-import java.io.DataOutput;
-import java.io.File;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.util.Random;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
-import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.file.FileInfo;
-import edu.uci.ics.hyracks.storage.common.file.FileManager;
-
-public abstract class AbstractBTreeOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable {
-
- protected IBTreeInteriorFrame interiorFrame;
- protected IBTreeLeafFrame leafFrame;
-
- protected BTree btree;
-
- protected AbstractBTreeOperatorDescriptor opDesc;
- protected IHyracksContext ctx;
-
- protected boolean createBTree;
-
- public AbstractBTreeOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx, boolean createBTree) {
- this.opDesc = opDesc;
- this.ctx = ctx;
- this.createBTree = createBTree;
- }
-
- public void init() throws Exception {
- IBufferCache bufferCache = opDesc.getBufferCacheProvider().getBufferCache();
- FileManager fileManager = opDesc.getBufferCacheProvider().getFileManager();
-
- File f = new File(opDesc.getBtreeFileName());
- RandomAccessFile raf = new RandomAccessFile(f, "rw");
-
- if(!f.exists() && !createBTree) {
- throw new Exception("Trying to open btree from file " + opDesc.getBtreeFileName() + " but file doesn't exist.");
- }
-
- try {
- FileInfo fi = new FileInfo(opDesc.getBtreeFileId(), raf);
- fileManager.registerFile(fi);
- }
- catch (Exception e) {
- }
-
- interiorFrame = opDesc.getInteriorFactory().getFrame();
- leafFrame = opDesc.getLeafFactory().getFrame();
-
- BTreeRegistry btreeRegistry = opDesc.getBtreeRegistryProvider().getBTreeRegistry();
- btree = btreeRegistry.get(opDesc.getBtreeFileId());
- if(btree == null) {
-
- // create new btree and register it
- btreeRegistry.lock();
- try {
- // check if btree has already been registered by another thread
- btree = btreeRegistry.get(opDesc.getBtreeFileId());
- if(btree == null) {
- // this thread should create and register the btee
-
- // start by building the multicomparator from the factories
- IFieldAccessor[] fields = new IFieldAccessor[opDesc.getFieldAccessorFactories().length];
- for(int i = 0; i < opDesc.getFieldAccessorFactories().length; i++) {
- fields[i] = opDesc.getFieldAccessorFactories()[i].getFieldAccessor();
- }
-
- IBinaryComparator[] comparators = new IBinaryComparator[opDesc.getComparatorFactories().length];
- for(int i = 0; i < opDesc.getComparatorFactories().length; i++) {
- comparators[i] = opDesc.getComparatorFactories()[i].createBinaryComparator();
- }
-
- MultiComparator cmp = new MultiComparator(comparators, fields);
-
- btree = new BTree(bufferCache, opDesc.getInteriorFactory(), opDesc.getLeafFactory(), cmp);
- if(createBTree) {
- MetaDataFrame metaFrame = new MetaDataFrame();
- btree.create(opDesc.getBtreeFileId(), leafFrame, metaFrame);
- }
- btree.open(opDesc.getBtreeFileId());
- btreeRegistry.register(opDesc.getBtreeFileId(), btree);
- }
- }
- finally {
- btreeRegistry.unlock();
- }
- }
- }
-
- // debug
- protected void fill() throws Exception {
-
-
- // TODO: uncomment and fix
- MetaDataFrame metaFrame = new MetaDataFrame();
- btree.create(opDesc.getBtreeFileId(), leafFrame, metaFrame);
-
- Random rnd = new Random();
- rnd.setSeed(50);
-
- ByteBuffer frame = ctx.getResourceManager().allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(ctx);
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
-
- ISerializerDeserializer[] recDescSers = { IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE};
- RecordDescriptor recDesc = new RecordDescriptor(recDescSers);
- IFrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recDesc);
- accessor.reset(frame);
- FrameTupleReference tuple = new FrameTupleReference();
-
- for (int i = 0; i < 10000; i++) {
- int f0 = rnd.nextInt() % 10000;
- int f1 = 5;
-
- tb.reset();
- IntegerSerializerDeserializer.INSTANCE.serialize(f0, dos);
- tb.addFieldEndOffset();
- IntegerSerializerDeserializer.INSTANCE.serialize(f1, dos);
- tb.addFieldEndOffset();
-
- appender.reset(frame, true);
- appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-
- tuple.reset(accessor, 0);
-
- if (i % 1000 == 0) {
- System.out.println("INSERTING " + i + " : " + f0 + " " + f1);
- }
-
- try {
- btree.insert(tuple, leafFrame, interiorFrame, metaFrame);
- } catch (Exception e) {
- }
- }
-
- /*
- IFieldAccessor[] fields = new IFieldAccessor[2];
- fields[0] = new Int32Accessor(); // key field
- fields[1] = new Int32Accessor(); // value field
-
- int keyLen = 1;
- IBinaryComparator[] cmps = new IBinaryComparator[keyLen];
- cmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- MultiComparator cmp = new MultiComparator(cmps, fields);
-
- ByteArrayAccessibleOutputStream lkbaaos = new ByteArrayAccessibleOutputStream();
- DataOutputStream lkdos = new DataOutputStream(lkbaaos);
- IntegerSerializerDeserializer.INSTANCE.serialize(-1000, lkdos);
-
- ByteArrayAccessibleOutputStream hkbaaos = new ByteArrayAccessibleOutputStream();
- DataOutputStream hkdos = new DataOutputStream(hkbaaos);
- IntegerSerializerDeserializer.INSTANCE.serialize(1000, hkdos);
-
- byte[] lowKey = lkbaaos.toByteArray();
- byte[] highKey = hkbaaos.toByteArray();
-
- IBinaryComparator[] searchCmps = new IBinaryComparator[1];
- searchCmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- MultiComparator searchCmp = new MultiComparator(searchCmps, fields);
-
- RangePredicate rangePred = new RangePredicate(true, lowKey, highKey, searchCmp);
- btree.search(cursor, rangePred, leafFrame, interiorFrame);
- try {
- while (cursor.hasNext()) {
- cursor.next();
- byte[] array = cursor.getPage().getBuffer().array();
- int recOffset = cursor.getOffset();
- String rec = cmp.printRecord(array, recOffset);
- System.out.println(rec);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- cursor.close();
- }
- */
- }
-
- protected byte[] buildBTreeRecordFromHyraxRecord(IFrameTupleAccessor accessor, int tupleId, int[] keyFields, int[] payloadFields) {
-
- // determine size of record
- int btreeRecordSize = 0;
- for(int j = 0; j < keyFields.length; j++) {
- btreeRecordSize += accessor.getFieldLength(tupleId, keyFields[j]);
- }
- for(int j = 0; j < payloadFields.length; j++) {
- btreeRecordSize += accessor.getFieldLength(tupleId, payloadFields[j]);
- }
-
- // allocate record and copy fields
- byte[] btreeRecord = new byte[btreeRecordSize];
- int recRunner = 0;
- for(int j = 0; j < keyFields.length; j++) {
- int fieldStartOff = accessor.getTupleStartOffset(tupleId) + + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(tupleId, keyFields[j]);
- int fieldLength = accessor.getFieldLength(tupleId, keyFields[j]);
- System.arraycopy(accessor.getBuffer().array(), fieldStartOff, btreeRecord, recRunner, fieldLength);
- recRunner += fieldLength;
- }
- for(int j = 0; j < payloadFields.length; j++) {
- int fieldStartOff = accessor.getTupleStartOffset(tupleId) + + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(tupleId, payloadFields[j]);
- int fieldLength = accessor.getFieldLength(tupleId, payloadFields[j]);
- System.arraycopy(accessor.getBuffer().array(), fieldStartOff, btreeRecord, recRunner, fieldLength);
- recRunner += fieldLength;
- }
-
- return btreeRecord;
- }
-}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
index 1b9db48..9120f15 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
@@ -12,7 +12,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package edu.uci.ics.hyracks.storage.am.btree.dataflow;
import java.nio.ByteBuffer;
@@ -22,67 +21,70 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeMetaDataFrame;
import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-public class BTreeBulkLoadOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
-
+public class BTreeBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
private float fillFactor;
-
+ private final BTreeOpHelper btreeOpHelper;
private FrameTupleAccessor accessor;
private BTree.BulkLoadContext bulkLoadCtx;
-
+
private IRecordDescriptorProvider recordDescProvider;
-
+
private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
-
- public BTreeBulkLoadOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
- super(opDesc, ctx, true);
- this.fillFactor = fillFactor;
- this.recordDescProvider = recordDescProvider;
- tuple.setFieldPermutation(fieldPermutation);
- }
-
- @Override
- public void close() throws HyracksDataException {
- try {
- btree.endBulkLoad(bulkLoadCtx);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
-
- int tupleCount = accessor.getTupleCount();
- for(int i = 0; i < tupleCount; i++) {
- tuple.reset(accessor, i);
- try {
- btree.bulkLoadAddRecord(bulkLoadCtx, tuple);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- @Override
- public void open() throws HyracksDataException {
- RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
- accessor = new FrameTupleAccessor(ctx, recDesc);
- IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
- try {
- init();
- btree.open(opDesc.getBtreeFileId());
- bulkLoadCtx = btree.beginBulkLoad(fillFactor, leafFrame, interiorFrame, metaFrame);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+
+ public BTreeBulkLoadOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
+ int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, true);
+ this.fillFactor = fillFactor;
+ this.recordDescProvider = recordDescProvider;
+ tuple.setFieldPermutation(fieldPermutation);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ btreeOpHelper.getBTree().endBulkLoad(bulkLoadCtx);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ tuple.reset(accessor, i);
+ try {
+ btreeOpHelper.getBTree().bulkLoadAddRecord(bulkLoadCtx, tuple);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
+ RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+ accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksContext(), recDesc);
+ IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
+ try {
+ btreeOpHelper.init();
+ btreeOpHelper.getBTree().open(opDesc.getBtreeFileId());
+ bulkLoadCtx = btreeOpHelper.getBTree().beginBulkLoad(fillFactor, btreeOpHelper.getLeafFrame(),
+ btreeOpHelper.getInteriorFrame(), metaFrame);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
@Override
public void flush() throws HyracksDataException {
- }
-}
+ }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
index 373a27d..0c3b8d5 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
@@ -12,7 +12,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package edu.uci.ics.hyracks.storage.am.btree.dataflow;
import java.io.DataOutput;
@@ -24,6 +23,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeMetaDataFrame;
import edu.uci.ics.hyracks.storage.am.btree.api.IFieldIterator;
@@ -31,81 +31,69 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.DiskOrderScanCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
-public class BTreeDiskOrderScanOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
-
- public BTreeDiskOrderScanOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx) {
- super(opDesc, ctx, false);
- }
-
- @Override
- public void open() throws HyracksDataException {
-
- IBTreeLeafFrame cursorFrame = opDesc.getLeafFactory().getFrame();
- DiskOrderScanCursor cursor = new DiskOrderScanCursor(cursorFrame);
- IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
-
- try {
- init();
- fill();
- btree.diskOrderScan(cursor, cursorFrame, metaFrame);
- } catch (FileNotFoundException e1) {
- e1.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- MultiComparator cmp = btree.getMultiComparator();
- ByteBuffer frame = ctx.getResourceManager().allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(ctx);
- appender.reset(frame, true);
- ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFields().length);
- DataOutput dos = tb.getDataOutput();
-
- try {
- while(cursor.hasNext()) {
- tb.reset();
- cursor.next();
+public class BTreeDiskOrderScanOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+ private final BTreeOpHelper btreeOpHelper;
- IFieldIterator fieldIter = cursor.getFieldIterator();
- for(int i = 0; i < cmp.getFields().length; i++) {
- int fieldLen = fieldIter.getFieldSize();
- dos.write(fieldIter.getBuffer().array(), fieldIter.getFieldOff(), fieldLen);
- tb.addFieldEndOffset();
- }
-
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
-
- //int recOffset = cursor.getOffset();
- //String rec = cmp.printRecord(array, recOffset);
- //System.out.println(rec);
- }
-
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame, writer);
- }
- writer.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- throw new UnsupportedOperationException();
+ public BTreeDiskOrderScanOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx) {
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, false);
}
-
- @Override
- public void close() throws HyracksDataException {
- }
@Override
- public void flush() throws HyracksDataException {
+ public void initialize() throws HyracksDataException {
+
+ IBTreeLeafFrame cursorFrame = btreeOpHelper.getOperatorDescriptor().getLeafFactory().getFrame();
+ DiskOrderScanCursor cursor = new DiskOrderScanCursor(cursorFrame);
+ IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
+
+ try {
+ btreeOpHelper.init();
+ btreeOpHelper.fill();
+ btreeOpHelper.getBTree().diskOrderScan(cursor, cursorFrame, metaFrame);
+ } catch (FileNotFoundException e1) {
+ e1.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ MultiComparator cmp = btreeOpHelper.getBTree().getMultiComparator();
+ ByteBuffer frame = btreeOpHelper.getHyracksContext().getResourceManager().allocateFrame();
+ FrameTupleAppender appender = new FrameTupleAppender(btreeOpHelper.getHyracksContext());
+ appender.reset(frame, true);
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFields().length);
+ DataOutput dos = tb.getDataOutput();
+
+ try {
+ while (cursor.hasNext()) {
+ tb.reset();
+ cursor.next();
+
+ IFieldIterator fieldIter = cursor.getFieldIterator();
+ for (int i = 0; i < cmp.getFields().length; i++) {
+ int fieldLen = fieldIter.getFieldSize();
+ dos.write(fieldIter.getBuffer().array(), fieldIter.getFieldOff(), fieldLen);
+ tb.addFieldEndOffset();
+ }
+
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+
+ //int recOffset = cursor.getOffset();
+ //String rec = cmp.printRecord(array, recOffset);
+ //System.out.println(rec);
+ }
+
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(frame, writer);
+ }
+ writer.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
-}
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
index 09668fe..ad35a93 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
@@ -12,7 +12,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package edu.uci.ics.hyracks.storage.am.btree.dataflow;
import java.nio.ByteBuffer;
@@ -23,82 +22,96 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeMetaDataFrame;
import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOp;
-public class BTreeInsertUpdateDeleteOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
-
+public class BTreeInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final BTreeOpHelper btreeOpHelper;
+
private FrameTupleAccessor accessor;
-
+
private IRecordDescriptorProvider recordDescProvider;
-
+
private IBTreeMetaDataFrame metaFrame;
-
+
private BTreeOp op;
-
+
private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
-
- public BTreeInsertUpdateDeleteOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, BTreeOp op) {
- super(opDesc, ctx, false);
- this.recordDescProvider = recordDescProvider;
- this.op = op;
- tuple.setFieldPermutation(fieldPermutation);
- }
-
- @Override
- public void close() throws HyracksDataException {
- writer.close();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
-
- int tupleCount = accessor.getTupleCount();
- for(int i = 0; i < tupleCount; i++) {
- tuple.reset(accessor, i);
- try {
-
- switch(op) {
-
- case BTO_INSERT: {
- btree.insert(tuple, leafFrame, interiorFrame, metaFrame);
- } break;
-
- case BTO_DELETE: {
- btree.delete(tuple, leafFrame, interiorFrame, metaFrame);
- } break;
-
- default: {
- throw new HyracksDataException("Unsupported operation " + op + " in BTree InsertUpdateDelete operator");
- }
-
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- // pass a copy of the frame to next op
- FrameUtils.flushFrame(buffer.duplicate(), writer);
- }
-
- @Override
- public void open() throws HyracksDataException {
- RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
- accessor = new FrameTupleAccessor(ctx, recDesc);
- try {
- init();
- btree.open(opDesc.getBtreeFileId());
- metaFrame = new MetaDataFrame();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+
+ public BTreeInsertUpdateDeleteOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
+ int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, BTreeOp op) {
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, false);
+ this.recordDescProvider = recordDescProvider;
+ this.op = op;
+ tuple.setFieldPermutation(fieldPermutation);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ final BTree btree = btreeOpHelper.getBTree();
+ final IBTreeLeafFrame leafFrame = btreeOpHelper.getLeafFrame();
+ final IBTreeInteriorFrame interiorFrame = btreeOpHelper.getInteriorFrame();
+
+ accessor.reset(buffer);
+
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ tuple.reset(accessor, i);
+ try {
+
+ switch (op) {
+
+ case BTO_INSERT: {
+ btree.insert(tuple, leafFrame, interiorFrame, metaFrame);
+ }
+ break;
+
+ case BTO_DELETE: {
+ btree.delete(tuple, leafFrame, interiorFrame, metaFrame);
+ }
+ break;
+
+ default: {
+ throw new HyracksDataException("Unsupported operation " + op
+ + " in BTree InsertUpdateDelete operator");
+ }
+
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ // pass a copy of the frame to next op
+ FrameUtils.flushFrame(buffer.duplicate(), writer);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
+ RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+ accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksContext(), recDesc);
+ try {
+ btreeOpHelper.init();
+ btreeOpHelper.getBTree().open(opDesc.getBtreeFileId());
+ metaFrame = new MetaDataFrame();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
@Override
public void flush() throws HyracksDataException {
- }
-}
+ }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
new file mode 100644
index 0000000..68edce1
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
+import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.FileInfo;
+import edu.uci.ics.hyracks.storage.common.file.FileManager;
+
+final class BTreeOpHelper {
+ private IBTreeInteriorFrame interiorFrame;
+ private IBTreeLeafFrame leafFrame;
+
+ private BTree btree;
+
+ private AbstractBTreeOperatorDescriptor opDesc;
+ private IHyracksContext ctx;
+
+ private boolean createBTree;
+
+ BTreeOpHelper(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx, boolean createBTree) {
+ this.opDesc = opDesc;
+ this.ctx = ctx;
+ this.createBTree = createBTree;
+ }
+
+ void init() throws Exception {
+ IBufferCache bufferCache = opDesc.getBufferCacheProvider().getBufferCache();
+ FileManager fileManager = opDesc.getBufferCacheProvider().getFileManager();
+
+ File f = new File(opDesc.getBtreeFileName());
+ RandomAccessFile raf = new RandomAccessFile(f, "rw");
+
+ if (!f.exists() && !createBTree) {
+ throw new Exception("Trying to open btree from file " + opDesc.getBtreeFileName()
+ + " but file doesn't exist.");
+ }
+
+ try {
+ FileInfo fi = new FileInfo(opDesc.getBtreeFileId(), raf);
+ fileManager.registerFile(fi);
+ } catch (Exception e) {
+ }
+
+ interiorFrame = opDesc.getInteriorFactory().getFrame();
+ leafFrame = opDesc.getLeafFactory().getFrame();
+
+ BTreeRegistry btreeRegistry = opDesc.getBtreeRegistryProvider().getBTreeRegistry();
+ btree = btreeRegistry.get(opDesc.getBtreeFileId());
+ if (btree == null) {
+
+ // create new btree and register it
+ btreeRegistry.lock();
+ try {
+ // check if btree has already been registered by another thread
+ btree = btreeRegistry.get(opDesc.getBtreeFileId());
+ if (btree == null) {
+ // this thread should create and register the btee
+
+ // start by building the multicomparator from the factories
+ IFieldAccessor[] fields = new IFieldAccessor[opDesc.getFieldAccessorFactories().length];
+ for (int i = 0; i < opDesc.getFieldAccessorFactories().length; i++) {
+ fields[i] = opDesc.getFieldAccessorFactories()[i].getFieldAccessor();
+ }
+
+ IBinaryComparator[] comparators = new IBinaryComparator[opDesc.getComparatorFactories().length];
+ for (int i = 0; i < opDesc.getComparatorFactories().length; i++) {
+ comparators[i] = opDesc.getComparatorFactories()[i].createBinaryComparator();
+ }
+
+ MultiComparator cmp = new MultiComparator(comparators, fields);
+
+ btree = new BTree(bufferCache, opDesc.getInteriorFactory(), opDesc.getLeafFactory(), cmp);
+ if (createBTree) {
+ MetaDataFrame metaFrame = new MetaDataFrame();
+ btree.create(opDesc.getBtreeFileId(), leafFrame, metaFrame);
+ }
+ btree.open(opDesc.getBtreeFileId());
+ btreeRegistry.register(opDesc.getBtreeFileId(), btree);
+ }
+ } finally {
+ btreeRegistry.unlock();
+ }
+ }
+ }
+
+ // debug
+ void fill() throws Exception {
+
+ // TODO: uncomment and fix
+ MetaDataFrame metaFrame = new MetaDataFrame();
+ btree.create(opDesc.getBtreeFileId(), leafFrame, metaFrame);
+
+ Random rnd = new Random();
+ rnd.setSeed(50);
+
+ ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+ FrameTupleAppender appender = new FrameTupleAppender(ctx);
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+
+ ISerializerDeserializer[] recDescSers = { IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE };
+ RecordDescriptor recDesc = new RecordDescriptor(recDescSers);
+ IFrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recDesc);
+ accessor.reset(frame);
+ FrameTupleReference tuple = new FrameTupleReference();
+
+ for (int i = 0; i < 10000; i++) {
+ int f0 = rnd.nextInt() % 10000;
+ int f1 = 5;
+
+ tb.reset();
+ IntegerSerializerDeserializer.INSTANCE.serialize(f0, dos);
+ tb.addFieldEndOffset();
+ IntegerSerializerDeserializer.INSTANCE.serialize(f1, dos);
+ tb.addFieldEndOffset();
+
+ appender.reset(frame, true);
+ appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+
+ tuple.reset(accessor, 0);
+
+ if (i % 1000 == 0) {
+ System.out.println("INSERTING " + i + " : " + f0 + " " + f1);
+ }
+
+ try {
+ btree.insert(tuple, leafFrame, interiorFrame, metaFrame);
+ } catch (Exception e) {
+ }
+ }
+
+ /*
+ IFieldAccessor[] fields = new IFieldAccessor[2];
+ fields[0] = new Int32Accessor(); // key field
+ fields[1] = new Int32Accessor(); // value field
+
+ int keyLen = 1;
+ IBinaryComparator[] cmps = new IBinaryComparator[keyLen];
+ cmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+ MultiComparator cmp = new MultiComparator(cmps, fields);
+
+ ByteArrayAccessibleOutputStream lkbaaos = new ByteArrayAccessibleOutputStream();
+ DataOutputStream lkdos = new DataOutputStream(lkbaaos);
+ IntegerSerializerDeserializer.INSTANCE.serialize(-1000, lkdos);
+
+ ByteArrayAccessibleOutputStream hkbaaos = new ByteArrayAccessibleOutputStream();
+ DataOutputStream hkdos = new DataOutputStream(hkbaaos);
+ IntegerSerializerDeserializer.INSTANCE.serialize(1000, hkdos);
+
+ byte[] lowKey = lkbaaos.toByteArray();
+ byte[] highKey = hkbaaos.toByteArray();
+
+ IBinaryComparator[] searchCmps = new IBinaryComparator[1];
+ searchCmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+ MultiComparator searchCmp = new MultiComparator(searchCmps, fields);
+
+ RangePredicate rangePred = new RangePredicate(true, lowKey, highKey, searchCmp);
+ btree.search(cursor, rangePred, leafFrame, interiorFrame);
+ try {
+ while (cursor.hasNext()) {
+ cursor.next();
+ byte[] array = cursor.getPage().getBuffer().array();
+ int recOffset = cursor.getOffset();
+ String rec = cmp.printRecord(array, recOffset);
+ System.out.println(rec);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ cursor.close();
+ }
+ */
+ }
+
+ public BTree getBTree() {
+ return btree;
+ }
+
+ public IHyracksContext getHyracksContext() {
+ return ctx;
+ }
+
+ public AbstractBTreeOperatorDescriptor getOperatorDescriptor() {
+ return opDesc;
+ }
+
+ public IBTreeLeafFrame getLeafFrame() {
+ return leafFrame;
+ }
+
+ public IBTreeInteriorFrame getInteriorFrame() {
+ return interiorFrame;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 3bd3e36..2b78993 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -12,7 +12,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package edu.uci.ics.hyracks.storage.am.btree.dataflow;
import java.io.DataOutput;
@@ -26,106 +25,102 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeCursor;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.api.IFieldIterator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangeSearchCursor;
-public class BTreeSearchOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
+public class BTreeSearchOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+ private BTreeOpHelper btreeOpHelper;
+ private boolean isForward;
+ private ITupleReference lowKey;
+ private ITupleReference highKey;
+ private int searchKeyFields;
- private boolean isForward;
- private ITupleReference lowKey;
- private ITupleReference highKey;
- private int searchKeyFields;
-
- public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, boolean isForward, ITupleReference lowKey, ITupleReference highKey, int searchKeyFields) {
- super(opDesc, ctx, false);
- this.isForward = isForward;
- this.lowKey = lowKey;
- this.highKey = highKey;
- this.searchKeyFields = searchKeyFields;
- }
-
- @Override
- public void open() throws HyracksDataException {
-
- IBTreeLeafFrame cursorFrame = opDesc.getLeafFactory().getFrame();
- IBTreeCursor cursor = new RangeSearchCursor(cursorFrame);
-
- try {
- init();
- fill();
-
- // construct range predicate
- assert(searchKeyFields <= btree.getMultiComparator().getKeyLength());
- IBinaryComparator[] searchComparators = new IBinaryComparator[searchKeyFields];
- for(int i = 0; i < searchKeyFields; i++) {
- searchComparators[i] = btree.getMultiComparator().getComparators()[i];
- }
- MultiComparator searchCmp = new MultiComparator(searchComparators, btree.getMultiComparator().getFields());
- RangePredicate rangePred = new RangePredicate(isForward, lowKey, highKey, searchCmp);
-
- btree.search(cursor, rangePred, leafFrame, interiorFrame);
- } catch (FileNotFoundException e1) {
- e1.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- MultiComparator cmp = btree.getMultiComparator();
- ByteBuffer frame = ctx.getResourceManager().allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(ctx);
- appender.reset(frame, true);
- ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFields().length);
- DataOutput dos = tb.getDataOutput();
-
- try {
- while(cursor.hasNext()) {
- tb.reset();
- cursor.next();
-
- IFieldIterator fieldIter = cursor.getFieldIterator();
- for(int i = 0; i < cmp.getFields().length; i++) {
- int fieldLen = fieldIter.getFieldSize();
- dos.write(fieldIter.getBuffer().array(), fieldIter.getFieldOff(), fieldLen);
- tb.addFieldEndOffset();
- }
-
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
-
- //int recOffset = cursor.getOffset();
- //String rec = cmp.printRecord(array, recOffset);
- //System.out.println(rec);
- }
-
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame, writer);
- }
- writer.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- throw new UnsupportedOperationException();
+ public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
+ boolean isForward, ITupleReference lowKey, ITupleReference highKey, int searchKeyFields) {
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, false);
+ this.isForward = isForward;
+ this.lowKey = lowKey;
+ this.highKey = highKey;
+ this.searchKeyFields = searchKeyFields;
}
-
- @Override
- public void close() throws HyracksDataException {
- }
@Override
- public void flush() throws HyracksDataException {
+ public void initialize() throws HyracksDataException {
+ AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
+ BTree btree = btreeOpHelper.getBTree();
+ IBTreeLeafFrame leafFrame = btreeOpHelper.getLeafFrame();
+ IBTreeInteriorFrame interiorFrame = btreeOpHelper.getInteriorFrame();
+ IHyracksContext ctx = btreeOpHelper.getHyracksContext();
+
+ IBTreeLeafFrame cursorFrame = opDesc.getLeafFactory().getFrame();
+ IBTreeCursor cursor = new RangeSearchCursor(cursorFrame);
+
+ try {
+ btreeOpHelper.init();
+ btreeOpHelper.fill();
+
+ // construct range predicate
+ assert (searchKeyFields <= btree.getMultiComparator().getKeyLength());
+ IBinaryComparator[] searchComparators = new IBinaryComparator[searchKeyFields];
+ for (int i = 0; i < searchKeyFields; i++) {
+ searchComparators[i] = btree.getMultiComparator().getComparators()[i];
+ }
+ MultiComparator searchCmp = new MultiComparator(searchComparators, btree.getMultiComparator().getFields());
+ RangePredicate rangePred = new RangePredicate(isForward, lowKey, highKey, searchCmp);
+
+ btree.search(cursor, rangePred, leafFrame, interiorFrame);
+ } catch (FileNotFoundException e1) {
+ e1.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ MultiComparator cmp = btree.getMultiComparator();
+ ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+ FrameTupleAppender appender = new FrameTupleAppender(ctx);
+ appender.reset(frame, true);
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFields().length);
+ DataOutput dos = tb.getDataOutput();
+
+ try {
+ while (cursor.hasNext()) {
+ tb.reset();
+ cursor.next();
+
+ IFieldIterator fieldIter = cursor.getFieldIterator();
+ for (int i = 0; i < cmp.getFields().length; i++) {
+ int fieldLen = fieldIter.getFieldSize();
+ dos.write(fieldIter.getBuffer().array(), fieldIter.getFieldOff(), fieldLen);
+ tb.addFieldEndOffset();
+ }
+
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+
+ //int recOffset = cursor.getOffset();
+ //String rec = cmp.printRecord(array, recOffset);
+ //System.out.println(rec);
+ }
+
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(frame, writer);
+ }
+ writer.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
\ No newline at end of file