Refactored Hyracks into runtime/dataflow-common/dataflow-std/server
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@38 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/.classpath b/hyracks-dataflow-common/.classpath
new file mode 100644
index 0000000..86f50f4
--- /dev/null
+++ b/hyracks-dataflow-common/.classpath
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" output="target/classes" path="src/main/java"/>
+ <classpathentry kind="src" path="src/test/java"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-dataflow-common/.project b/hyracks-dataflow-common/.project
new file mode 100644
index 0000000..8fd9acf
--- /dev/null
+++ b/hyracks-dataflow-common/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>hyracks-dataflow-common</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
diff --git a/hyracks-dataflow-common/.settings/org.eclipse.jdt.core.prefs b/hyracks-dataflow-common/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..8496bf4
--- /dev/null
+++ b/hyracks-dataflow-common/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Thu Jul 29 14:49:35 PDT 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks-dataflow-common/.settings/org.maven.ide.eclipse.prefs b/hyracks-dataflow-common/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..c411710
--- /dev/null
+++ b/hyracks-dataflow-common/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Thu Jul 29 14:49:35 PDT 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-dataflow-common/pom.xml b/hyracks-dataflow-common/pom.xml
new file mode 100644
index 0000000..589e93f
--- /dev/null
+++ b/hyracks-dataflow-common/pom.xml
@@ -0,0 +1,28 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.1.0</version>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.1.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
new file mode 100644
index 0000000..bfc79e7
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+
+/**
+ * A single socket connection together with the two buffers used to move bytes through it.
+ *
+ * {@link #dispatch(SelectionKey)} transfers bytes between the socket and the buffers, and
+ * {@link #write(ByteBuffer)} stages outgoing bytes; both coordinate on this object's monitor.
+ */
+public class ConnectionEntry implements IConnectionEntry {
+    private static final Logger LOGGER = Logger.getLogger(ConnectionEntry.class.getName());
+
+    private SocketChannel socketChannel;
+
+    private final ByteBuffer readBuffer;
+
+    private final ByteBuffer writeBuffer;
+
+    private IDataReceiveListener recvListener;
+
+    private Object attachment;
+
+    private final SelectionKey key;
+
+    private UUID jobId;
+
+    private UUID stageId;
+
+    private boolean aborted;
+
+    /**
+     * Creates an entry for a channel, allocating both buffers from the context's resource manager.
+     *
+     * @param ctx context whose resource manager supplies the read and write frames
+     * @param socketChannel channel that is read from and written to
+     * @param key selection key whose interest set is adjusted as data is staged and flushed
+     */
+    public ConnectionEntry(IHyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
+        this.socketChannel = socketChannel;
+        readBuffer = ctx.getResourceManager().allocateFrame();
+        readBuffer.clear();
+        writeBuffer = ctx.getResourceManager().allocateFrame();
+        writeBuffer.clear();
+        this.key = key;
+    }
+
+    /** Returns the channel this entry wraps. */
+    public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    /**
+     * Handles one readiness event for this connection.
+     *
+     * If the entry has been aborted the listener is simply notified. Otherwise a readable key
+     * triggers a read into the read buffer followed by a listener notification, and a writable
+     * key flushes the staged bytes of the write buffer to the socket.
+     *
+     * @param key the key reporting the event
+     * @return {@code true} on end of stream, a negative write count, or an event that is neither
+     *         readable nor writable; {@code false} otherwise
+     * @throws IOException if the socket read or write fails
+     */
+    public boolean dispatch(SelectionKey key) throws IOException {
+        if (aborted) {
+            recvListener.dataReceived(this);
+        } else {
+            if (key.isReadable()) {
+                if (LOGGER.isLoggable(Level.FINER)) {
+                    LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
+                }
+                int bytesRead = socketChannel.read(readBuffer);
+                if (bytesRead < 0) {
+                    recvListener.eos(this);
+                    return true;
+                }
+                if (LOGGER.isLoggable(Level.FINER)) {
+                    LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
+                }
+                recvListener.dataReceived(this);
+            } else if (key.isWritable()) {
+                synchronized (this) {
+                    // Switch the write buffer to drain mode for the duration of the socket write.
+                    writeBuffer.flip();
+                    if (LOGGER.isLoggable(Level.FINER)) {
+                        LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    }
+                    int bytesWritten = socketChannel.write(writeBuffer);
+                    if (bytesWritten < 0) {
+                        return true;
+                    }
+                    if (LOGGER.isLoggable(Level.FINER)) {
+                        LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    }
+                    if (writeBuffer.remaining() <= 0) {
+                        // Everything staged has been sent; stop asking for write readiness.
+                        int ops = key.interestOps();
+                        key.interestOps(ops & ~SelectionKey.OP_WRITE);
+                    }
+                    // Back to fill mode, keeping any unsent bytes, then wake blocked writers.
+                    writeBuffer.compact();
+                    notifyAll();
+                }
+            } else {
+                LOGGER.warning("Spurious event triggered: " + key.readyOps());
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public ByteBuffer getReadBuffer() {
+        return readBuffer;
+    }
+
+    /**
+     * Stages all remaining bytes of {@code buffer} for sending, blocking while the write
+     * buffer is full.
+     *
+     * An interrupt received while waiting does not cut the write short; the thread's interrupt
+     * status is restored before returning so that callers can still observe it.
+     *
+     * @param buffer source of the bytes; its position advances to its limit
+     */
+    @Override
+    public synchronized void write(ByteBuffer buffer) {
+        boolean interrupted = false;
+        try {
+            while (buffer.remaining() > 0) {
+                while (writeBuffer.remaining() <= 0) {
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        interrupted = true;
+                    }
+                }
+                int oldLimit = buffer.limit();
+                // limit() takes an absolute index, so the free space must be added to the
+                // current position; using the bare count would rewind a partially copied buffer.
+                buffer.limit(Math.min(oldLimit, buffer.position() + writeBuffer.remaining()));
+                writeBuffer.put(buffer);
+                buffer.limit(oldLimit);
+                int ops = key.interestOps();
+                key.interestOps(ops | SelectionKey.OP_WRITE);
+                key.selector().wakeup();
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Override
+    public void setDataReceiveListener(IDataReceiveListener listener) {
+        this.recvListener = listener;
+    }
+
+    @Override
+    public void attach(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    /** Closes the socket channel; a failure to close is logged rather than propagated. */
+    @Override
+    public void close() {
+        try {
+            socketChannel.close();
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "Error closing socket channel", e);
+        }
+    }
+
+    @Override
+    public SelectionKey getSelectionKey() {
+        return key;
+    }
+
+    @Override
+    public UUID getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public void setJobId(UUID jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public UUID getStageId() {
+        return stageId;
+    }
+
+    @Override
+    public void setStageId(UUID stageId) {
+        this.stageId = stageId;
+    }
+
+    /** Marks this entry as aborted; later dispatches only notify the listener. */
+    @Override
+    public void abort() {
+        aborted = true;
+    }
+
+    @Override
+    public boolean aborted() {
+        return aborted;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
new file mode 100644
index 0000000..96fb70a
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Frame reader that hands out frames in the order in which the demultiplexer reports
+ * connection entries as ready.
+ */
+public class NonDeterministicFrameReader implements IFrameReader {
+    private static final Logger LOGGER = Logger.getLogger(NonDeterministicFrameReader.class.getName());
+
+    private final IHyracksContext ctx;
+    private final IConnectionDemultiplexer demux;
+    private int lastReadSender;
+    private boolean eos;
+
+    public NonDeterministicFrameReader(IHyracksContext ctx, IConnectionDemultiplexer demux) {
+        this.demux = demux;
+        this.ctx = ctx;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        eos = false;
+        lastReadSender = 0;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+    }
+
+    /**
+     * Copies the next non-empty frame into {@code buffer}.
+     *
+     * A frame whose tuple count is zero closes its sender; the stream ends once no open
+     * senders remain or a ready entry turns out to be aborted.
+     *
+     * @return {@code true} if a frame was copied, {@code false} at end of stream
+     */
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        while (!eos) {
+            IConnectionEntry ready = demux.findNextReadyEntry(lastReadSender);
+            if (ready.aborted()) {
+                eos = true;
+                break;
+            }
+            lastReadSender = (Integer) ready.getAttachment();
+            ByteBuffer incoming = ready.getReadBuffer();
+            int nTuples = incoming.getInt(FrameHelper.getTupleCountOffset(ctx));
+            if (LOGGER.isLoggable(Level.FINER)) {
+                LOGGER.finer("Frame Tuple Count: " + nTuples);
+            }
+            boolean hasData = nTuples != 0;
+            if (hasData) {
+                buffer.clear();
+                buffer.put(incoming);
+            } else {
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Empty Frame received: Closing " + lastReadSender);
+                }
+                if (demux.closeEntry(lastReadSender) == 0) {
+                    eos = true;
+                    break;
+                }
+            }
+            incoming.clear();
+            demux.unreadyEntry(lastReadSender);
+            if (hasData) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java
new file mode 100644
index 0000000..5c97397
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java
@@ -0,0 +1,299 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.comm.io.FrameHelper;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
+
+/**
+ * Frame reader that merges the tuple streams of several senders.
+ *
+ * Frames arriving from the network are spooled to one run file per sender. The current
+ * tuple of every run sits in a priority queue ordered by the sort fields, and output frames
+ * are filled by repeatedly appending the smallest queued tuple.
+ */
+public class SortMergeFrameReader implements IFrameReader {
+    private static final Logger LOGGER = Logger.getLogger(SortMergeFrameReader.class.getName());
+
+    private final IHyracksContext ctx;
+    private final IConnectionDemultiplexer demux;
+    private final FrameTuplePairComparator tpc;
+    private final FrameTupleAppender appender;
+    private final RecordDescriptor recordDescriptor;
+    private Run[] runs;
+    private PriorityQueue<Integer> pQueue;
+    private int lastReadSender;
+    private boolean first;
+
+    /**
+     * @param ctx context supplying frames and run files
+     * @param demux source of incoming frames, one connection entry per sender
+     * @param sortFields field indexes compared, in order, to rank tuples
+     * @param comparators comparators handed to the tuple pair comparator together with {@code sortFields}
+     * @param recordDescriptor layout of the incoming tuples
+     */
+    public SortMergeFrameReader(IHyracksContext ctx, IConnectionDemultiplexer demux, int[] sortFields,
+            IBinaryComparator[] comparators, RecordDescriptor recordDescriptor) {
+        this.ctx = ctx;
+        this.demux = demux;
+        tpc = new FrameTuplePairComparator(sortFields, sortFields, comparators);
+        appender = new FrameTupleAppender(ctx);
+        this.recordDescriptor = recordDescriptor;
+    }
+
+    /** Creates one run per sender and an empty merge queue. */
+    @Override
+    public void open() throws HyracksDataException {
+        int nSenders = demux.getSenderCount();
+        runs = new Run[nSenders];
+        for (int i = 0; i < runs.length; ++i) {
+            runs[i] = new Run(i);
+        }
+        // PriorityQueue rejects an initial capacity below one, which a zero-sender demux would hit.
+        pQueue = new PriorityQueue<Integer>(Math.max(1, nSenders), new Comparator<Integer>() {
+            @Override
+            public int compare(Integer o1, Integer o2) {
+                int i1 = o1.intValue();
+                int i2 = o2.intValue();
+                Run r1 = runs[i1];
+                Run r2 = runs[i2];
+
+                int c = tpc.compare(r1.accessor, r1.tIndex, r2.accessor, r2.tIndex);
+                // Break ties by run index so equal tuples come out in a stable sender order.
+                return c == 0 ? (i1 < i2 ? -1 : 1) : c;
+            }
+        });
+        lastReadSender = 0;
+        first = true;
+    }
+
+    /**
+     * Closes every run that was opened. All runs are attempted even if one fails; the first
+     * failure is rethrown afterwards.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        if (runs == null) {
+            return;
+        }
+        HyracksDataException failure = null;
+        for (Run r : runs) {
+            if (r == null) {
+                continue;
+            }
+            try {
+                r.close();
+            } catch (HyracksDataException e) {
+                if (failure == null) {
+                    failure = e;
+                }
+            }
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Fills {@code buffer} with the next tuples in merge order.
+     *
+     * @return {@code true} when the frame filled up or holds at least one tuple, {@code false}
+     *         once every run is exhausted and nothing was appended
+     */
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        buffer.clear();
+        buffer.position(buffer.capacity());
+        appender.reset(buffer, true);
+        if (first) {
+            for (int i = 0; i < runs.length; ++i) {
+                if (runs[i].next()) {
+                    pQueue.add(Integer.valueOf(i));
+                }
+            }
+        }
+        first = false;
+        while (true) {
+            if (pQueue.isEmpty()) {
+                return appender.getTupleCount() > 0;
+            }
+            Integer top = pQueue.peek();
+            Run run = runs[top.intValue()];
+            if (!appender.append(run.accessor, run.tIndex)) {
+                return true;
+            }
+            // Re-insert the run only after it has advanced, so it is ranked by its new head tuple.
+            pQueue.remove();
+            if (run.next()) {
+                pQueue.add(top);
+            }
+        }
+    }
+
+    /** Spill file for one sender plus a cursor over the tuples read back from it. */
+    private class Run {
+        private final int runId;
+        private final File file;
+        private final FileChannel channel;
+        private final ByteBuffer frame;
+        private final FrameTupleAccessor accessor;
+        private int tIndex;
+        private long readFP;
+        private long writeFP;
+        private boolean eof;
+
+        public Run(int runId) throws HyracksDataException {
+            this.runId = runId;
+            try {
+                file = ctx.getResourceManager().createFile(SortMergeFrameReader.class.getSimpleName(), ".run");
+                RandomAccessFile raf = new RandomAccessFile(file, "rw");
+                channel = raf.getChannel();
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+            frame = ctx.getResourceManager().allocateFrame();
+            accessor = new FrameTupleAccessor(ctx, recordDescriptor);
+            readFP = 0;
+            writeFP = 0;
+            eof = false;
+        }
+
+        /** Closes the channel and always deletes the run file, even if closing fails. */
+        public void close() throws HyracksDataException {
+            try {
+                channel.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            } finally {
+                file.delete();
+            }
+        }
+
+        /** Appends one frame's worth of bytes at the current write offset. */
+        private void write(ByteBuffer frame) throws HyracksDataException {
+            try {
+                int len = frame.capacity();
+                while (len > 0) {
+                    int sz = channel.write(frame, writeFP);
+                    if (sz < 0) {
+                        throw new HyracksDataException("Error writing to run");
+                    }
+                    len -= sz;
+                    writeFP += sz;
+                }
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        /** Advances to the next tuple, loading further frames as needed; false when the run is done. */
+        private boolean next() throws HyracksDataException {
+            ++tIndex;
+            while (readFP == 0 || tIndex >= accessor.getTupleCount()) {
+                if (!read(frame)) {
+                    return false;
+                }
+                accessor.reset(frame);
+                tIndex = 0;
+            }
+            return true;
+        }
+
+        /** Reads the next frame from the file, spooling network data first when nothing unread is on disk. */
+        private boolean read(ByteBuffer frame) throws HyracksDataException {
+            while (!eof && readFP >= writeFP) {
+                spoolRuns(runId);
+            }
+            if (eof && readFP >= writeFP) {
+                return false;
+            }
+            try {
+                channel.position(readFP);
+                frame.clear();
+                int len = frame.capacity();
+                while (len > 0) {
+                    int sz = channel.read(frame, readFP);
+                    if (sz < 0) {
+                        throw new HyracksDataException("Error reading file");
+                    }
+                    len -= sz;
+                    readFP += sz;
+                }
+                return true;
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        private void eof() {
+            eof = true;
+        }
+    }
+
+    /**
+     * Drains ready network frames into their runs until a frame for {@code interestingRun}
+     * has been handled or every sender has been closed.
+     */
+    private void spoolRuns(int interestingRun) throws HyracksDataException {
+        while (true) {
+            // NOTE(review): unlike NonDeterministicFrameReader, aborted entries are not checked here - confirm.
+            IConnectionEntry entry = demux.findNextReadyEntry(lastReadSender);
+            lastReadSender = (Integer) entry.getAttachment();
+            ByteBuffer netBuffer = entry.getReadBuffer();
+            int tupleCount = netBuffer.getInt(FrameHelper.getTupleCountOffset(ctx));
+            if (LOGGER.isLoggable(Level.FINER)) {
+                LOGGER.finer("Frame Tuple Count: " + tupleCount);
+            }
+            if (tupleCount == 0) {
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Empty Frame received: Closing " + lastReadSender);
+                }
+                int openEntries = demux.closeEntry(lastReadSender);
+                runs[lastReadSender].eof();
+                netBuffer.clear();
+                demux.unreadyEntry(lastReadSender);
+                if (openEntries == 0) {
+                    return;
+                }
+            } else {
+                runs[lastReadSender].write(netBuffer);
+                netBuffer.clear();
+                demux.unreadyEntry(lastReadSender);
+            }
+            if (lastReadSender == interestingRun) {
+                return;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ArrayTupleBuilder.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ArrayTupleBuilder.java
new file mode 100644
index 0000000..368f7a3
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ArrayTupleBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Builds one tuple in memory: field bytes are appended to a growable byte array and the
+ * end offset of each field is recorded as it is added.
+ */
+public class ArrayTupleBuilder {
+    private final ByteArrayAccessibleOutputStream baaos;
+    private final DataOutputStream dos;
+    private final int[] fEndOffsets;
+    private int nextField;
+
+    /** @param nFields number of field end offsets this builder can record */
+    public ArrayTupleBuilder(int nFields) {
+        fEndOffsets = new int[nFields];
+        baaos = new ByteArrayAccessibleOutputStream();
+        dos = new DataOutputStream(baaos);
+    }
+
+    /** Discards all fields added so far. */
+    public void reset() {
+        baaos.reset();
+        nextField = 0;
+    }
+
+    /** Returns the live array of field end offsets, indexed by field number. */
+    public int[] getFieldEndOffsets() {
+        return fEndOffsets;
+    }
+
+    /** Returns the backing byte array; only the first {@link #getSize()} bytes hold tuple data. */
+    public byte[] getByteArray() {
+        return baaos.getByteArray();
+    }
+
+    /** Returns the number of bytes written since the last reset. */
+    public int getSize() {
+        return baaos.size();
+    }
+
+    /**
+     * Appends field {@code fIndex} of tuple {@code tIndex} by copying its bytes out of the
+     * accessor's buffer.
+     */
+    public void addField(FrameTupleAccessor accessor, int tIndex, int fIndex) throws HyracksDataException {
+        int tupleStart = accessor.getTupleStartOffset(tIndex);
+        int fieldStart = accessor.getFieldStartOffset(tIndex, fIndex);
+        int fieldLength = accessor.getFieldEndOffset(tIndex, fIndex) - fieldStart;
+        try {
+            byte[] source = accessor.getBuffer().array();
+            int sourceOffset = tupleStart + accessor.getFieldSlotsLength() + fieldStart;
+            dos.write(source, sourceOffset, fieldLength);
+            if (FrameConstants.DEBUG_FRAME_IO) {
+                dos.writeInt(FrameConstants.FRAME_FIELD_MAGIC);
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        markFieldEnd();
+    }
+
+    /** Appends a field by serializing {@code instance} with {@code serDeser}. */
+    public <T> void addField(ISerializerDeserializer<T> serDeser, T instance) throws HyracksDataException {
+        serDeser.serialize(instance, dos);
+        markFieldEnd();
+    }
+
+    /** Records the current stream size as the end offset of the field just written. */
+    private void markFieldEnd() {
+        fEndOffsets[nextField++] = baaos.size();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ByteArrayAccessibleOutputStream.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ByteArrayAccessibleOutputStream.java
new file mode 100644
index 0000000..41c771b
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ByteArrayAccessibleOutputStream.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.io.ByteArrayOutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A {@link ByteArrayOutputStream} that exposes its internal buffer without copying and
+ * traces each write at {@code FINEST} level.
+ */
+public class ByteArrayAccessibleOutputStream extends ByteArrayOutputStream {
+    private static final Logger LOGGER = Logger.getLogger(ByteArrayAccessibleOutputStream.class.getName());
+
+    /**
+     * Returns the live backing array rather than a copy. Only the first {@link #size()} bytes
+     * are valid, and a later write may swap in a larger array.
+     */
+    public byte[] getByteArray() {
+        return buf;
+    }
+
+    @Override
+    public void write(int b) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(byte) value: " + b);
+        }
+        super.write(b);
+    }
+
+    @Override
+    public void write(byte[] bytes, int offset, int length) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(byte[], int, int) offset: " + offset + " length: " + length);
+        }
+        super.write(bytes, offset, length);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameConstants.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameConstants.java
new file mode 100644
index 0000000..93aab63
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+/**
+ * Constants used by the frame I/O classes in this package.
+ */
+public interface FrameConstants {
+    /** NOTE(review): presumably the byte width of an int-sized length field - confirm against callers. */
+    int SIZE_LEN = 4;
+
+    /**
+     * Enables debug markers: {@code ArrayTupleBuilder} writes {@link #FRAME_FIELD_MAGIC} after a
+     * field copied from an accessor, and {@code FrameDeserializer} expects it after every field.
+     */
+    boolean DEBUG_FRAME_IO = false;
+
+    /** Marker value used when {@link #DEBUG_FRAME_IO} is on. */
+    int FRAME_FIELD_MAGIC = 0x12345678;
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java
new file mode 100644
index 0000000..499f692
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.comm.util.ByteBufferInputStream;
+
+/**
+ * Turns the tuples of a frame into {@code Object[]} records, one per call, using the
+ * deserializers of a record descriptor.
+ */
+public class FrameDeserializer {
+    private static final Logger LOGGER = Logger.getLogger(FrameDeserializer.class.getName());
+
+    private final ByteBufferInputStream bbis;
+
+    private final DataInputStream di;
+
+    private final RecordDescriptor recordDescriptor;
+
+    private final FrameTupleAccessor frameTupleAccessor;
+
+    private int tupleCount;
+
+    private int tIndex;
+
+    private ByteBuffer buffer;
+
+    public FrameDeserializer(IHyracksContext ctx, RecordDescriptor recordDescriptor) {
+        this.bbis = new ByteBufferInputStream();
+        this.di = new DataInputStream(bbis);
+        this.recordDescriptor = recordDescriptor;
+        frameTupleAccessor = new FrameTupleAccessor(ctx, recordDescriptor);
+    }
+
+    /** Points this deserializer at a new frame and rewinds to its first tuple. */
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+        frameTupleAccessor.reset(buffer);
+        tupleCount = frameTupleAccessor.getTupleCount();
+        tIndex = 0;
+    }
+
+    /** Returns true once every tuple of the current frame has been deserialized. */
+    public boolean done() {
+        return tIndex >= tupleCount;
+    }
+
+    /**
+     * Deserializes the tuple at the current index and advances to the next one.
+     *
+     * @return one object per field of the record descriptor
+     * @throws HyracksDataException if a field cannot be deserialized or, with frame debugging
+     *         enabled, the marker after a field is unreadable or wrong
+     */
+    public Object[] deserializeRecord() throws HyracksDataException {
+        int start = frameTupleAccessor.getTupleStartOffset(tIndex) + frameTupleAccessor.getFieldSlotsLength();
+        bbis.setByteBuffer(buffer, start);
+        Object[] record = new Object[recordDescriptor.getFields().length];
+        for (int i = 0; i < record.length; ++i) {
+            Object instance = recordDescriptor.getFields()[i].deserialize(di);
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest(i + " " + instance);
+            }
+            record[i] = instance;
+            if (FrameConstants.DEBUG_FRAME_IO) {
+                int magic;
+                try {
+                    magic = di.readInt();
+                } catch (IOException e) {
+                    // A marker that cannot be read means the frame is corrupt; report it instead of carrying on.
+                    throw new HyracksDataException(e);
+                }
+                if (magic != FrameConstants.FRAME_FIELD_MAGIC) {
+                    throw new HyracksDataException("Field magic mismatch");
+                }
+            }
+        }
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("Read Record tIndex = " + tIndex + ", tupleCount = " + tupleCount);
+        }
+        ++tIndex;
+        return record;
+    }
+
+    /** Closes the underlying input stream; a failure to close is logged rather than propagated. */
+    public void close() {
+        try {
+            di.close();
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "Error closing frame input stream", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java
new file mode 100644
index 0000000..08f3472
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOpenableDataReader;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Data reader that pulls frames from an {@link IFrameReader} and returns their tuples one
+ * at a time as deserialized {@code Object[]} records.
+ */
+public class FrameDeserializingDataReader implements IOpenableDataReader<Object[]> {
+    private final ByteBuffer buffer;
+
+    private boolean eos;
+
+    private boolean first;
+
+    private final IFrameReader frameReader;
+
+    private final FrameDeserializer frameDeserializer;
+
+    public FrameDeserializingDataReader(IHyracksContext ctx, IFrameReader frameReader, RecordDescriptor recordDescriptor) {
+        buffer = ctx.getResourceManager().allocateFrame();
+        this.frameReader = frameReader;
+        this.frameDeserializer = new FrameDeserializer(ctx, recordDescriptor);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        frameReader.open();
+        buffer.clear();
+        buffer.flip();
+        first = true;
+        eos = false;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        frameReader.close();
+        frameDeserializer.close();
+    }
+
+    /**
+     * Returns the next record, fetching a new frame whenever the current one is used up.
+     *
+     * @return the next record, or {@code null} once the frame reader reports no more frames
+     */
+    @Override
+    public Object[] readData() throws HyracksDataException {
+        while (!eos) {
+            boolean frameHasMore = !first && !frameDeserializer.done();
+            if (frameHasMore) {
+                return frameDeserializer.deserializeRecord();
+            }
+            buffer.clear();
+            if (frameReader.nextFrame(buffer)) {
+                frameDeserializer.reset(buffer);
+            } else {
+                eos = true;
+            }
+            first = false;
+        }
+        return null;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java
new file mode 100644
index 0000000..413ded6
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** An {@link IFrameWriter} that deserializes every record of each incoming frame and forwards it to a data writer. */
+public class FrameDeserializingDataWriter implements IFrameWriter {
+    private final IOpenableDataWriter<Object[]> writer;
+    private final FrameDeserializer frameDeserializer;
+
+    public FrameDeserializingDataWriter(IHyracksContext ctx, IOpenableDataWriter<Object[]> writer,
+            RecordDescriptor recordDescriptor) {
+        this.frameDeserializer = new FrameDeserializer(ctx, recordDescriptor);
+        this.writer = writer;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        frameDeserializer.reset(buffer);
+        while (!frameDeserializer.done()) {
+            writer.writeData(frameDeserializer.deserializeRecord());
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java
new file mode 100644
index 0000000..52ecfce
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+
+public class FrameHelper {
+    public static int getTupleCountOffset(IHyracksContext ctx) { // offset of the int occupying the last bytes of a frame
+        return ctx.getFrameSize() - (Integer.SIZE / Byte.SIZE);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java
new file mode 100644
index 0000000..9a10136
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.comm.util.ByteBufferInputStream;
+
+/**
+ * FrameTupleAccessor is used to navigate over the tuples stored in a frame.
+ * A frame is formatted with tuple data concatenated starting at offset 0, one tuple after another.
+ * With FS being the frame size, the int at offset FS - 4 holds the number of tuples (N) in the frame, and for
+ * i from 0 to N - 1 the int at offset FS - 4 - ((i + 1) * 4) holds the offset at which the (i + 1)^th tuple ends.
+ * Every tuple begins with one short per field (the field slots), each giving the end of that field relative to
+ * the end of the field slots; the field data follows the slots.
+ *
+ * @author vinayakb
+ */
+public final class FrameTupleAccessor implements IFrameTupleAccessor {
+    private final IHyracksContext ctx;
+    private final RecordDescriptor recordDescriptor;
+    private ByteBuffer buffer; // frame currently being navigated; not set until reset() is called
+
+    public FrameTupleAccessor(IHyracksContext ctx, RecordDescriptor recordDescriptor) {
+        this.recordDescriptor = recordDescriptor;
+        this.ctx = ctx;
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    @Override
+    public int getTupleCount() {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(ctx));
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return tupleIndex == 0 ? 0 : getTupleEndOffset(tupleIndex - 1); // a tuple starts where its predecessor ends
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(ctx) - 4 - 4 * tupleIndex);
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return fIdx == 0 ? 0 : getFieldEndOffset(tupleIndex, fIdx - 1); // a field starts where its predecessor ends
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return buffer.getShort(2 * fIdx + getTupleStartOffset(tupleIndex));
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return recordDescriptor.getFields().length * 2;
+    }
+
+    public void prettyPrint() { // debugging aid: dumps the offsets and deserialized fields of every tuple to System.err
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream dis = new DataInputStream(bbis);
+        final int tupleCount = getTupleCount();
+        System.err.println("TC: " + tupleCount);
+        for (int t = 0; t < tupleCount; ++t) {
+            final int tStart = getTupleStartOffset(t);
+            System.err.print(t + ":(" + tStart + ", " + getTupleEndOffset(t) + ")[");
+            for (int f = 0; f < recordDescriptor.getFields().length; ++f) {
+                final int fStart = getFieldStartOffset(t, f);
+                System.err.print(f + ":(" + fStart + ", " + getFieldEndOffset(t, f) + ") {");
+                bbis.setByteBuffer(buffer, tStart + getFieldSlotsLength() + fStart);
+                try {
+                    System.err.print(recordDescriptor.getFields()[f].deserialize(dis));
+                } catch (HyracksDataException e) {
+                    e.printStackTrace();
+                }
+                System.err.print("}");
+            }
+            System.err.println("]");
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
new file mode 100644
index 0000000..c57e589
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+
+/** Appends tuples to a frame, maintaining the tuple-count and tuple-end-offset ints at the end of the frame. */
+public class FrameTupleAppender {
+    private final IHyracksContext ctx;
+    private ByteBuffer buffer;
+    private int tupleCount; // number of tuples in the current frame
+    private int tupleDataEndOffset; // end offset of the last tuple in the frame, i.e. where the next tuple is written
+
+    public FrameTupleAppender(IHyracksContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /** Targets the given frame; clear starts it empty, otherwise appending resumes after the tuples already in it. */
+    public void reset(ByteBuffer buffer, boolean clear) {
+        this.buffer = buffer;
+        final int tupleCountOffset = FrameHelper.getTupleCountOffset(ctx);
+        if (clear) {
+            buffer.putInt(tupleCountOffset, 0);
+            tupleCount = 0;
+            tupleDataEndOffset = 0;
+            return;
+        }
+        tupleCount = buffer.getInt(tupleCountOffset);
+        if (tupleCount == 0) {
+            tupleDataEndOffset = 0;
+        } else {
+            tupleDataEndOffset = buffer.getInt(tupleCountOffset - tupleCount * 4);
+        }
+    }
+
+    /** Appends a tuple given as field end offsets plus field data; returns false if it does not fit in the frame. */
+    public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) {
+        final int slotsLength = fieldSlots.length * 2;
+        if (!hasSpaceFor(slotsLength + length)) {
+            return false;
+        }
+        for (int i = 0; i < fieldSlots.length; ++i) {
+            buffer.putShort(tupleDataEndOffset + i * 2, (short) fieldSlots[i]);
+        }
+        System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + slotsLength, length);
+        commitTuple(slotsLength + length);
+        return true;
+    }
+
+    /** Copies tuple tIndex of the accessor's frame unchanged; returns false if it does not fit in the frame. */
+    public boolean append(FrameTupleAccessor tupleAccessor, int tIndex) {
+        final int startOffset = tupleAccessor.getTupleStartOffset(tIndex);
+        final int length = tupleAccessor.getTupleEndOffset(tIndex) - startOffset;
+        if (!hasSpaceFor(length)) {
+            return false;
+        }
+        System.arraycopy(tupleAccessor.getBuffer().array(), startOffset, buffer.array(), tupleDataEndOffset, length);
+        commitTuple(length);
+        return true;
+    }
+
+    /** Appends one tuple made of the fields of tuple tIndex0 followed by those of tuple tIndex1; false if it does not fit. */
+    public boolean appendConcat(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1) {
+        final int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
+        final int length0 = accessor0.getTupleEndOffset(tIndex0) - startOffset0;
+        final int startOffset1 = accessor1.getTupleStartOffset(tIndex1);
+        final int length1 = accessor1.getTupleEndOffset(tIndex1) - startOffset1;
+        if (!hasSpaceFor(length0 + length1)) {
+            return false;
+        }
+        final ByteBuffer src0 = accessor0.getBuffer();
+        final ByteBuffer src1 = accessor1.getBuffer();
+        final int slotsLen0 = accessor0.getFieldSlotsLength();
+        final int slotsLen1 = accessor1.getFieldSlotsLength();
+        final int dataLen0 = length0 - slotsLen0;
+        final int dataLen1 = length1 - slotsLen1;
+        final int slots1Start = tupleDataEndOffset + slotsLen0;
+        final int dataStart = slots1Start + slotsLen1;
+
+        // The first tuple's field slots stay valid as they are.
+        System.arraycopy(src0.array(), startOffset0, buffer.array(), tupleDataEndOffset, slotsLen0);
+        // The second tuple's data lands behind the first tuple's data, so each of its slots moves up by dataLen0.
+        for (int i = 0; i < slotsLen1 / 2; ++i) {
+            final int shifted = src1.getShort(startOffset1 + i * 2) + dataLen0;
+            buffer.putShort(slots1Start + i * 2, (short) shifted);
+        }
+        // Field data of the first tuple, then of the second.
+        System.arraycopy(src0.array(), startOffset0 + slotsLen0, buffer.array(), dataStart, dataLen0);
+        System.arraycopy(src1.array(), startOffset1 + slotsLen1, buffer.array(), dataStart + dataLen0, dataLen1);
+        commitTuple(length0 + length1);
+        return true;
+    }
+
+    /** Appends a tuple holding only the listed fields of tuple tIndex, in the listed order; false if it does not fit. */
+    public boolean appendProjection(FrameTupleAccessor accessor, int tIndex, int[] fields) {
+        final int targetSlotsLength = fields.length * 2;
+        int length = targetSlotsLength;
+        for (int field : fields) {
+            length += accessor.getFieldEndOffset(tIndex, field) - accessor.getFieldStartOffset(tIndex, field);
+        }
+        if (!hasSpaceFor(length)) {
+            return false;
+        }
+
+        final int srcDataStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+        final int destDataStart = tupleDataEndOffset + targetSlotsLength;
+        int written = 0; // bytes of field data copied so far, which is also the end offset of the last copied field
+        for (int i = 0; i < fields.length; ++i) {
+            final int fStart = accessor.getFieldStartOffset(tIndex, fields[i]);
+            final int fLen = accessor.getFieldEndOffset(tIndex, fields[i]) - fStart;
+            System.arraycopy(accessor.getBuffer().array(), srcDataStart + fStart, buffer.array(),
+                    destDataStart + written, fLen);
+            written += fLen;
+            buffer.putShort(tupleDataEndOffset + i * 2, (short) written);
+        }
+        commitTuple(length);
+        return true;
+    }
+
+    public int getTupleCount() {
+        return tupleCount;
+    }
+
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    /** Tells whether tupleLength more bytes fit next to the trailer (the tuple count plus one end offset per tuple). */
+    private boolean hasSpaceFor(int tupleLength) {
+        return tupleDataEndOffset + tupleLength + 4 + (tupleCount + 1) * 4 <= ctx.getFrameSize();
+    }
+
+    /** Records in the frame's trailer a tuple of tupleLength bytes that was just written at tupleDataEndOffset. */
+    private void commitTuple(int tupleLength) {
+        tupleDataEndOffset += tupleLength;
+        final int tupleCountOffset = FrameHelper.getTupleCountOffset(ctx);
+        buffer.putInt(tupleCountOffset - 4 * (tupleCount + 1), tupleDataEndOffset);
+        ++tupleCount;
+        buffer.putInt(tupleCountOffset, tupleCount);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTuplePairComparator.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTuplePairComparator.java
new file mode 100644
index 0000000..0b23383
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTuplePairComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+
+/**
+ * Compares a tuple of one frame with a tuple of another frame on pairs of key fields.
+ */
+public class FrameTuplePairComparator {
+    private final int[] keys0;
+    private final int[] keys1;
+    private final IBinaryComparator[] comparators;
+
+    public FrameTuplePairComparator(int[] keys0, int[] keys1, IBinaryComparator[] comparators) {
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.comparators = comparators;
+    }
+
+    /**
+     * Compares field keys0[i] of the first tuple with field keys1[i] of the second using comparators[i], for
+     * increasing i; returns the first non-zero comparison result, or 0 when every key pair compares equal.
+     */
+    public int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1) {
+        // Field offsets are added to the end of a tuple's field slots, so work out that base once per tuple.
+        final int dataStart0 = accessor0.getTupleStartOffset(tIndex0) + accessor0.getFieldSlotsLength();
+        final int dataStart1 = accessor1.getTupleStartOffset(tIndex1) + accessor1.getFieldSlotsLength();
+
+        for (int i = 0; i < keys0.length; ++i) {
+            final int fStart0 = accessor0.getFieldStartOffset(tIndex0, keys0[i]);
+            final int fLen0 = accessor0.getFieldEndOffset(tIndex0, keys0[i]) - fStart0;
+            final int fStart1 = accessor1.getFieldStartOffset(tIndex1, keys1[i]);
+            final int fLen1 = accessor1.getFieldEndOffset(tIndex1, keys1[i]) - fStart1;
+
+            final int c = comparators[i].compare(accessor0.getBuffer().array(), dataStart0 + fStart0, fLen0,
+                    accessor1.getBuffer().array(), dataStart1 + fStart1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java
new file mode 100644
index 0000000..593eb3b
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Serializes Object[] records into frames and passes each filled frame on to an {@link IFrameWriter}. */
+public class SerializingDataWriter implements IOpenableDataWriter<Object[]> {
+    private static final Logger LOGGER = Logger.getLogger(SerializingDataWriter.class.getName());
+    private final ByteBuffer buffer;
+    private final ArrayTupleBuilder tb;
+    private final RecordDescriptor recordDescriptor;
+    private final IFrameWriter frameWriter;
+    private final FrameTupleAppender tupleAppender;
+    private boolean open;
+
+    public SerializingDataWriter(IHyracksContext ctx, RecordDescriptor recordDescriptor, IFrameWriter frameWriter) {
+        buffer = ctx.getResourceManager().allocateFrame();
+        tb = new ArrayTupleBuilder(recordDescriptor.getFields().length);
+        this.recordDescriptor = recordDescriptor;
+        this.frameWriter = frameWriter;
+        tupleAppender = new FrameTupleAppender(ctx);
+        open = false;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        frameWriter.open();
+        buffer.clear();
+        open = true;
+        tupleAppender.reset(buffer, true);
+    }
+
+    /** Passes on the current frame if it holds any tuples, then closes the frame writer. */
+    @Override
+    public void close() throws HyracksDataException {
+        if (!open) {
+            throw new HyracksDataException("Closing SerializingDataWriter that has not been opened");
+        }
+        if (tupleAppender.getTupleCount() > 0) {
+            flushFrame();
+        }
+        frameWriter.close();
+        open = false;
+    }
+
+    /** Serializes one record into the current frame, first passing the frame on if the record does not fit in it. */
+    @Override
+    public void writeData(Object[] data) throws HyracksDataException {
+        if (!open) {
+            throw new HyracksDataException("Writing to SerializingDataWriter that has not been opened");
+        }
+        tb.reset();
+        for (int i = 0; i < data.length; ++i) {
+            Object instance = data[i];
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest(i + " " + instance);
+            }
+            tb.addField(recordDescriptor.getFields()[i], instance);
+        }
+        if (!tupleAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            // No room left in the current frame: pass it on and retry with an empty frame.
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest("Flushing: position = " + buffer.position());
+            }
+            flushFrame();
+            tupleAppender.reset(buffer, true);
+            if (!tupleAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException("Record with " + data.length + " fields and " + tb.getSize()
+                        + " bytes of field data does not fit into an empty frame");
+            }
+        }
+    }
+
+    /** Hands the whole frame (position 0, limit at capacity) to the frame writer. */
+    private void flushFrame() throws HyracksDataException {
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+        frameWriter.nextFrame(buffer);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/ByteBufferInputStream.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/ByteBufferInputStream.java
new file mode 100644
index 0000000..90f7433
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/ByteBufferInputStream.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.util;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/** An InputStream over a ByteBuffer that reads from a caller-chosen absolute position up to the buffer's capacity. */
+public class ByteBufferInputStream extends InputStream {
+    private static final Logger LOGGER = Logger.getLogger(ByteBufferInputStream.class.getName());
+    private ByteBuffer buffer;
+    private int position; // absolute index in buffer of the next byte to read
+
+    public ByteBufferInputStream() {
+    }
+
+    public void setByteBuffer(ByteBuffer buffer, int position) {
+        this.buffer = buffer;
+        this.position = position;
+    }
+
+    @Override
+    public int read() {
+        int remaining = buffer.capacity() - position;
+        int value = remaining > 0 ? (buffer.get(position++) & 0xff) : -1;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
+        }
+        return value;
+    }
+
+    /** Reads up to length bytes; returns 0 for a zero-length request and -1 when no bytes are left (InputStream contract). */
+    @Override
+    public int read(byte[] bytes, int offset, int length) {
+        int remaining = buffer.capacity() - position;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
+                    + length + " position: " + position);
+        }
+        if (length == 0 || remaining <= 0) {
+            return length == 0 ? 0 : -1; // "<= 0" also covers a position that was set beyond the capacity
+        }
+        int l = Math.min(length, remaining);
+        System.arraycopy(buffer.array(), buffer.arrayOffset() + position, bytes, offset, l); // buffer index 0 is array()[arrayOffset()]
+        position += l;
+        return l;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/FrameUtils.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/FrameUtils.java
new file mode 100644
index 0000000..bdce4a4
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/FrameUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.util;
+
+import java.nio.ByteBuffer;
+
+public class FrameUtils {
+    /** Makes srcFrame readable from start to capacity and copies all of it into destFrame, starting at offset 0. */
+    public static void copy(ByteBuffer srcFrame, ByteBuffer destFrame) {
+        makeReadable(srcFrame);
+        ((ByteBuffer) destFrame.clear()).put(srcFrame);
+    }
+
+    /** Sets the frame's position to 0 and its limit to its capacity. */
+    public static void makeReadable(ByteBuffer frame) {
+        frame.limit(frame.capacity()).position(0);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/util/ReflectionUtils.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/util/ReflectionUtils.java
new file mode 100644
index 0000000..bb391d7
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/util/ReflectionUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.util;
+
+public class ReflectionUtils {
+    /** Creates an instance of klass through Class.newInstance(); reflective failures are rethrown as RuntimeException. */
+    public static <T> T createInstance(Class<? extends T> klass) {
+        try {
+            final T created = klass.newInstance();
+            return created;
+        } catch (InstantiationException e) {
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file