Update the tests to use the rewritten HyracksDataset API.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2837 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 1e006a9..c1e0578 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -14,8 +14,10 @@
*/
package edu.uci.ics.hyracks.tests.integration;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -30,15 +32,20 @@
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
public abstract class AbstractIntegrationTest {
private static final Logger LOGGER = Logger.getLogger(AbstractIntegrationTest.class.getName());
@@ -122,8 +129,46 @@
// My code
int nReaders = 5;
- IHyracksDataset dataset = new HyracksDataset(hcc, spec, jobId, nReaders);
- dataset.getResults();
+ DatasetClientContext datasetClientCtx = new DatasetClientContext(spec.getFrameSize());
+ IHyracksDataset hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, nReaders);
+
+ hyracksDataset.open(jobId, spec.getResultSetIds().get(0));
+ RecordDescriptor recordDescriptor = hyracksDataset.getRecordDescriptor();
+ FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(),
+ recordDescriptor);
+
+ ByteBuffer readBuffer = datasetClientCtx.allocateFrame();
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream di = new DataInputStream(bbis);
+
+ while (true) {
+ readBuffer.clear();
+ int size = hyracksDataset.read(readBuffer);
+ if (size <= 0) {
+ break;
+ }
+ try {
+ frameTupleAccessor.reset(readBuffer);
+ System.out.println("Tuple count: " + recordDescriptor);
+ for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+ int start = frameTupleAccessor.getTupleStartOffset(tIndex)
+ + frameTupleAccessor.getFieldSlotsLength();
+ bbis.setByteBuffer(readBuffer, start);
+ Object[] record = new Object[recordDescriptor.getFieldCount()];
+ for (int i = 0; i < record.length; ++i) {
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ if (i == 0) {
+ System.out.print(String.valueOf(instance));
+ } else {
+ System.out.print(", " + String.valueOf(instance));
+ }
+ }
+ System.out.println();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
// End of my code
hcc.waitForCompletion(jobId);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index e75bd2b..18c5ee0 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -75,7 +75,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true);
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -121,7 +121,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false);
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);