Update the tests to use the rewritten HyracksDataset API.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2837 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 1e006a9..c1e0578 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -14,8 +14,10 @@
  */
 package edu.uci.ics.hyracks.tests.integration;
 
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -30,15 +32,20 @@
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
 import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 
 public abstract class AbstractIntegrationTest {
     private static final Logger LOGGER = Logger.getLogger(AbstractIntegrationTest.class.getName());
@@ -122,8 +129,46 @@
 
         // My code
         int nReaders = 5;
-        IHyracksDataset dataset = new HyracksDataset(hcc, spec, jobId, nReaders);
-        dataset.getResults();
+        DatasetClientContext datasetClientCtx = new DatasetClientContext(spec.getFrameSize());
+        IHyracksDataset hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, nReaders);
+
+        hyracksDataset.open(jobId, spec.getResultSetIds().get(0));
+        RecordDescriptor recordDescriptor = hyracksDataset.getRecordDescriptor();
+        FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(),
+                recordDescriptor);
+
+        ByteBuffer readBuffer = datasetClientCtx.allocateFrame();
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream di = new DataInputStream(bbis);
+
+        while (true) {
+            readBuffer.clear();
+            int size = hyracksDataset.read(readBuffer);
+            if (size <= 0) {
+                break;
+            }
+            try {
+                frameTupleAccessor.reset(readBuffer);
+                System.out.println("Tuple count: " + recordDescriptor);
+                for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                    int start = frameTupleAccessor.getTupleStartOffset(tIndex)
+                            + frameTupleAccessor.getFieldSlotsLength();
+                    bbis.setByteBuffer(readBuffer, start);
+                    Object[] record = new Object[recordDescriptor.getFieldCount()];
+                    for (int i = 0; i < record.length; ++i) {
+                        Object instance = recordDescriptor.getFields()[i].deserialize(di);
+                        if (i == 0) {
+                            System.out.print(String.valueOf(instance));
+                        } else {
+                            System.out.print(", " + String.valueOf(instance));
+                        }
+                    }
+                    System.out.println();
+                }
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
         // End of my code
 
         hcc.waitForCompletion(jobId);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index e75bd2b..18c5ee0 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -75,7 +75,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true);
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -121,7 +121,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false);
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);