Refactored Hyracks into runtime/dataflow-common/dataflow-std/server

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@38 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/.classpath b/hyracks-dataflow-common/.classpath
new file mode 100644
index 0000000..86f50f4
--- /dev/null
+++ b/hyracks-dataflow-common/.classpath
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
+	<classpathentry kind="src" path="src/test/java"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-dataflow-common/.project b/hyracks-dataflow-common/.project
new file mode 100644
index 0000000..8fd9acf
--- /dev/null
+++ b/hyracks-dataflow-common/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>hyracks-dataflow-common</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.maven.ide.eclipse.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>
diff --git a/hyracks-dataflow-common/.settings/org.eclipse.jdt.core.prefs b/hyracks-dataflow-common/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..8496bf4
--- /dev/null
+++ b/hyracks-dataflow-common/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Thu Jul 29 14:49:35 PDT 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks-dataflow-common/.settings/org.maven.ide.eclipse.prefs b/hyracks-dataflow-common/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..c411710
--- /dev/null
+++ b/hyracks-dataflow-common/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Thu Jul 29 14:49:35 PDT 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-dataflow-common/pom.xml b/hyracks-dataflow-common/pom.xml
new file mode 100644
index 0000000..589e93f
--- /dev/null
+++ b/hyracks-dataflow-common/pom.xml
@@ -0,0 +1,28 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks</groupId>
+  <artifactId>hyracks-dataflow-common</artifactId>
+  <version>0.1.0</version>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-api</artifactId>
+  		<version>0.1.0</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+</project>
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
new file mode 100644
index 0000000..bfc79e7
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+
+public class ConnectionEntry implements IConnectionEntry {
+    private static final Logger LOGGER = Logger.getLogger(ConnectionEntry.class.getName());
+
+    private SocketChannel socketChannel;
+
+    private final ByteBuffer readBuffer;
+
+    private final ByteBuffer writeBuffer;
+
+    private IDataReceiveListener recvListener;
+
+    private Object attachment;
+
+    private final SelectionKey key;
+
+    private UUID jobId;
+
+    private UUID stageId;
+
+    private boolean aborted;
+
+    public ConnectionEntry(IHyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
+        this.socketChannel = socketChannel;
+        readBuffer = ctx.getResourceManager().allocateFrame();
+        readBuffer.clear();
+        writeBuffer = ctx.getResourceManager().allocateFrame();
+        writeBuffer.clear();
+        this.key = key;
+    }
+
+    public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    public boolean dispatch(SelectionKey key) throws IOException {
+        if (aborted) {
+            recvListener.dataReceived(this);
+        } else {
+            if (key.isReadable()) {
+                if (LOGGER.isLoggable(Level.FINER)) {
+                    LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
+                }
+                int bytesRead = socketChannel.read(readBuffer);
+                if (bytesRead < 0) {
+                    recvListener.eos(this);
+                    return true;
+                }
+                if (LOGGER.isLoggable(Level.FINER)) {
+                    LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
+                }
+                recvListener.dataReceived(this);
+            } else if (key.isWritable()) {
+                synchronized (this) {
+                    writeBuffer.flip();
+                    if (LOGGER.isLoggable(Level.FINER)) {
+                        LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    }
+                    int bytesWritten = socketChannel.write(writeBuffer);
+                    if (bytesWritten < 0) {
+                        return true;
+                    }
+                    if (LOGGER.isLoggable(Level.FINER)) {
+                        LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    }
+                    if (writeBuffer.remaining() <= 0) {
+                        int ops = key.interestOps();
+                        key.interestOps(ops & ~SelectionKey.OP_WRITE);
+                    }
+                    writeBuffer.compact();
+                    notifyAll();
+                }
+            } else {
+                LOGGER.warning("Spurious event triggered: " + key.readyOps());
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public ByteBuffer getReadBuffer() {
+        return readBuffer;
+    }
+
+    @Override
+    public synchronized void write(ByteBuffer buffer) {
+        while (buffer.remaining() > 0) {
+            while (writeBuffer.remaining() <= 0) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                }
+            }
+            int oldLimit = buffer.limit();
+            buffer.limit(Math.min(oldLimit, writeBuffer.remaining()));
+            writeBuffer.put(buffer);
+            buffer.limit(oldLimit);
+            int ops = key.interestOps();
+            key.interestOps(ops | SelectionKey.OP_WRITE);
+            key.selector().wakeup();
+        }
+    }
+
+    @Override
+    public void setDataReceiveListener(IDataReceiveListener listener) {
+        this.recvListener = listener;
+    }
+
+    @Override
+    public void attach(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public void close() {
+        try {
+            socketChannel.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public SelectionKey getSelectionKey() {
+        return key;
+    }
+
+    @Override
+    public UUID getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public void setJobId(UUID jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public UUID getStageId() {
+        return stageId;
+    }
+
+    @Override
+    public void setStageId(UUID stageId) {
+        this.stageId = stageId;
+    }
+
+    @Override
+    public void abort() {
+        aborted = true;
+    }
+
+    @Override
+    public boolean aborted() {
+        return aborted;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
new file mode 100644
index 0000000..96fb70a
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class NonDeterministicFrameReader implements IFrameReader {
+    private static final Logger LOGGER = Logger.getLogger(NonDeterministicFrameReader.class.getName());
+
+    private final IHyracksContext ctx;
+    private final IConnectionDemultiplexer demux;
+    private int lastReadSender;
+    private boolean eos;
+
+    public NonDeterministicFrameReader(IHyracksContext ctx, IConnectionDemultiplexer demux) {
+        this.ctx = ctx;
+        this.demux = demux;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        lastReadSender = 0;
+        eos = false;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        if (eos) {
+            return false;
+        }
+        while (true) {
+            IConnectionEntry entry = demux.findNextReadyEntry(lastReadSender);
+            if (entry.aborted()) {
+                eos = true;
+                return false;
+            }
+            lastReadSender = (Integer) entry.getAttachment();
+            ByteBuffer netBuffer = entry.getReadBuffer();
+            int tupleCount = netBuffer.getInt(FrameHelper.getTupleCountOffset(ctx));
+            if (LOGGER.isLoggable(Level.FINER)) {
+                LOGGER.finer("Frame Tuple Count: " + tupleCount);
+            }
+            if (tupleCount == 0) {
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Empty Frame received: Closing " + lastReadSender);
+                }
+                int openEntries = demux.closeEntry(lastReadSender);
+                if (openEntries == 0) {
+                    eos = true;
+                    return false;
+                }
+                netBuffer.clear();
+                demux.unreadyEntry(lastReadSender);
+            } else {
+                buffer.clear();
+                buffer.put(netBuffer);
+                netBuffer.clear();
+                demux.unreadyEntry(lastReadSender);
+                return true;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java
new file mode 100644
index 0000000..5c97397
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java
@@ -0,0 +1,247 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.comm.io.FrameHelper;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
+
+public class SortMergeFrameReader implements IFrameReader {
+    private static final Logger LOGGER = Logger.getLogger(SortMergeFrameReader.class.getName());
+
+    private final IHyracksContext ctx;
+    private final IConnectionDemultiplexer demux;
+    private final FrameTuplePairComparator tpc;
+    private final FrameTupleAppender appender;
+    private final RecordDescriptor recordDescriptor;
+    private Run[] runs;
+    private ByteBuffer[] frames;
+    private PriorityQueue<Integer> pQueue;
+    private int lastReadSender;
+    private boolean first;
+
+    public SortMergeFrameReader(IHyracksContext ctx, IConnectionDemultiplexer demux, int[] sortFields,
+            IBinaryComparator[] comparators, RecordDescriptor recordDescriptor) {
+        this.ctx = ctx;
+        this.demux = demux;
+        tpc = new FrameTuplePairComparator(sortFields, sortFields, comparators);
+        appender = new FrameTupleAppender(ctx);
+        this.recordDescriptor = recordDescriptor;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        int nSenders = demux.getSenderCount();
+        runs = new Run[nSenders];
+        frames = new ByteBuffer[nSenders];
+        for (int i = 0; i < runs.length; ++i) {
+            runs[i] = new Run(i);
+            frames[i] = ctx.getResourceManager().allocateFrame();
+        }
+        pQueue = new PriorityQueue<Integer>(nSenders, new Comparator<Integer>() {
+            @Override
+            public int compare(Integer o1, Integer o2) {
+                int i1 = o1.intValue();
+                int i2 = o2.intValue();
+                Run r1 = runs[i1];
+                Run r2 = runs[i2];
+
+                int c = tpc.compare(r1.accessor, r1.tIndex, r2.accessor, r2.tIndex);
+                return c == 0 ? (i1 < i2 ? -1 : 1) : c;
+            }
+        });
+        lastReadSender = 0;
+        first = true;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        for (Run r : runs) {
+            r.close();
+        }
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        buffer.clear();
+        buffer.position(buffer.capacity());
+        appender.reset(buffer, true);
+        if (first) {
+            for (int i = 0; i < runs.length; ++i) {
+                if (runs[i].next()) {
+                    pQueue.add(Integer.valueOf(i));
+                }
+            }
+        }
+        first = false;
+        while (true) {
+            if (pQueue.isEmpty()) {
+                return appender.getTupleCount() > 0;
+            }
+            Integer top = pQueue.peek();
+            Run run = runs[top.intValue()];
+            if (!appender.append(run.accessor, run.tIndex)) {
+                return true;
+            }
+            pQueue.remove();
+            if (run.next()) {
+                pQueue.add(top);
+            }
+        }
+    }
+
+    private class Run {
+        private final int runId;
+        private final File file;
+        private final FileChannel channel;
+        private final ByteBuffer frame;
+        private final FrameTupleAccessor accessor;
+        private int tIndex;
+        private long readFP;
+        private long writeFP;
+        private boolean eof;
+
+        public Run(int runId) throws HyracksDataException {
+            this.runId = runId;
+            try {
+                file = ctx.getResourceManager().createFile(SortMergeFrameReader.class.getSimpleName(), ".run");
+                RandomAccessFile raf = new RandomAccessFile(file, "rw");
+                channel = raf.getChannel();
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+            frame = ctx.getResourceManager().allocateFrame();
+            accessor = new FrameTupleAccessor(ctx, recordDescriptor);
+            readFP = 0;
+            writeFP = 0;
+            eof = false;
+        }
+
+        public void close() throws HyracksDataException {
+            try {
+                channel.close();
+                file.delete();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        private void write(ByteBuffer frame) throws HyracksDataException {
+            try {
+                int len = frame.capacity();
+                while (len > 0) {
+                    int sz = channel.write(frame, writeFP);
+                    if (sz < 0) {
+                        throw new HyracksDataException("Error writing to run");
+                    }
+                    len -= sz;
+                    writeFP += sz;
+                }
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        private boolean next() throws HyracksDataException {
+            ++tIndex;
+            while (readFP == 0 || tIndex >= accessor.getTupleCount()) {
+                if (!read(frame)) {
+                    return false;
+                }
+                accessor.reset(frame);
+                tIndex = 0;
+            }
+            return true;
+        }
+
+        private boolean read(ByteBuffer frame) throws HyracksDataException {
+            while (!eof && readFP >= writeFP) {
+                spoolRuns(runId);
+            }
+            if (eof && readFP >= writeFP) {
+                return false;
+            }
+            try {
+                channel.position(readFP);
+                frame.clear();
+                int len = frame.capacity();
+                while (len > 0) {
+                    int sz = channel.read(frame, readFP);
+                    if (sz < 0) {
+                        throw new HyracksDataException("Error reading file");
+                    }
+                    len -= sz;
+                    readFP += sz;
+                }
+                return true;
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        private void eof() {
+            eof = true;
+        }
+    }
+
+    private void spoolRuns(int interestingRun) throws HyracksDataException {
+        while (true) {
+            IConnectionEntry entry = demux.findNextReadyEntry(lastReadSender);
+            lastReadSender = (Integer) entry.getAttachment();
+            ByteBuffer netBuffer = entry.getReadBuffer();
+            int tupleCount = netBuffer.getInt(FrameHelper.getTupleCountOffset(ctx));
+            if (LOGGER.isLoggable(Level.FINER)) {
+                LOGGER.finer("Frame Tuple Count: " + tupleCount);
+            }
+            if (tupleCount == 0) {
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Empty Frame received: Closing " + lastReadSender);
+                }
+                int openEntries = demux.closeEntry(lastReadSender);
+                runs[lastReadSender].eof();
+                netBuffer.clear();
+                demux.unreadyEntry(lastReadSender);
+                if (openEntries == 0) {
+                    return;
+                }
+            } else {
+                runs[lastReadSender].write(netBuffer);
+                netBuffer.clear();
+                demux.unreadyEntry(lastReadSender);
+            }
+            if (lastReadSender == interestingRun) {
+                return;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ArrayTupleBuilder.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ArrayTupleBuilder.java
new file mode 100644
index 0000000..368f7a3
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ArrayTupleBuilder.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ArrayTupleBuilder {
+    private final ByteArrayAccessibleOutputStream baaos;
+    private final DataOutputStream dos;
+    private final int[] fEndOffsets;
+    private int nextField;
+
+    public ArrayTupleBuilder(int nFields) {
+        baaos = new ByteArrayAccessibleOutputStream();
+        dos = new DataOutputStream(baaos);
+        fEndOffsets = new int[nFields];
+    }
+
+    public void reset() {
+        nextField = 0;
+        baaos.reset();
+    }
+
+    public int[] getFieldEndOffsets() {
+        return fEndOffsets;
+    }
+
+    public byte[] getByteArray() {
+        return baaos.getByteArray();
+    }
+
+    public int getSize() {
+        return baaos.size();
+    }
+
+    public void addField(FrameTupleAccessor accessor, int tIndex, int fIndex) throws HyracksDataException {
+        int startOffset = accessor.getTupleStartOffset(tIndex);
+        int fStartOffset = accessor.getFieldStartOffset(tIndex, fIndex);
+        int fLen = accessor.getFieldEndOffset(tIndex, fIndex) - fStartOffset;
+        try {
+            dos.write(accessor.getBuffer().array(), startOffset + accessor.getFieldSlotsLength() + fStartOffset, fLen);
+            if (FrameConstants.DEBUG_FRAME_IO) {
+                dos.writeInt(FrameConstants.FRAME_FIELD_MAGIC);
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        fEndOffsets[nextField++] = baaos.size();
+    }
+
+    public <T> void addField(ISerializerDeserializer<T> serDeser, T instance) throws HyracksDataException {
+        serDeser.serialize(instance, dos);
+        fEndOffsets[nextField++] = baaos.size();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ByteArrayAccessibleOutputStream.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ByteArrayAccessibleOutputStream.java
new file mode 100644
index 0000000..41c771b
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/ByteArrayAccessibleOutputStream.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.io.ByteArrayOutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ByteArrayAccessibleOutputStream extends ByteArrayOutputStream {
+    private static final Logger LOGGER = Logger.getLogger(ByteArrayAccessibleOutputStream.class.getName());
+
+    public byte[] getByteArray() {
+        return buf;
+    }
+
+    public void write(int b) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(byte) value: " + b);
+        }
+        super.write(b);
+    }
+
+    @Override
+    public void write(byte[] bytes, int offset, int length) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(byte[], int, int) offset: " + offset + " length" + length);
+        }
+        super.write(bytes, offset, length);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameConstants.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameConstants.java
new file mode 100644
index 0000000..93aab63
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+public interface FrameConstants {
+    public static final int SIZE_LEN = 4;
+
+    public static final boolean DEBUG_FRAME_IO = false;
+
+    public static final int FRAME_FIELD_MAGIC = 0x12345678;
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java
new file mode 100644
index 0000000..499f692
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.comm.util.ByteBufferInputStream;
+
+public class FrameDeserializer {
+    private static final Logger LOGGER = Logger.getLogger(FrameDeserializer.class.getName());
+
+    private final ByteBufferInputStream bbis;
+
+    private final DataInputStream di;
+
+    private final RecordDescriptor recordDescriptor;
+
+    private final FrameTupleAccessor frameTupleAccessor;
+
+    private int tupleCount;
+
+    private int tIndex;
+
+    private ByteBuffer buffer;
+
+    public FrameDeserializer(IHyracksContext ctx, RecordDescriptor recordDescriptor) {
+        this.bbis = new ByteBufferInputStream();
+        this.di = new DataInputStream(bbis);
+        this.recordDescriptor = recordDescriptor;
+        frameTupleAccessor = new FrameTupleAccessor(ctx, recordDescriptor);
+    }
+
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+        frameTupleAccessor.reset(buffer);
+        tupleCount = frameTupleAccessor.getTupleCount();
+        tIndex = 0;
+    }
+
+    public boolean done() {
+        return tIndex >= tupleCount;
+    }
+
+    public Object[] deserializeRecord() throws HyracksDataException {
+        int start = frameTupleAccessor.getTupleStartOffset(tIndex) + frameTupleAccessor.getFieldSlotsLength();
+        bbis.setByteBuffer(buffer, start);
+        Object[] record = new Object[recordDescriptor.getFields().length];
+        for (int i = 0; i < record.length; ++i) {
+            Object instance = recordDescriptor.getFields()[i].deserialize(di);
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest(i + " " + instance);
+            }
+            record[i] = instance;
+            if (FrameConstants.DEBUG_FRAME_IO) {
+                try {
+                    if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
+                        throw new HyracksDataException("Field magic mismatch");
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("Read Record tIndex = " + tIndex + ", tupleCount = " + tupleCount);
+        }
+        ++tIndex;
+        return record;
+    }
+
+    public void close() {
+        try {
+            di.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java
new file mode 100644
index 0000000..08f3472
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOpenableDataReader;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FrameDeserializingDataReader implements IOpenableDataReader<Object[]> {
+    private final ByteBuffer buffer;
+
+    private boolean eos;
+
+    private boolean first;
+
+    private final IFrameReader frameReader;
+
+    private final FrameDeserializer frameDeserializer;
+
+    public FrameDeserializingDataReader(IHyracksContext ctx, IFrameReader frameReader, RecordDescriptor recordDescriptor) {
+        buffer = ctx.getResourceManager().allocateFrame();
+        this.frameReader = frameReader;
+        this.frameDeserializer = new FrameDeserializer(ctx, recordDescriptor);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        frameReader.open();
+        buffer.clear();
+        buffer.flip();
+        eos = false;
+        first = true;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        frameReader.close();
+        frameDeserializer.close();
+    }
+
+    @Override
+    public Object[] readData() throws HyracksDataException {
+        while (true) {
+            if (eos) {
+                return null;
+            }
+            if (!first && !frameDeserializer.done()) {
+                return frameDeserializer.deserializeRecord();
+            }
+            buffer.clear();
+            if (!frameReader.nextFrame(buffer)) {
+                eos = true;
+            } else {
+                frameDeserializer.reset(buffer);
+            }
+            first = false;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java
new file mode 100644
index 0000000..413ded6
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FrameDeserializingDataWriter implements IFrameWriter {
+    private final IOpenableDataWriter<Object[]> writer;
+    private final FrameDeserializer frameDeserializer;
+
+    public FrameDeserializingDataWriter(IHyracksContext ctx, IOpenableDataWriter<Object[]> writer,
+            RecordDescriptor recordDescriptor) {
+        this.writer = writer;
+        this.frameDeserializer = new FrameDeserializer(ctx, recordDescriptor);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        frameDeserializer.reset(buffer);
+        while (!frameDeserializer.done()) {
+            Object[] tuple = frameDeserializer.deserializeRecord();
+            writer.writeData(tuple);
+        }
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java
new file mode 100644
index 0000000..52ecfce
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+
+public class FrameHelper {
+    public static int getTupleCountOffset(IHyracksContext ctx) {
+        return ctx.getFrameSize() - 4;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java
new file mode 100644
index 0000000..9a10136
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.comm.util.ByteBufferInputStream;
+
+/**
+ * FrameTupleCursor is used to navigate over tuples in a Frame.
+ * A frame is formatted with tuple data concatenated starting at offset 0, one tuple after another.
+ * Offset FS - 4 holds an int indicating the number of tuples (N) in the frame. FS - ((i + 1) * 4) for i from
+ * 0 to N - 1 holds an int indicating the offset of the (i + 1)^th tuple.
+ * Every tuple is organized as a sequence of shorts indicating the end of each field in the tuple relative to the end of the
+ * field slots.
+ * 
+ * @author vinayakb
+ */
+public final class FrameTupleAccessor implements IFrameTupleAccessor {
+    private final IHyracksContext ctx;
+    private final RecordDescriptor recordDescriptor;
+
+    private ByteBuffer buffer;
+
+    public FrameTupleAccessor(IHyracksContext ctx, RecordDescriptor recordDescriptor) {
+        this.ctx = ctx;
+        this.recordDescriptor = recordDescriptor;
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    @Override
+    public int getTupleCount() {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(ctx));
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return tupleIndex == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(ctx) - 4 * tupleIndex);
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(ctx) - 4 * (tupleIndex + 1));
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return fIdx == 0 ? 0 : buffer.getShort(getTupleStartOffset(tupleIndex) + (fIdx - 1) * 2);
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return buffer.getShort(getTupleStartOffset(tupleIndex) + fIdx * 2);
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return recordDescriptor.getFields().length * 2;
+    }
+
+    public void prettyPrint() {
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream dis = new DataInputStream(bbis);
+        int tc = getTupleCount();
+        System.err.println("TC: " + tc);
+        for (int i = 0; i < tc; ++i) {
+            System.err.print(i + ":(" + getTupleStartOffset(i) + ", " + getTupleEndOffset(i) + ")[");
+            for (int j = 0; j < recordDescriptor.getFields().length; ++j) {
+                System.err.print(j + ":(" + getFieldStartOffset(i, j) + ", " + getFieldEndOffset(i, j) + ") ");
+                System.err.print("{");
+                bbis.setByteBuffer(buffer, getTupleStartOffset(i) + getFieldSlotsLength() + getFieldStartOffset(i, j));
+                try {
+                    System.err.print(recordDescriptor.getFields()[j].deserialize(dis));
+                } catch (HyracksDataException e) {
+                    e.printStackTrace();
+                }
+                System.err.print("}");
+            }
+            System.err.println("]");
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
new file mode 100644
index 0000000..c57e589
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+
+public class FrameTupleAppender {
+    private final IHyracksContext ctx;
+
+    private ByteBuffer buffer;
+
+    private int tupleCount;
+
+    private int tupleDataEndOffset;
+
+    public FrameTupleAppender(IHyracksContext ctx) {
+        this.ctx = ctx;
+    }
+
+    public void reset(ByteBuffer buffer, boolean clear) {
+        this.buffer = buffer;
+        if (clear) {
+            buffer.putInt(FrameHelper.getTupleCountOffset(ctx), 0);
+            tupleCount = 0;
+            tupleDataEndOffset = 0;
+        } else {
+            tupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(ctx));
+            tupleDataEndOffset = tupleCount == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(ctx) - tupleCount
+                    * 4);
+        }
+    }
+
+    public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) {
+        if (tupleDataEndOffset + fieldSlots.length * 2 + length + 4 + (tupleCount + 1) * 4 <= ctx.getFrameSize()) {
+            for (int i = 0; i < fieldSlots.length; ++i) {
+                buffer.putShort(tupleDataEndOffset + i * 2, (short) fieldSlots[i]);
+            }
+            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + fieldSlots.length * 2, length);
+            tupleDataEndOffset += fieldSlots.length * 2 + length;
+            buffer.putInt(FrameHelper.getTupleCountOffset(ctx) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(ctx), tupleCount);
+            return true;
+        }
+        return false;
+    }
+
+    public boolean append(FrameTupleAccessor tupleAccessor, int tIndex) {
+        int startOffset = tupleAccessor.getTupleStartOffset(tIndex);
+        int endOffset = tupleAccessor.getTupleEndOffset(tIndex);
+        int length = endOffset - startOffset;
+        if (tupleDataEndOffset + length + 4 + (tupleCount + 1) * 4 <= ctx.getFrameSize()) {
+            ByteBuffer src = tupleAccessor.getBuffer();
+            System.arraycopy(src.array(), startOffset, buffer.array(), tupleDataEndOffset, length);
+            tupleDataEndOffset += length;
+            buffer.putInt(FrameHelper.getTupleCountOffset(ctx) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(ctx), tupleCount);
+            return true;
+        }
+        return false;
+    }
+
+    public boolean appendConcat(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1) {
+        int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
+        int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
+        int length0 = endOffset0 - startOffset0;
+
+        int startOffset1 = accessor1.getTupleStartOffset(tIndex1);
+        int endOffset1 = accessor1.getTupleEndOffset(tIndex1);
+        int length1 = endOffset1 - startOffset1;
+
+        if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= ctx.getFrameSize()) {
+            ByteBuffer src0 = accessor0.getBuffer();
+            ByteBuffer src1 = accessor1.getBuffer();
+            int slotsLen0 = accessor0.getFieldSlotsLength();
+            int slotsLen1 = accessor1.getFieldSlotsLength();
+            int dataLen0 = length0 - slotsLen0;
+            int dataLen1 = length1 - slotsLen1;
+            // Copy slots from accessor0 verbatim
+            System.arraycopy(src0.array(), startOffset0, buffer.array(), tupleDataEndOffset, slotsLen0);
+            // Copy slots from accessor1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+            for (int i = 0; i < slotsLen1 / 2; ++i) {
+                buffer.putShort(tupleDataEndOffset + slotsLen0 + i * 2,
+                        (short) (src1.getShort(startOffset1 + i * 2) + dataLen0));
+            }
+            // Copy data0
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, buffer.array(), tupleDataEndOffset + slotsLen0
+                    + slotsLen1, dataLen0);
+            // Copy data1
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, buffer.array(), tupleDataEndOffset + slotsLen0
+                    + slotsLen1 + dataLen0, dataLen1);
+            tupleDataEndOffset += (length0 + length1);
+            buffer.putInt(FrameHelper.getTupleCountOffset(ctx) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(ctx), tupleCount);
+            return true;
+        }
+        return false;
+    }
+
+    public boolean appendProjection(FrameTupleAccessor accessor, int tIndex, int[] fields) {
+        int fTargetSlotsLength = fields.length * 2;
+        int length = fTargetSlotsLength;
+        for (int i = 0; i < fields.length; ++i) {
+            length += (accessor.getFieldEndOffset(tIndex, fields[i]) - accessor.getFieldStartOffset(tIndex, fields[i]));
+        }
+
+        if (tupleDataEndOffset + length + 4 + (tupleCount + 1) * 4 <= ctx.getFrameSize()) {
+            int fSrcSlotsLength = accessor.getFieldSlotsLength();
+            int tStartOffset = accessor.getTupleStartOffset(tIndex);
+
+            int fStartOffset = 0;
+            int fEndOffset = 0;
+            for (int i = 0; i < fields.length; ++i) {
+                int fSrcStart = tStartOffset + fSrcSlotsLength + accessor.getFieldStartOffset(tIndex, fields[i]);
+                int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
+                        - accessor.getFieldStartOffset(tIndex, fields[i]);
+                System.arraycopy(accessor.getBuffer().array(), fSrcStart, buffer.array(), tupleDataEndOffset
+                        + fTargetSlotsLength + fStartOffset, fLen);
+                fEndOffset += fLen;
+                buffer.putShort(tupleDataEndOffset + i * 2, (short) fEndOffset);
+                fStartOffset = fEndOffset;
+            }
+            tupleDataEndOffset += length;
+            buffer.putInt(FrameHelper.getTupleCountOffset(ctx) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(ctx), tupleCount);
+            return true;
+        }
+        return false;
+    }
+
+    public int getTupleCount() {
+        return tupleCount;
+    }
+
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTuplePairComparator.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTuplePairComparator.java
new file mode 100644
index 0000000..0b23383
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTuplePairComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+
+public class FrameTuplePairComparator {
+    private final int[] keys0;
+    private final int[] keys1;
+    private final IBinaryComparator[] comparators;
+
+    public FrameTuplePairComparator(int[] keys0, int[] keys1, IBinaryComparator[] comparators) {
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.comparators = comparators;
+    }
+
+    public int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1) {
+        int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+        int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+        int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+        int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+        for (int i = 0; i < keys0.length; ++i) {
+            int fIdx0 = keys0[i];
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, fIdx0);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, fIdx0);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fIdx1 = keys1[i];
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, fIdx1);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java
new file mode 100644
index 0000000..593eb3b
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.io;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class SerializingDataWriter implements IOpenableDataWriter<Object[]> {
+    private static final Logger LOGGER = Logger.getLogger(SerializingDataWriter.class.getName());
+
+    private final ByteBuffer buffer;
+
+    private final ArrayTupleBuilder tb;
+
+    private final RecordDescriptor recordDescriptor;
+
+    private final IFrameWriter frameWriter;
+
+    private final FrameTupleAppender tupleAppender;
+
+    private boolean open;
+
+    public SerializingDataWriter(IHyracksContext ctx, RecordDescriptor recordDescriptor, IFrameWriter frameWriter) {
+        buffer = ctx.getResourceManager().allocateFrame();
+        tb = new ArrayTupleBuilder(recordDescriptor.getFields().length);
+        this.recordDescriptor = recordDescriptor;
+        this.frameWriter = frameWriter;
+        tupleAppender = new FrameTupleAppender(ctx);
+        open = false;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        frameWriter.open();
+        buffer.clear();
+        open = true;
+        tupleAppender.reset(buffer, true);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (!open) {
+            throw new HyracksDataException("Closing SerializingDataWriter that has not been opened");
+        }
+        if (tupleAppender.getTupleCount() > 0) {
+            flushFrame();
+        }
+        frameWriter.close();
+        open = false;
+    }
+
+    @Override
+    public void writeData(Object[] data) throws HyracksDataException {
+        if (!open) {
+            throw new HyracksDataException("Writing to SerializingDataWriter that has not been opened");
+        }
+        tb.reset();
+        for (int i = 0; i < data.length; ++i) {
+            Object instance = data[i];
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest(i + " " + instance);
+            }
+            tb.addField(recordDescriptor.getFields()[i], instance);
+        }
+        if (!tupleAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest("Flushing: position = " + buffer.position());
+            }
+            flushFrame();
+            tupleAppender.reset(buffer, true);
+            if (!tupleAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    private void flushFrame() throws HyracksDataException {
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+        frameWriter.nextFrame(buffer);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/ByteBufferInputStream.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/ByteBufferInputStream.java
new file mode 100644
index 0000000..90f7433
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/ByteBufferInputStream.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.util;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ByteBufferInputStream extends InputStream {
+    private static final Logger LOGGER = Logger.getLogger(ByteBufferInputStream.class.getName());
+
+    private ByteBuffer buffer;
+
+    private int position;
+
+    public ByteBufferInputStream() {
+    }
+
+    public void setByteBuffer(ByteBuffer buffer, int position) {
+        this.buffer = buffer;
+        this.position = position;
+    }
+
+    @Override
+    public int read() {
+        int remaining = buffer.capacity() - position;
+        int value = remaining > 0 ? (buffer.get(position++) & 0xff) : -1;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
+        }
+        return value;
+    }
+
+    @Override
+    public int read(byte[] bytes, int offset, int length) {
+        int remaining = buffer.capacity() - position;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
+                    + length + " position: " + position);
+        }
+        if (remaining == 0) {
+            return -1;
+        }
+        int l = Math.min(length, remaining);
+        System.arraycopy(buffer.array(), position, bytes, offset, l);
+        position += l;
+        return l;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/FrameUtils.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/FrameUtils.java
new file mode 100644
index 0000000..bdce4a4
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/comm/util/FrameUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.util;
+
+import java.nio.ByteBuffer;
+
+public class FrameUtils {
+    public static void copy(ByteBuffer srcFrame, ByteBuffer destFrame) {
+        makeReadable(srcFrame);
+        destFrame.clear();
+        destFrame.put(srcFrame);
+    }
+
+    public static void makeReadable(ByteBuffer frame) {
+        frame.position(0);
+        frame.limit(frame.capacity());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/util/ReflectionUtils.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/util/ReflectionUtils.java
new file mode 100644
index 0000000..bb391d7
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/util/ReflectionUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.util;
+
+public class ReflectionUtils {
+    public static <T> T createInstance(Class<? extends T> klass) {
+        T instance = null;
+        try {
+            instance = klass.newInstance();
+        } catch (InstantiationException e) {
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+        return instance;
+    }
+}
\ No newline at end of file