Make HHJ (Hybrid Hash Join) code available for review

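This patch replaces the Link[]-based hash table inside InMemoryHashJoin with
an ISerializableTable: build-side tuples are registered through
table.insert(entry, tuplePointer) and looked up during probe through
table.getTuplePointer(entry, offset, tuplePointer), where a negative
frameIndex marks the end of a bucket. GraceHashJoinOperatorDescriptor,
HybridHashJoinOperatorDescriptor, and InMemoryHashJoinOperatorDescriptor now
allocate a SerializableHashTable and pass it to the joiner (Grace reuses a
single table across partitions via table.reset()). RunFileReader additionally
exposes getFileSize().

The sketch below is for reviewers only and is not part of the patch; it
mirrors how the descriptors now wire the table into the joiner. The api-side
package paths (IHyracksTaskContext, ITuplePartitionComputer, INullWriter,
HyracksDataException) and the wrapper class and method names are assumptions
made for the illustration.

    // Illustrative sketch only: shows the new wiring of an explicit
    // SerializableHashTable into InMemoryHashJoin. Class/method names here
    // are hypothetical; api-side package paths are assumed.
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
    import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
    import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoin;
    import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
    import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;

    public class InMemoryHashJoinWiringSketch {
        // One table, sized from the same estimate the descriptors use
        // (e.g. numPartitions * recordsPerFrame * factor).
        public static ISerializableTable allocateTable(IHyracksTaskContext ctx, int tableSize)
                throws HyracksDataException {
            return new SerializableHashTable(tableSize, ctx);
        }

        // One joiner per partition; table.reset() clears entries left over from
        // the previous partition so the same table can be reused, as
        // GraceHashJoinOperatorDescriptor now does.
        public static InMemoryHashJoin createJoiner(IHyracksTaskContext ctx, int tableSize,
                FrameTupleAccessor probeAccessor, ITuplePartitionComputer probeHpc,
                FrameTupleAccessor buildAccessor, ITuplePartitionComputer buildHpc,
                FrameTuplePairComparator comparator, boolean isLeftOuter,
                INullWriter[] nullWriters, ISerializableTable table) throws HyracksDataException {
            table.reset();
            return new InMemoryHashJoin(ctx, tableSize, probeAccessor, probeHpc,
                    buildAccessor, buildHpc, comparator, isLeftOuter, nullWriters, table);
        }
    }
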
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@665 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java
index 063d0f7..d0148f3 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java
@@ -42,4 +42,8 @@
     public void close() throws HyracksDataException {
         ioManager.close(handle);
     }
+
+    public long getFileSize() {
+        return size;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 7f33bc4..e0035e7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -50,6 +50,8 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
 
 public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final int RPARTITION_ACTIVITY_ID = 0;
@@ -304,15 +306,18 @@
                         ByteBuffer buffer = ctx.allocateFrame();// input
                         // buffer
                         int tableSize = (int) (numPartitions * recordsPerFrame * factor);
+                        ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+
                         for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
                             RunFileWriter buildWriter = buildWriters[partitionid];
                             RunFileWriter probeWriter = probeWriters[partitionid];
                             if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                                 continue;
                             }
+                            table.reset();
                             joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(),
                                     rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
-                                    new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
+                                    new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table);
 
                             // build
                             if (buildWriter != null) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 23064f0..66aa8fb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -51,6 +51,8 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
 
 public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
@@ -306,10 +308,11 @@
                     ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
                             .createPartitioner();
                     int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
+                    ISerializableTable table = new SerializableHashTable(tableSize, ctx);
                     state.joiner = new InMemoryHashJoin(ctx, tableSize,
                             new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
-                                    comparators), isLeftOuter, nullWriters1);
+                                    comparators), isLeftOuter, nullWriters1, table);
                     bufferForPartitions = new ByteBuffer[state.nPartitions];
                     state.fWriters = new RunFileWriter[state.nPartitions];
                     for (int i = 0; i < state.nPartitions; i++) {
@@ -487,11 +490,11 @@
                             if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                                 continue;
                             }
-
+                            ISerializableTable table = new SerializableHashTable(tableSize, ctx);
                             InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
                                     hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
-                                    nullWriters1);
+                                    nullWriters1, table);
 
                             if (buildWriter != null) {
                                 RunFileReader buildReader = buildWriter.createReader();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index fad12c1..3e5e30f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -29,9 +29,11 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
 
 public class InMemoryHashJoin {
-    private final Link[] table;
+
     private final List<ByteBuffer> buffers;
     private final FrameTupleAccessor accessorBuild;
     private final ITuplePartitionComputer tpcBuild;
@@ -42,13 +44,18 @@
     private final ByteBuffer outBuffer;
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder nullTupleBuild;
-
+    private final ISerializableTable table;
+    private final int tableSize;
+    private final TuplePointer storedTuplePointer;
+
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
-            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
+            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table)
             throws HyracksDataException {
-        table = new Link[tableSize];
-        buffers = new ArrayList<ByteBuffer>();
+        this.tableSize = tableSize;
+        this.table = table;
+        storedTuplePointer = new TuplePointer();
+        buffers = new ArrayList<ByteBuffer>();
         this.accessorBuild = accessor1;
         this.tpcBuild = tpc1;
         this.accessorProbe = accessor0;
@@ -77,13 +84,10 @@
         accessorBuild.reset(buffer);
         int tCount = accessorBuild.getTupleCount();
         for (int i = 0; i < tCount; ++i) {
-            int entry = tpcBuild.partition(accessorBuild, i, table.length);
-            long tPointer = (((long) bIndex) << 32) + i;
-            Link link = table[entry];
-            if (link == null) {
-                link = table[entry] = new Link();
-            }
-            link.add(tPointer);
+            int entry = tpcBuild.partition(accessorBuild, i, tableSize);
+            storedTuplePointer.frameIndex = bIndex;
+            storedTuplePointer.tupleIndex = i;
+            table.insert(entry, storedTuplePointer);
         }
     }
 
@@ -91,28 +95,29 @@
         accessorProbe.reset(buffer);
         int tupleCount0 = accessorProbe.getTupleCount();
         for (int i = 0; i < tupleCount0; ++i) {
-            int entry = tpcProbe.partition(accessorProbe, i, table.length);
-            Link link = table[entry];
+            int entry = tpcProbe.partition(accessorProbe, i, tableSize);
             boolean matchFound = false;
-            if (link != null) {
-                for (int j = 0; j < link.size; ++j) {
-                    long pointer = link.pointers[j];
-                    int bIndex = (int) ((pointer >> 32) & 0xffffffff);
-                    int tIndex = (int) (pointer & 0xffffffff);
-                    accessorBuild.reset(buffers.get(bIndex));
-                    int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
-                    if (c == 0) {
-                        matchFound = true;
+            int offset = 0;
+            do {
+                table.getTuplePointer(entry, offset++, storedTuplePointer);
+                if (storedTuplePointer.frameIndex < 0)
+                    break;
+                int bIndex = storedTuplePointer.frameIndex;
+                int tIndex = storedTuplePointer.tupleIndex;
+                accessorBuild.reset(buffers.get(bIndex));
+                int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
+                if (c == 0) {
+                    matchFound = true;
+                    if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
+                        flushFrame(outBuffer, writer);
+                        appender.reset(outBuffer, true);
                         if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
-                            flushFrame(outBuffer, writer);
-                            appender.reset(outBuffer, true);
-                            if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
-                                throw new IllegalStateException();
-                            }
+                            throw new IllegalStateException();
                         }
                     }
                 }
-            }
+            } while (true);
+
             if (!matchFound && isLeftOuter) {
                 if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
                         nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 4a13e16..a1fc0b8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -45,6 +45,8 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
 
 public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final int BUILD_ACTIVITY_ID = 0;
@@ -161,10 +163,11 @@
                             .createPartitioner();
                     state = new HashBuildTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
                             partition));
+                    ISerializableTable table = new SerializableHashTable(tableSize, ctx);
                     state.joiner = new InMemoryHashJoin(ctx, tableSize,
                             new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
-                                    comparators), isLeftOuter, nullWriters1);
+                                    comparators), isLeftOuter, nullWriters1, table);
                 }
 
                 @Override