Add the additional datasetIPAddress parameter required for the NCConfig in all the tests and implement the basic infrastructure required to run hyracks integration tests using ResultWriterOperatorDescriptor.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@3061 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index 17dc5b1..dad7cd0 100644
--- a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -55,6 +55,7 @@
         ncConfig1.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
         ncConfig1.clusterNetIPAddress = "127.0.0.1";
         ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.datasetIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -64,6 +65,7 @@
         ncConfig2.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
         ncConfig2.clusterNetIPAddress = "127.0.0.1";
         ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
diff --git a/hivesterix/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java b/hivesterix/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
index 8d50f47..3d66f59 100644
--- a/hivesterix/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
+++ b/hivesterix/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
@@ -115,6 +115,7 @@
 			ncConfig.clusterNetIPAddress = ipAddress;

 			ncConfig.ccPort = clientPort;

 			ncConfig.dataIPAddress = "127.0.0.1";

+            ncConfig.datasetIPAddress = "127.0.0.1";

 			ncConfig.nodeId = "nc" + i;

 			NodeControllerService nc = new NodeControllerService(ncConfig);

 			nc.start();

diff --git a/hivesterix/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java b/hivesterix/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
index ca2731b..64ad495 100644
--- a/hivesterix/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
+++ b/hivesterix/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
@@ -118,6 +118,7 @@
 			ncConfig.clusterNetIPAddress = ipAddress;

 			ncConfig.ccPort = netPort;

 			ncConfig.dataIPAddress = "127.0.0.1";

+            ncConfig.datasetIPAddress = "127.0.0.1";

 			ncConfig.nodeId = "nc" + i;

 			NodeControllerService nc = new NodeControllerService(ncConfig);

 			nc.start();

diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 2fcd626..8893567 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -14,17 +14,19 @@
  */
 package edu.uci.ics.hyracks.tests.integration;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
-import java.io.PrintStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.commons.io.FileUtils;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
@@ -32,19 +34,19 @@
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
 import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 
 public abstract class AbstractIntegrationTest {
     private static final Logger LOGGER = Logger.getLogger(AbstractIntegrationTest.class.getName());
@@ -117,7 +119,7 @@
         cc.stop();
     }
 
-    protected void runTest(JobSpecification spec) throws Exception {
+    protected JobId executeTest(JobSpecification spec) throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(spec.toJSON().toString(2));
         }
@@ -125,69 +127,72 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(jobId.toString());
         }
-
-        // My code
-        int nReaders = 5;
-        DatasetClientContext datasetClientCtx = new DatasetClientContext(spec.getFrameSize());
-        IHyracksDataset hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, nReaders);
-
-        hyracksDataset.open(jobId, spec.getResultSetIds().get(0));
-        /*        FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(),
-                        recordDescriptor);
-
-                ByteBuffer readBuffer = datasetClientCtx.allocateFrame();
-                ByteBufferInputStream bbis = new ByteBufferInputStream();
-                DataInputStream di = new DataInputStream(bbis);
-
-                while (true) {
-                    readBuffer.clear();
-                    int size = hyracksDataset.read(readBuffer);
-                    if (size <= 0) {
-                        break;
-                    }
-                    try {
-                        frameTupleAccessor.reset(readBuffer);
-                        System.out.println("Tuple count: " + recordDescriptor);
-                        for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
-                            int start = frameTupleAccessor.getTupleStartOffset(tIndex)
-                                    + frameTupleAccessor.getFieldSlotsLength();
-                            bbis.setByteBuffer(readBuffer, start);
-                            Object[] record = new Object[recordDescriptor.getFieldCount()];
-                            for (int i = 0; i < record.length; ++i) {
-                                Object instance = recordDescriptor.getFields()[i].deserialize(di);
-                                if (i == 0) {
-                                    System.out.print(String.valueOf(instance));
-                                } else {
-                                    System.out.print(", " + String.valueOf(instance));
-                                }
-                            }
-                            System.out.println();
-                        }
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-                // End of my code
-        */
-        hcc.waitForCompletion(jobId);
-        dumpOutputFiles();
+        return jobId;
     }
 
