Add the additional datasetIPAddress parameter required for the NCConfig in all the tests and implement the basic infrastructure required to run hyracks integration tests using ResultWriterOperatorDescriptor.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@3061 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index 17dc5b1..dad7cd0 100644
--- a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -55,6 +55,7 @@
ncConfig1.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
ncConfig1.clusterNetIPAddress = "127.0.0.1";
ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -64,6 +65,7 @@
ncConfig2.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
ncConfig2.clusterNetIPAddress = "127.0.0.1";
ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
diff --git a/hivesterix/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java b/hivesterix/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
index 8d50f47..3d66f59 100644
--- a/hivesterix/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
+++ b/hivesterix/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
@@ -115,6 +115,7 @@
ncConfig.clusterNetIPAddress = ipAddress;
ncConfig.ccPort = clientPort;
ncConfig.dataIPAddress = "127.0.0.1";
+ ncConfig.datasetIPAddress = "127.0.0.1";
ncConfig.nodeId = "nc" + i;
NodeControllerService nc = new NodeControllerService(ncConfig);
nc.start();
diff --git a/hivesterix/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java b/hivesterix/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
index ca2731b..64ad495 100644
--- a/hivesterix/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
+++ b/hivesterix/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
@@ -118,6 +118,7 @@
ncConfig.clusterNetIPAddress = ipAddress;
ncConfig.ccPort = netPort;
ncConfig.dataIPAddress = "127.0.0.1";
+ ncConfig.datasetIPAddress = "127.0.0.1";
ncConfig.nodeId = "nc" + i;
NodeControllerService nc = new NodeControllerService(ncConfig);
nc.start();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 2fcd626..8893567 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -14,17 +14,19 @@
*/
package edu.uci.ics.hyracks.tests.integration;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.FileReader;
import java.io.IOException;
-import java.io.PrintStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
@@ -32,19 +34,19 @@
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
public abstract class AbstractIntegrationTest {
private static final Logger LOGGER = Logger.getLogger(AbstractIntegrationTest.class.getName());
@@ -117,7 +119,7 @@
cc.stop();
}
- protected void runTest(JobSpecification spec) throws Exception {
+ protected JobId executeTest(JobSpecification spec) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(spec.toJSON().toString(2));
}
@@ -125,69 +127,72 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(jobId.toString());
}
-
- // My code
- int nReaders = 5;
- DatasetClientContext datasetClientCtx = new DatasetClientContext(spec.getFrameSize());
- IHyracksDataset hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, nReaders);
-
- hyracksDataset.open(jobId, spec.getResultSetIds().get(0));
- /* FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(),
- recordDescriptor);
-
- ByteBuffer readBuffer = datasetClientCtx.allocateFrame();
- ByteBufferInputStream bbis = new ByteBufferInputStream();
- DataInputStream di = new DataInputStream(bbis);
-
- while (true) {
- readBuffer.clear();
- int size = hyracksDataset.read(readBuffer);
- if (size <= 0) {
- break;
- }
- try {
- frameTupleAccessor.reset(readBuffer);
- System.out.println("Tuple count: " + recordDescriptor);
- for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
- int start = frameTupleAccessor.getTupleStartOffset(tIndex)
- + frameTupleAccessor.getFieldSlotsLength();
- bbis.setByteBuffer(readBuffer, start);
- Object[] record = new Object[recordDescriptor.getFieldCount()];
- for (int i = 0; i < record.length; ++i) {
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (i == 0) {
- System.out.print(String.valueOf(instance));
- } else {
- System.out.print(", " + String.valueOf(instance));
- }
- }
- System.out.println();
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- // End of my code
- */
- hcc.waitForCompletion(jobId);
- dumpOutputFiles();
+ return jobId;
}
- private void dumpOutputFiles() {
- if (LOGGER.isLoggable(Level.INFO)) {
- for (File f : outputFiles) {
- if (f.exists() && f.isFile()) {
- try {
- LOGGER.info("Reading file: " + f.getAbsolutePath() + " in test: " + getClass().getName());
- String data = FileUtils.readFileToString(f);
- LOGGER.info(data);
- } catch (IOException e) {
- LOGGER.info("Error reading file: " + f.getAbsolutePath());
- LOGGER.info(e.getMessage());
- }
+ protected void runTest(JobSpecification spec) throws Exception {
+ JobId jobId = executeTest(spec);
+ hcc.waitForCompletion(jobId);
+ }
+
+ protected List<String> readResults(JobSpecification spec, JobId jobId, ResultSetId resultSetId) throws Exception {
+ int nReaders = 1;
+ ByteBuffer resultBuffer = ByteBuffer.allocate(spec.getFrameSize());
+ resultBuffer.clear();
+
+ IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+
+ IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
+ IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, resultSetId);
+
+ List<String> resultRecords = new ArrayList<String>();
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+
+ int readSize = reader.read(resultBuffer);
+
+ while (readSize > 0) {
+
+ try {
+ frameTupleAccessor.reset(resultBuffer);
+ for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+ int start = frameTupleAccessor.getTupleStartOffset(tIndex);
+ int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
+ bbis.setByteBuffer(resultBuffer, start);
+ byte[] recordBytes = new byte[length];
+ bbis.read(recordBytes, 0, length);
+ resultRecords.add(new String(recordBytes, 0, length));
}
+ } finally {
+ bbis.close();
}
+
+ resultBuffer.clear();
+ readSize = reader.read(resultBuffer);
}
+ return resultRecords;
+ }
+
+ protected boolean runTestAndCompareResults(JobSpecification spec, String[] expectedFileNames) throws Exception {
+ JobId jobId = executeTest(spec);
+
+ List<String> results;
+ for (int i = 0; i < expectedFileNames.length; i++) {
+ results = readResults(spec, jobId, spec.getResultSetIds().get(i));
+ BufferedReader expectedFile = new BufferedReader(new FileReader(expectedFileNames[i]));
+
+ String expectedLine, actualLine;
+ int j = 0;
+ while ((expectedLine = expectedFile.readLine()) != null) {
+ actualLine = results.get(j).trim();
+ Assert.assertEquals(expectedLine, actualLine);
+ j++;
+ }
+ Assert.assertEquals(j, results.size());
+ expectedFile.close();
+ }
+
+ hcc.waitForCompletion(jobId);
+ return true;
}
protected File createTempFile() throws IOException {
@@ -198,27 +203,4 @@
outputFiles.add(tempFile);
return tempFile;
}
-
- protected IResultSerializerFactory getResultSerializedAppenderFactory() {
- return new IResultSerializerFactory() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public IResultSerializer createResultSerializer(final PrintStream printStream) {
- return new IResultSerializer() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void init() throws HyracksDataException {
-
- }
-
- @Override
- public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException {
- return true;
- }
- };
- }
- };
- }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index e0b8c73..24d0ef4 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -16,6 +16,7 @@
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -23,6 +24,7 @@
import java.util.logging.Logger;
import org.apache.commons.io.FileUtils;
+import org.json.JSONArray;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -30,13 +32,20 @@
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
public abstract class AbstractMultiNCIntegrationTest {
@@ -84,6 +93,7 @@
ncConfig.ccPort = 39001;
ncConfig.clusterNetIPAddress = "127.0.0.1";
ncConfig.dataIPAddress = "127.0.0.1";
+ ncConfig.datasetIPAddress = "127.0.0.1";
ncConfig.nodeId = ASTERIX_IDS[i];
asterixNCs[i] = new NodeControllerService(ncConfig);
asterixNCs[i].start();
@@ -112,6 +122,46 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(jobId.toString());
}
+
+ int nReaders = 1;
+
+ ByteBuffer resultBuffer = ByteBuffer.allocate(spec.getFrameSize());
+ resultBuffer.clear();
+
+ IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+
+ IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
+ IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
+
+ JSONArray resultRecords = new JSONArray();
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+
+ int readSize = reader.read(resultBuffer);
+
+ while (readSize > 0) {
+
+ try {
+ frameTupleAccessor.reset(resultBuffer);
+ for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+ int start = frameTupleAccessor.getTupleStartOffset(tIndex);
+ int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
+ bbis.setByteBuffer(resultBuffer, start);
+ byte[] recordBytes = new byte[length];
+ bbis.read(recordBytes, 0, length);
+ resultRecords.put(new String(recordBytes, 0, length));
+ }
+ } finally {
+ try {
+ bbis.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ resultBuffer.clear();
+ readSize = reader.read(resultBuffer);
+ }
+
hcc.waitForCompletion(jobId);
dumpOutputFiles();
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
index d44b75a..8c12518 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
@@ -64,6 +64,7 @@
ncConfig1.clusterNetIPAddress = "localhost";
ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -73,6 +74,7 @@
ncConfig2.clusterNetIPAddress = "localhost";
ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 2a2e2bf..c343763 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -64,6 +64,7 @@
ncConfig1.clusterNetIPAddress = "localhost";
ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -73,6 +74,7 @@
ncConfig2.clusterNetIPAddress = "localhost";
ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();