-    private void dumpOutputFiles() {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            for (File f : outputFiles) {
-                if (f.exists() && f.isFile()) {
-                    try {
-                        LOGGER.info("Reading file: " + f.getAbsolutePath() + " in test: " + getClass().getName());
-                        String data = FileUtils.readFileToString(f);
-                        LOGGER.info(data);
-                    } catch (IOException e) {
-                        LOGGER.info("Error reading file: " + f.getAbsolutePath());
-                        LOGGER.info(e.getMessage());
-                    }
+    protected void runTest(JobSpecification spec) throws Exception {
+        JobId jobId = executeTest(spec);
+        hcc.waitForCompletion(jobId);
+    }
+
+    protected List<String> readResults(JobSpecification spec, JobId jobId, ResultSetId resultSetId) throws Exception {
+        int nReaders = 1;
+        ByteBuffer resultBuffer = ByteBuffer.allocate(spec.getFrameSize());
+        resultBuffer.clear();
+
+        IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+
+        IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
+        IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, resultSetId);
+
+        List<String> resultRecords = new ArrayList<String>();
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+
+        int readSize = reader.read(resultBuffer);
+
+        while (readSize > 0) {
+
+            try {
+                frameTupleAccessor.reset(resultBuffer);
+                for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                    int start = frameTupleAccessor.getTupleStartOffset(tIndex);
+                    int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
+                    bbis.setByteBuffer(resultBuffer, start);
+                    byte[] recordBytes = new byte[length];
+                    bbis.read(recordBytes, 0, length);
+                    resultRecords.add(new String(recordBytes, 0, length));
                 }
+            } finally {
+                bbis.close();
             }
+
+            resultBuffer.clear();
+            readSize = reader.read(resultBuffer);
         }
+        return resultRecords;
+    }
+
+    protected boolean runTestAndCompareResults(JobSpecification spec, String[] expectedFileNames) throws Exception {
+        JobId jobId = executeTest(spec);
+
+        List<String> results;
+        for (int i = 0; i < expectedFileNames.length; i++) {
+            results = readResults(spec, jobId, spec.getResultSetIds().get(i));
+            BufferedReader expectedFile = new BufferedReader(new FileReader(expectedFileNames[i]));
+
+            String expectedLine, actualLine;
+            int j = 0;
+            while ((expectedLine = expectedFile.readLine()) != null) {
+                actualLine = results.get(j).trim();
+                Assert.assertEquals(expectedLine, actualLine);
+                j++;
+            }
+            Assert.assertEquals(j, results.size());
+            expectedFile.close();
+        }
+
+        hcc.waitForCompletion(jobId);
+        return true;
     }
 
     protected File createTempFile() throws IOException {
@@ -198,27 +203,4 @@
         outputFiles.add(tempFile);
         return tempFile;
     }
-
-    protected IResultSerializerFactory getResultSerializedAppenderFactory() {
-        return new IResultSerializerFactory() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public IResultSerializer createResultSerializer(final PrintStream printStream) {
-                return new IResultSerializer() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public void init() throws HyracksDataException {
-
-                    }
-
-                    @Override
-                    public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException {
-                        return true;
-                    }
-                };
-            }
-        };
-    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index e0b8c73..24d0ef4 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -16,6 +16,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -23,6 +24,7 @@
 import java.util.logging.Logger;
 
 import org.apache.commons.io.FileUtils;
+import org.json.JSONArray;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -30,13 +32,20 @@
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 
 public abstract class AbstractMultiNCIntegrationTest {
 
@@ -84,6 +93,7 @@
             ncConfig.ccPort = 39001;
             ncConfig.clusterNetIPAddress = "127.0.0.1";
             ncConfig.dataIPAddress = "127.0.0.1";
+            ncConfig.datasetIPAddress = "127.0.0.1";
             ncConfig.nodeId = ASTERIX_IDS[i];
             asterixNCs[i] = new NodeControllerService(ncConfig);
             asterixNCs[i].start();
@@ -112,6 +122,46 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(jobId.toString());
         }
+
+        int nReaders = 1;
+
+        ByteBuffer resultBuffer = ByteBuffer.allocate(spec.getFrameSize());
+        resultBuffer.clear();
+
+        IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+
+        IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
+        IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
+
+        JSONArray resultRecords = new JSONArray();
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+
+        int readSize = reader.read(resultBuffer);
+
+        while (readSize > 0) {
+
+            try {
+                frameTupleAccessor.reset(resultBuffer);
+                for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                    int start = frameTupleAccessor.getTupleStartOffset(tIndex);
+                    int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
+                    bbis.setByteBuffer(resultBuffer, start);
+                    byte[] recordBytes = new byte[length];
+                    bbis.read(recordBytes, 0, length);
+                    resultRecords.put(new String(recordBytes, 0, length));
+                }
+            } finally {
+                try {
+                    bbis.close();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            resultBuffer.clear();
+            readSize = reader.read(resultBuffer);
+        }
+
         hcc.waitForCompletion(jobId);
         dumpOutputFiles();
     }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
index d44b75a..8c12518 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
@@ -64,6 +64,7 @@
         ncConfig1.clusterNetIPAddress = "localhost";
         ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.datasetIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -73,6 +74,7 @@
         ncConfig2.clusterNetIPAddress = "localhost";
         ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 2a2e2bf..c343763 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -64,6 +64,7 @@
         ncConfig1.clusterNetIPAddress = "localhost";
         ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.datasetIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -73,6 +74,7 @@
         ncConfig2.clusterNetIPAddress = "localhost";
         ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();