merged from fullstack_asterix_stabilization to fullstack_lsm_staging -r3100:3171
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3173 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-examples/btree-example/btreeapp/pom.xml b/hyracks/hyracks-examples/btree-example/btreeapp/pom.xml
deleted file mode 100644
index 792cedd..0000000
--- a/hyracks/hyracks-examples/btree-example/btreeapp/pom.xml
+++ /dev/null
@@ -1,87 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
- <artifactId>btreeapp</artifactId>
- <name>btreeapp</name>
-
- <parent>
- <groupId>edu.uci.ics.hyracks.examples</groupId>
- <artifactId>btree-example</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
-
- <build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <versionRange>[1.0.0,)</versionRange>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore />
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
-
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>target/application/lib</outputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.2-beta-5</version>
- <executions>
- <execution>
- <configuration>
- <descriptors>
- <descriptor>src/main/assembly/app-assembly.xml</descriptor>
- </descriptors>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>attached</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
- <artifactId>btreehelper</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- </dependencies>
-</project>
diff --git a/hyracks/hyracks-examples/btree-example/btreeapp/src/main/assembly/app-assembly.xml b/hyracks/hyracks-examples/btree-example/btreeapp/src/main/assembly/app-assembly.xml
deleted file mode 100644
index 43ace6c..0000000
--- a/hyracks/hyracks-examples/btree-example/btreeapp/src/main/assembly/app-assembly.xml
+++ /dev/null
@@ -1,13 +0,0 @@
-<assembly>
- <id>app-assembly</id>
- <formats>
- <format>zip</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>target/application/lib</directory>
- <outputDirectory>lib</outputDirectory>
- </fileSet>
- </fileSets>
-</assembly>
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
index 86c11d3..a939a04 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -61,9 +61,6 @@
@Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
public int port = 1098;
- @Option(name = "-app", usage = "Hyracks Application name", required = true)
- public String app;
-
@Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
public String ncs;
@@ -87,7 +84,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(options.app, job);
+ JobId jobId = hcc.startJob(job);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 105fe9b..616fdae 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -59,9 +59,6 @@
@Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
public int port = 1098;
- @Option(name = "-app", usage = "Hyracks Application name", required = true)
- public String app;
-
@Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
public String ncs;
@@ -85,7 +82,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(options.app, job);
+ JobId jobId = hcc.startJob(job);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index afc3487..3493947 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -58,9 +58,6 @@
@Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
public int port = 1098;
- @Option(name = "-app", usage = "Hyracks Application name", required = true)
- public String app;
-
@Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
public String ncs;
@@ -78,7 +75,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(options.app, job);
+ JobId jobId = hcc.startJob(job);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 0c6ab8e..075ea20 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -54,9 +54,6 @@
@Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
public int port = 1098;
- @Option(name = "-app", usage = "Hyracks Application name", required = true)
- public String app;
-
@Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
public String ncs;
@@ -80,7 +77,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(options.app, job);
+ JobId jobId = hcc.startJob(job);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index e2c0be9..a6ffc13 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -58,9 +58,6 @@
@Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
public int port = 1098;
- @Option(name = "-app", usage = "Hyracks Application name", required = true)
- public String app;
-
@Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
public String ncs;
@@ -81,7 +78,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(options.app, job);
+ JobId jobId = hcc.startJob(job);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCApplicationEntryPoint.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCApplicationEntryPoint.java
new file mode 100644
index 0000000..a1dacd8
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCApplicationEntryPoint.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.hyracks.examples.btree.helper;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
+
+public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
+ @Override
+ public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+ RuntimeContext rCtx = new RuntimeContext(ncAppCtx);
+ ncAppCtx.setApplicationObject(rCtx);
+ }
+
+ @Override
+ public void notifyStartupComplete() throws Exception {
+
+ }
+
+ @Override
+ public void stop() throws Exception {
+
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCBootstrap.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCBootstrap.java
deleted file mode 100644
index ea55e7a..0000000
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCBootstrap.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.examples.btree.helper;
-
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.application.INCApplicationContext;
-import edu.uci.ics.hyracks.api.application.INCBootstrap;
-
-public class NCBootstrap implements INCBootstrap {
- private static final Logger LOGGER = Logger.getLogger(NCBootstrap.class.getName());
-
- private INCApplicationContext appCtx;
-
- @Override
- public void start() throws Exception {
- LOGGER.info("Starting NC Bootstrap");
- RuntimeContext rCtx = new RuntimeContext(appCtx);
- appCtx.setApplicationObject(rCtx);
- LOGGER.info("Initialized RuntimeContext: " + rCtx);
- }
-
- @Override
- public void stop() throws Exception {
- LOGGER.info("Stopping Asterix NC Bootstrap");
- RuntimeContext rCtx = (RuntimeContext) appCtx.getApplicationObject();
- rCtx.close();
- }
-
- @Override
- public void setApplicationContext(INCApplicationContext appCtx) {
- this.appCtx = appCtx;
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/resources/hyracks-deployment.properties b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/resources/hyracks-deployment.properties
deleted file mode 100644
index ab0ecb3..0000000
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/resources/hyracks-deployment.properties
+++ /dev/null
@@ -1 +0,0 @@
-nc.bootstrap.class=edu.uci.ics.hyracks.examples.btree.helper.NCBootstrap
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/btree-example/btreeserver/pom.xml b/hyracks/hyracks-examples/btree-example/btreeserver/pom.xml
new file mode 100644
index 0000000..ec6747d
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeserver/pom.xml
@@ -0,0 +1,84 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
+ <artifactId>btreeserver</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <name>btreeserver</name>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
+ <artifactId>btree-example</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.3</version>
+ <executions>
+ <execution>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.cc.CCDriver</mainClass>
+ <name>hyrackscc</name>
+ </program>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.nc.NCDriver</mainClass>
+ <name>hyracksnc</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
+ <artifactId>btreehelper</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-examples/btree-example/btreeserver/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-examples/btree-example/btreeserver/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..cd598d9
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeserver/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,23 @@
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>docs</directory>
+ <outputDirectory>docs</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/hyracks/hyracks-examples/btree-example/pom.xml b/hyracks/hyracks-examples/btree-example/pom.xml
index 9ef5df5..6614a6e 100644
--- a/hyracks/hyracks-examples/btree-example/pom.xml
+++ b/hyracks/hyracks-examples/btree-example/pom.xml
@@ -14,6 +14,6 @@
<modules>
<module>btreehelper</module>
<module>btreeclient</module>
- <module>btreeapp</module>
+ <module>btreeserver</module>
</modules>
</project>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/src/main/assembly/app-assembly.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/src/main/assembly/app-assembly.xml
deleted file mode 100644
index 43ace6c..0000000
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/src/main/assembly/app-assembly.xml
+++ /dev/null
@@ -1,13 +0,0 @@
-<assembly>
- <id>app-assembly</id>
- <formats>
- <format>zip</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>target/application/lib</directory>
- <outputDirectory>lib</outputDirectory>
- </fileSet>
- </fileSets>
-</assembly>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/conf/local_cluster.conf b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/conf/local_cluster.conf
similarity index 100%
rename from hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/conf/local_cluster.conf
rename to hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/conf/local_cluster.conf
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/data/file1.txt b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/data/file1.txt
similarity index 100%
rename from hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/data/file1.txt
rename to hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/data/file1.txt
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/data/file2.txt b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/data/file2.txt
similarity index 100%
rename from hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/data/file2.txt
rename to hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/data/file2.txt
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/job/wordcount/wordcount.job b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/job/wordcount/wordcount.job
similarity index 100%
rename from hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/job/wordcount/wordcount.job
rename to hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/job/wordcount/wordcount.job
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/pom.xml
similarity index 60%
rename from hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
rename to hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/pom.xml
index 4cf9745..b538db5 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/pom.xml
@@ -1,8 +1,9 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks.examples.compat</groupId>
- <artifactId>hadoopcompatapp</artifactId>
- <name>hadoopcompatapp</name>
+ <artifactId>hadoopcompatserver</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <name>hadoopcompatserver</name>
<parent>
<groupId>edu.uci.ics.hyracks.examples</groupId>
@@ -11,49 +12,31 @@
</parent>
<build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <versionRange>[1.0.0,)</versionRange>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore />
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
-
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.3</version>
<executions>
<execution>
- <id>copy-dependencies</id>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.cc.CCDriver</mainClass>
+ <name>hyrackscc</name>
+ </program>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.nc.NCDriver</mainClass>
+ <name>hyracksnc</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
<phase>package</phase>
<goals>
- <goal>copy-dependencies</goal>
+ <goal>assemble</goal>
</goals>
- <configuration>
- <outputDirectory>target/application/lib</outputDirectory>
- </configuration>
</execution>
</executions>
</plugin>
@@ -64,7 +47,7 @@
<execution>
<configuration>
<descriptors>
- <descriptor>src/main/assembly/app-assembly.xml</descriptor>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
</descriptors>
</configuration>
<phase>package</phase>
@@ -79,8 +62,7 @@
<artifactId>hyracks-virtualcluster-maven-plugin</artifactId>
<version>0.2.3-SNAPSHOT</version>
<configuration>
- <hyracksServerHome>${basedir}/../../../hyracks-server/target/hyracks-server-${project.version}-binary-assembly</hyracksServerHome>
- <hyracksCLIHome>${basedir}/../../../hyracks-cli/target/hyracks-cli-${project.version}-binary-assembly</hyracksCLIHome>
+ <hyracksServerHome>${basedir}/target/hadoopcompatserver-${project.version}-binary-assembly</hyracksServerHome>
<jvmOptions>${jvm.extraargs}</jvmOptions>
</configuration>
<executions>
@@ -90,9 +72,6 @@
<goals>
<goal>start-cc</goal>
</goals>
- <configuration>
- <workingDir>${project.build.directory}</workingDir>
- </configuration>
</execution>
<execution>
<id>hyracks-nc1-start</id>
@@ -104,7 +83,6 @@
<nodeId>NC1</nodeId>
<dataIpAddress>127.0.0.1</dataIpAddress>
<ccHost>localhost</ccHost>
- <workingDir>${project.build.directory}</workingDir>
</configuration>
</execution>
<execution>
@@ -117,38 +95,24 @@
<nodeId>NC2</nodeId>
<dataIpAddress>127.0.0.1</dataIpAddress>
<ccHost>localhost</ccHost>
- <workingDir>${project.build.directory}</workingDir>
</configuration>
</execution>
<execution>
- <id>deploy-app</id>
- <phase>pre-integration-test</phase>
+ <id>stop-services</id>
+ <phase>post-integration-test</phase>
<goals>
- <goal>deploy-app</goal>
+ <goal>stop-services</goal>
</goals>
- <configuration>
- <ccHost>localhost</ccHost>
- <appName>compat</appName>
- <harFile>${project.build.directory}/hadoopcompatapp-${project.version}-app-assembly.zip</harFile>
- </configuration>
</execution>
- <execution>
- <id>stop-services</id>
- <phase>post-integration-test</phase>
- <goals>
- <goal>stop-services</goal>
- </goals>
- </execution>
- </executions>
+ </executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
- <fork>true</fork>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
@@ -174,6 +138,20 @@
<version>0.2.3-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>edu.uci.ics.hyracks.examples.compat</groupId>
<artifactId>hadoopcompatclient</artifactId>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..cd598d9
--- /dev/null
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,23 @@
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>docs</directory>
+ <outputDirectory>docs</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/src/test/java/edu/uci/ics/hyracks/examples/compat/test/WordCountCompatibilityIT.java b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/src/test/java/edu/uci/ics/hyracks/examples/compat/test/WordCountCompatibilityIT.java
similarity index 100%
rename from hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/src/test/java/edu/uci/ics/hyracks/examples/compat/test/WordCountCompatibilityIT.java
rename to hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/src/test/java/edu/uci/ics/hyracks/examples/compat/test/WordCountCompatibilityIT.java
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/pom.xml
index 16787ca..103d762 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/pom.xml
@@ -14,7 +14,7 @@
<modules>
<module>hadoopcompathelper</module>
<module>hadoopcompatclient</module>
- <module>hadoopcompatapp</module>
+ <module>hadoopcompatserver</module>
</modules>
<dependencies>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
index 25c7655..30d47ea 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -9,7 +9,6 @@
<artifactId>hyracks-examples</artifactId>
<version>0.2.3-SNAPSHOT</version>
</parent>
-
<build>
<plugins>
<plugin>
@@ -99,5 +98,12 @@
<artifactId>hyracks-data-std</artifactId>
<version>0.2.3-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-client</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 0e0e899..e74e54c 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -14,29 +14,39 @@
*/
package edu.uci.ics.hyracks.tests.integration;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.FileReader;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
public abstract class AbstractIntegrationTest {
private static final Logger LOGGER = Logger.getLogger(AbstractIntegrationTest.class.getName());
@@ -83,6 +93,7 @@
ncConfig1.ccPort = 39001;
ncConfig1.clusterNetIPAddress = "127.0.0.1";
ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -92,12 +103,12 @@
ncConfig2.ccPort = 39001;
ncConfig2.clusterNetIPAddress = "127.0.0.1";
ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
- hcc.createApplication("test", null);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
}
@@ -110,33 +121,80 @@
cc.stop();
}
- protected void runTest(JobSpecification spec) throws Exception {
+ protected JobId executeTest(JobSpecification spec) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(spec.toJSON().toString(2));
}
- JobId jobId = hcc.startJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(jobId.toString());
}
- hcc.waitForCompletion(jobId);
- dumpOutputFiles();
+ return jobId;
}
- private void dumpOutputFiles() {
- if (LOGGER.isLoggable(Level.INFO)) {
- for (File f : outputFiles) {
- if (f.exists() && f.isFile()) {
- try {
- LOGGER.info("Reading file: " + f.getAbsolutePath() + " in test: " + getClass().getName());
- String data = FileUtils.readFileToString(f);
- LOGGER.info(data);
- } catch (IOException e) {
- LOGGER.info("Error reading file: " + f.getAbsolutePath());
- LOGGER.info(e.getMessage());
- }
+ protected void runTest(JobSpecification spec) throws Exception {
+ JobId jobId = executeTest(spec);
+ hcc.waitForCompletion(jobId);
+ }
+
+ protected List<String> readResults(JobSpecification spec, JobId jobId, ResultSetId resultSetId) throws Exception {
+ int nReaders = 1;
+ ByteBuffer resultBuffer = ByteBuffer.allocate(spec.getFrameSize());
+ resultBuffer.clear();
+
+ IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+
+ IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
+ IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, resultSetId);
+
+ List<String> resultRecords = new ArrayList<String>();
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+
+ int readSize = reader.read(resultBuffer);
+
+ while (readSize > 0) {
+
+ try {
+ frameTupleAccessor.reset(resultBuffer);
+ for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+ int start = frameTupleAccessor.getTupleStartOffset(tIndex);
+ int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
+ bbis.setByteBuffer(resultBuffer, start);
+ byte[] recordBytes = new byte[length];
+ bbis.read(recordBytes, 0, length);
+ resultRecords.add(new String(recordBytes, 0, length));
}
+ } finally {
+ bbis.close();
}
+
+ resultBuffer.clear();
+ readSize = reader.read(resultBuffer);
}
+ return resultRecords;
+ }
+
+ protected boolean runTestAndCompareResults(JobSpecification spec, String[] expectedFileNames) throws Exception {
+ JobId jobId = executeTest(spec);
+
+ List<String> results;
+ for (int i = 0; i < expectedFileNames.length; i++) {
+ results = readResults(spec, jobId, spec.getResultSetIds().get(i));
+ BufferedReader expectedFile = new BufferedReader(new FileReader(expectedFileNames[i]));
+
+ String expectedLine, actualLine;
+ int j = 0;
+ while ((expectedLine = expectedFile.readLine()) != null) {
+ actualLine = results.get(j).trim();
+ Assert.assertEquals(expectedLine, actualLine);
+ j++;
+ }
+ Assert.assertEquals(j, results.size());
+ expectedFile.close();
+ }
+
+ hcc.waitForCompletion(jobId);
+ return true;
}
protected File createTempFile() throws IOException {
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index e0b8c73..d97b7db 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -16,6 +16,7 @@
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -23,6 +24,7 @@
import java.util.logging.Logger;
import org.apache.commons.io.FileUtils;
+import org.json.JSONArray;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -30,13 +32,20 @@
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
public abstract class AbstractMultiNCIntegrationTest {
@@ -84,13 +93,13 @@
ncConfig.ccPort = 39001;
ncConfig.clusterNetIPAddress = "127.0.0.1";
ncConfig.dataIPAddress = "127.0.0.1";
+ ncConfig.datasetIPAddress = "127.0.0.1";
ncConfig.nodeId = ASTERIX_IDS[i];
asterixNCs[i] = new NodeControllerService(ncConfig);
asterixNCs[i].start();
}
hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
- hcc.createApplication("test", null);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
}
@@ -108,10 +117,50 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(spec.toJSON().toString(2));
}
- JobId jobId = hcc.startJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(jobId.toString());
}
+
+ int nReaders = 1;
+
+ ByteBuffer resultBuffer = ByteBuffer.allocate(spec.getFrameSize());
+ resultBuffer.clear();
+
+ IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+
+ IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
+ IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
+
+ JSONArray resultRecords = new JSONArray();
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+
+ int readSize = reader.read(resultBuffer);
+
+ while (readSize > 0) {
+
+ try {
+ frameTupleAccessor.reset(resultBuffer);
+ for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+ int start = frameTupleAccessor.getTupleStartOffset(tIndex);
+ int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
+ bbis.setByteBuffer(resultBuffer, start);
+ byte[] recordBytes = new byte[length];
+ bbis.read(recordBytes, 0, length);
+ resultRecords.put(new String(recordBytes, 0, length));
+ }
+ } finally {
+ try {
+ bbis.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ resultBuffer.clear();
+ readSize = reader.read(resultBuffer);
+ }
+
hcc.waitForCompletion(jobId);
dumpOutputFiles();
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
index 93e1e9b..06751e3 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
@@ -25,8 +25,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
@@ -49,7 +49,6 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldGroupAggregatorFactory;
@@ -62,60 +61,42 @@
import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
/**
*
*/
public class AggregationTest extends AbstractIntegrationTest {
- final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
- new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
- "data/tpch0.001/lineitem.tbl"))) });
+ final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
+ new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
- final RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
- new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|');
+ final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+ UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|');
- private AbstractSingleActivityOperatorDescriptor getPrinter(
- IOperatorDescriptorRegistry spec, String prefix) throws IOException {
+ private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix)
+ throws IOException {
- AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
- spec, new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit(NC1_ID, createTempFile()
- .getAbsolutePath()),
- new FileSplit(NC2_ID, createTempFile()
- .getAbsolutePath()) }), "\t");
+ ResultSetId rsId = new ResultSetId(1);
+ AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ spec.addResultSetId(rsId);
return printer;
}
@@ -124,54 +105,38 @@
public void singleKeySumInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true),
- new FloatSumFieldAggregatorFactory(5, true) }),
- outputRec, tableSize);
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
+ new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeySumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -184,49 +149,34 @@
public void singleKeySumPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE});
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true),
- new FloatSumFieldAggregatorFactory(5, true)}),
- outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
+ new FloatSumFieldAggregatorFactory(5, true) }), outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeySumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -239,64 +189,43 @@
public void singleKeySumExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE});
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
new IntSumFieldAggregatorFactory(3, false),
- new FloatSumFieldAggregatorFactory(5, false)}),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
+ new FloatSumFieldAggregatorFactory(5, false) }), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
new IntSumFieldAggregatorFactory(2, false),
- new FloatSumFieldAggregatorFactory(3, false)}),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- tableSize), true);
+ new FloatSumFieldAggregatorFactory(3, false) }), outputRec,
+ new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }), tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeySumExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -309,54 +238,38 @@
public void singleKeyAvgInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }),
- outputRec, tableSize);
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -369,49 +282,34 @@
public void singleKeyAvgPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }),
- outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -424,64 +322,43 @@
public void singleKeyAvgExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new CountFieldAggregatorFactory(false),
- new AvgFieldGroupAggregatorFactory(1, false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
+ new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
new IntSumFieldAggregatorFactory(2, false),
- new AvgFieldMergeAggregatorFactory(3, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- tableSize), true);
+ new AvgFieldMergeAggregatorFactory(3, false) }), outputRec,
+ new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }), tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -494,52 +371,38 @@
public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15,
- true, false) }), outputRec, tableSize);
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -552,47 +415,34 @@
public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15,
- true, false) }), outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -605,63 +455,42 @@
public void singleKeyMinMaxStringExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(15,
- true, true) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(2, true,
- true) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- tableSize), true);
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(2, true, true) }), outputRec,
+ new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }), tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -674,58 +503,39 @@
public void multiKeySumInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec, keyFields, new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true) }),
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeySumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -738,51 +548,35 @@
public void multiKeySumPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec, keyFields,
- new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true) }),
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeySumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -795,69 +589,43 @@
public void multiKeySumExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(3, false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(2, false),
- new IntSumFieldAggregatorFactory(3, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
+ new IntSumFieldAggregatorFactory(3, false) }), outputRec,
+ new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeySumExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -870,60 +638,40 @@
public void multiKeyAvgInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec, keyFields, new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }),
- outputRec, tableSize);
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -936,53 +684,36 @@
public void multiKeyAvgPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec, keyFields,
- new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }),
- outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -995,72 +726,46 @@
public void multiKeyAvgExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new CountFieldAggregatorFactory(false),
- new AvgFieldGroupAggregatorFactory(1, false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(2, false),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
+ new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
new IntSumFieldAggregatorFactory(3, false),
- new AvgFieldMergeAggregatorFactory(4, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
+ new AvgFieldMergeAggregatorFactory(4, false) }), outputRec,
+ new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyAvgExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -1073,58 +778,39 @@
public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec, keyFields, new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15,
- true, false) }), outputRec, tableSize);
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyMinMaxStringInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -1137,51 +823,35 @@
public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec, keyFields,
- new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15,
- true, false) }), outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyMinMaxStringPreClusterGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringPreClusterGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -1194,71 +864,44 @@
public void multiKeyMinMaxStringExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(15,
- true, true) }),
- new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(2, false),
- new MinMaxStringFieldAggregatorFactory(3, true,
- true) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+ new MultiFieldsAggregatorFactory(new int[] { 0, 1 }, new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false),
+ new MinMaxStringFieldAggregatorFactory(3, true, true) }), outputRec,
+ new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyMinMaxStringExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index aea6126..5008991 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -44,13 +45,14 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
public class CountOfCountsTest extends AbstractIntegrationTest {
@Test
@@ -76,12 +78,10 @@
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
- spec,
- new int[] { 0 },
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- desc2);
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc2);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID);
InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
@@ -91,13 +91,15 @@
RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, new MultiFieldsAggregatorFactory(
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
@@ -148,12 +150,10 @@
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
- spec,
- new int[] { 0 },
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- desc2);
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc2);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
@@ -163,13 +163,16 @@
RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, new MultiFieldsAggregatorFactory(
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ spec.addResultSetId(rsId);
+
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
@@ -220,12 +223,10 @@
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
- spec,
- new int[] { 0 },
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- desc2);
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc2);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
@@ -235,13 +236,16 @@
RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, new MultiFieldsAggregatorFactory(
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ spec.addResultSetId(rsId);
+
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 0d5a627..93ed2c7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -26,8 +26,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
@@ -52,12 +52,13 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest {
@@ -200,13 +201,13 @@
runTest(spec);
}
- private AbstractSingleActivityOperatorDescriptor getPrinter(IOperatorDescriptorRegistry spec, String prefix)
+ private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix)
throws IOException {
- AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec,
- new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit("asterix-005", createTempFile().getAbsolutePath()),
- new FileSplit("asterix-006", createTempFile().getAbsolutePath()) }), "\t");
+ ResultSetId rsId = new ResultSetId(1);
+ AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ spec.addResultSetId(rsId);
return printer;
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index 1ee4400..ec9be32 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -40,9 +41,10 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.LimitOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.OptimizedExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
public class OptimizedSortMergeTest extends AbstractIntegrationTest {
@@ -75,9 +77,11 @@
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -128,9 +132,11 @@
LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, filter, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
index 9355110..961f780 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
@@ -42,7 +43,8 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
public class ScanPrintTest extends AbstractIntegrationTest {
@Test
@@ -63,10 +65,11 @@
desc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit(NC2_ID, createTempFile().getAbsolutePath()),
- new FileSplit(NC1_ID, createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ spec.addResultSetId(rsId);
+
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
@@ -98,9 +101,11 @@
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ spec.addResultSetId(rsId);
+
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
@@ -135,9 +140,11 @@
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ spec.addResultSetId(rsId);
+
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 2c3fddf..0da93f2 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -40,9 +41,10 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
public class SortMergeTest extends AbstractIntegrationTest {
@Test
@@ -73,9 +75,11 @@
ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -118,9 +122,11 @@
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
index 2b32142..6040748 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
@@ -26,6 +26,7 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -35,8 +36,9 @@
import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
public class SplitOperatorTest extends AbstractIntegrationTest {
@@ -50,6 +52,8 @@
Assert.assertEquals(lineA, lineB);
}
Assert.assertNull(fileB.readLine());
+ fileA.close();
+ fileB.close();
}
@Test
@@ -83,8 +87,11 @@
IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
for (int i = 0; i < outputArity; i++) {
- outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] { new FileSplit(NC1_ID,
- outputFile[i].getAbsolutePath()) });
+ ResultSetId rsId = new ResultSetId(i);
+ spec.addResultSetId(rsId);
+
+ outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
}
@@ -96,10 +103,10 @@
for (int i = 0; i < outputArity; i++) {
spec.addRoot(outputOp[i]);
}
- runTest(spec);
-
+ String[] expectedResultsFileNames = new String[outputArity];
for (int i = 0; i < outputArity; i++) {
- compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+ expectedResultsFileNames[i] = inputFileName;
}
+ runTestAndCompareResults(spec, expectedResultsFileNames);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 622942b..b5eb850 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -14,9 +14,7 @@
*/
package edu.uci.ics.hyracks.tests.integration;
-import java.io.DataOutput;
import java.io.File;
-import java.io.IOException;
import org.junit.Test;
@@ -25,11 +23,10 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -47,12 +44,13 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import edu.uci.ics.hyracks.tests.util.NoopNullWriterFactory;
+import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
@@ -127,9 +125,11 @@
custOrderJoinDesc, 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -208,9 +208,11 @@
custOrderJoinDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -289,9 +291,11 @@
custOrderJoinDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -371,9 +375,11 @@
custOrderJoinDesc, true, nullWriterFactories, 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -457,9 +463,11 @@
custOrderJoinDesc, true, nullWriterFactories);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -543,9 +551,11 @@
custOrderJoinDesc, true, nullWriterFactories);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -622,9 +632,11 @@
custOrderJoinDesc, 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -711,9 +723,11 @@
custOrderJoinDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -800,9 +814,11 @@
custOrderJoinDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -885,9 +901,11 @@
custOrderJoinDesc, 128);
PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -976,9 +994,11 @@
custOrderJoinDesc, 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordPartConn = new MToNPartitioningConnectorDescriptor(spec,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index 9233e39..99f2d18 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -15,7 +15,9 @@
package edu.uci.ics.hyracks.tests.integration;
import java.io.File;
+
import org.junit.Test;
+
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -28,6 +30,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -42,9 +45,10 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import edu.uci.ics.hyracks.tests.util.NoopNullWriterFactory;
+import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest {
private static class JoinComparatorFactory implements ITuplePairComparatorFactory {
@@ -169,9 +173,11 @@
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -244,9 +250,11 @@
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -319,9 +327,11 @@
null);
PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -399,9 +409,11 @@
nullWriterFactories);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
index 37b55b8..5b323e6 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
@@ -33,8 +34,9 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
public class UnionTest extends AbstractIntegrationTest {
@Test
@@ -65,10 +67,11 @@
UnionAllOperatorDescriptor unionAll = new UnionAllOperatorDescriptor(spec, 2, desc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, unionAll, NC2_ID, NC1_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit(NC2_ID, createTempFile().getAbsolutePath()),
- new FileSplit(NC1_ID, createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ spec.addResultSetId(rsId);
+
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner01, 0, unionAll, 0);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/util/ResultSerializerFactoryProvider.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/util/ResultSerializerFactoryProvider.java
new file mode 100644
index 0000000..19c4475
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/util/ResultSerializerFactoryProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.util;
+
+import java.io.DataInputStream;
+import java.io.PrintStream;
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
+public class ResultSerializerFactoryProvider implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public static final ResultSerializerFactoryProvider INSTANCE = new ResultSerializerFactoryProvider();
+
+ private ResultSerializerFactoryProvider() {
+ }
+
+ public IResultSerializerFactory getResultSerializerFactoryProvider() {
+ return new IResultSerializerFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IResultSerializer createResultSerializer(final RecordDescriptor recordDesc,
+ final PrintStream printStream) {
+ return new IResultSerializer() {
+ private static final long serialVersionUID = 1L;
+
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream di = new DataInputStream(bbis);
+
+ @Override
+ public void init() throws HyracksDataException {
+
+ }
+
+ @Override
+ public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException {
+ int start = tAccess.getTupleStartOffset(tIdx) + tAccess.getFieldSlotsLength();
+
+ bbis.setByteBuffer(tAccess.getBuffer(), start);
+
+ Object[] record = new Object[recordDesc.getFieldCount()];
+ for (int i = 0; i < record.length; ++i) {
+ Object instance = recordDesc.getFields()[i].deserialize(di);
+ if (i == 0) {
+ printStream.print(String.valueOf(instance));
+ } else {
+ printStream.print(", " + String.valueOf(instance));
+ }
+ }
+ printStream.println();
+ return true;
+ }
+ };
+ }
+ };
+ }
+}
diff --git a/hyracks/hyracks-examples/text-example/pom.xml b/hyracks/hyracks-examples/text-example/pom.xml
index 016897e..748371f 100644
--- a/hyracks/hyracks-examples/text-example/pom.xml
+++ b/hyracks/hyracks-examples/text-example/pom.xml
@@ -13,6 +13,6 @@
<modules>
<module>texthelper</module>
<module>textclient</module>
- <module>textapp</module>
+ <module>textserver</module>
</modules>
</project>
diff --git a/hyracks/hyracks-examples/text-example/textapp/src/main/assembly/app-assembly.xml b/hyracks/hyracks-examples/text-example/textapp/src/main/assembly/app-assembly.xml
deleted file mode 100644
index 43ace6c..0000000
--- a/hyracks/hyracks-examples/text-example/textapp/src/main/assembly/app-assembly.xml
+++ /dev/null
@@ -1,13 +0,0 @@
-<assembly>
- <id>app-assembly</id>
- <formats>
- <format>zip</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>target/application/lib</directory>
- <outputDirectory>lib</outputDirectory>
- </fileSet>
- </fileSets>
-</assembly>
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 943f232..18813a7 100644
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -74,9 +74,6 @@
@Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
public int port = 1098;
- @Option(name = "-app", usage = "Hyracks Application name", required = true)
- public String app;
-
@Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
public String inFileSplits;
@@ -122,7 +119,7 @@
System.out.print(i + "\t" + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(options.app, job);
+ JobId jobId = hcc.startJob(job);
hcc.waitForCompletion(jobId);
System.out.println("\t" + (System.currentTimeMillis() - start));
}
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index 31019ab..5b6fad9 100644
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -65,9 +65,6 @@
@Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
public int port = 1098;
- @Option(name = "-app", usage = "Hyracks Application name", required = true)
- public String app;
-
@Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
public String inFileSplits;
@@ -101,8 +98,8 @@
options.algo, options.htSize, options.sbSize, options.format);
long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(options.app, job, options.runtimeProfiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME)
- : EnumSet.noneOf(JobFlag.class));
+ JobId jobId = hcc.startJob(job,
+ options.runtimeProfiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
diff --git a/hyracks/hyracks-examples/text-example/textapp/data/file1.txt b/hyracks/hyracks-examples/text-example/textserver/data/file1.txt
similarity index 100%
rename from hyracks/hyracks-examples/text-example/textapp/data/file1.txt
rename to hyracks/hyracks-examples/text-example/textserver/data/file1.txt
diff --git a/hyracks/hyracks-examples/text-example/textapp/data/file2.txt b/hyracks/hyracks-examples/text-example/textserver/data/file2.txt
similarity index 100%
rename from hyracks/hyracks-examples/text-example/textapp/data/file2.txt
rename to hyracks/hyracks-examples/text-example/textserver/data/file2.txt
diff --git a/hyracks/hyracks-examples/text-example/textapp/pom.xml b/hyracks/hyracks-examples/text-example/textserver/pom.xml
similarity index 63%
rename from hyracks/hyracks-examples/text-example/textapp/pom.xml
rename to hyracks/hyracks-examples/text-example/textserver/pom.xml
index fe5fc25..1b1f2d4 100644
--- a/hyracks/hyracks-examples/text-example/textapp/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textserver/pom.xml
@@ -1,58 +1,42 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
- <artifactId>textapp</artifactId>
- <name>textapp</name>
+ <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+ <artifactId>textserver</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <name>textserver</name>
<parent>
- <groupId>edu.uci.ics.hyracks</groupId>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
<artifactId>text-example</artifactId>
<version>0.2.3-SNAPSHOT</version>
</parent>
<build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <versionRange>[1.0.0,)</versionRange>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore />
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
-
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.3</version>
<executions>
<execution>
- <id>copy-dependencies</id>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.cc.CCDriver</mainClass>
+ <name>hyrackscc</name>
+ </program>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.nc.NCDriver</mainClass>
+ <name>hyracksnc</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
<phase>package</phase>
<goals>
- <goal>copy-dependencies</goal>
+ <goal>assemble</goal>
</goals>
- <configuration>
- <outputDirectory>target/application/lib</outputDirectory>
- </configuration>
</execution>
</executions>
</plugin>
@@ -63,7 +47,7 @@
<execution>
<configuration>
<descriptors>
- <descriptor>src/main/assembly/app-assembly.xml</descriptor>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
</descriptors>
</configuration>
<phase>package</phase>
@@ -78,8 +62,7 @@
<artifactId>hyracks-virtualcluster-maven-plugin</artifactId>
<version>0.2.3-SNAPSHOT</version>
<configuration>
- <hyracksServerHome>${basedir}/../../../hyracks-server/target/hyracks-server-${project.version}-binary-assembly</hyracksServerHome>
- <hyracksCLIHome>${basedir}/../../../hyracks-cli/target/hyracks-cli-${project.version}-binary-assembly</hyracksCLIHome>
+ <hyracksServerHome>${basedir}/target/textserver-${project.version}-binary-assembly</hyracksServerHome>
<jvmOptions>${jvm.extraargs}</jvmOptions>
</configuration>
<executions>
@@ -115,18 +98,6 @@
</configuration>
</execution>
<execution>
- <id>deploy-app</id>
- <phase>pre-integration-test</phase>
- <goals>
- <goal>deploy-app</goal>
- </goals>
- <configuration>
- <ccHost>localhost</ccHost>
- <appName>text</appName>
- <harFile>${project.build.directory}/textapp-${project.version}-app-assembly.zip</harFile>
- </configuration>
- </execution>
- <execution>
<id>stop-services</id>
<phase>post-integration-test</phase>
<goals>
@@ -140,9 +111,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
- <fork>true</fork>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
@@ -163,13 +133,27 @@
</build>
<dependencies>
<dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
+ <groupId>edu.uci.ics.hyracks.examples.text</groupId>
<artifactId>texthelper</artifactId>
<version>0.2.3-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks.examples.text</groupId>
<artifactId>textclient</artifactId>
<version>0.2.3-SNAPSHOT</version>
<type>jar</type>
diff --git a/hyracks/hyracks-examples/text-example/textserver/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-examples/text-example/textserver/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..cd598d9
--- /dev/null
+++ b/hyracks/hyracks-examples/text-example/textserver/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,23 @@
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>docs</directory>
+ <outputDirectory>docs</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/hyracks/hyracks-examples/text-example/textapp/src/test/java/edu/uci/ics/hyracks/examples/text/test/WordCountIT.java b/hyracks/hyracks-examples/text-example/textserver/src/test/java/edu/uci/ics/hyracks/examples/text/test/WordCountIT.java
similarity index 90%
rename from hyracks/hyracks-examples/text-example/textapp/src/test/java/edu/uci/ics/hyracks/examples/text/test/WordCountIT.java
rename to hyracks/hyracks-examples/text-example/textserver/src/test/java/edu/uci/ics/hyracks/examples/text/test/WordCountIT.java
index 9659288..76af72d 100644
--- a/hyracks/hyracks-examples/text-example/textapp/src/test/java/edu/uci/ics/hyracks/examples/text/test/WordCountIT.java
+++ b/hyracks/hyracks-examples/text-example/textserver/src/test/java/edu/uci/ics/hyracks/examples/text/test/WordCountIT.java
@@ -10,7 +10,7 @@
@Test
public void runWordCount() throws Exception {
WordCountMain.main(new String[] { "-host", "localhost", "-infile-splits", getInfileSplits(), "-outfile-splits",
- getOutfileSplits(), "-algo", "-hash", "-app", "text" });
+ getOutfileSplits(), "-algo", "-hash" });
}
private String getInfileSplits() {
@@ -22,4 +22,4 @@
return "NC1:" + new File("target/wc1.txt").getAbsolutePath() + ",NC2:"
+ new File("target/wc2.txt").getAbsolutePath();
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-examples/tpch-example/pom.xml b/hyracks/hyracks-examples/tpch-example/pom.xml
index ccfe002..79053e8 100644
--- a/hyracks/hyracks-examples/tpch-example/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/pom.xml
@@ -12,6 +12,6 @@
<modules>
<module>tpchclient</module>
- <module>tpchapp</module>
+ <module>tpchserver</module>
</modules>
</project>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchapp/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchapp/pom.xml
deleted file mode 100644
index 354cc16..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchapp/pom.xml
+++ /dev/null
@@ -1,90 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>tpchapp</artifactId>
- <name>tpchapp</name>
- <parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>tpch-example</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
-
- <build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <versionRange>[1.0.0,)</versionRange>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore />
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
-
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>target/application/lib</outputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.2-beta-5</version>
- <executions>
- <execution>
- <configuration>
- <descriptors>
- <descriptor>src/main/assembly/app-assembly.xml</descriptor>
- </descriptors>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>attached</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-dataflow-std</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-data-std</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </dependency>
- </dependencies>
-</project>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchapp/src/main/assembly/app-assembly.xml b/hyracks/hyracks-examples/tpch-example/tpchapp/src/main/assembly/app-assembly.xml
deleted file mode 100644
index 43ace6c..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchapp/src/main/assembly/app-assembly.xml
+++ /dev/null
@@ -1,13 +0,0 @@
-<assembly>
- <id>app-assembly</id>
- <formats>
- <format>zip</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>target/application/lib</directory>
- <outputDirectory>lib</outputDirectory>
- </fileSet>
- </fileSets>
-</assembly>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 0ad0ff0..6df9ff8 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -72,9 +72,6 @@
@Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)", required = false)
public int port = 1098;
- @Option(name = "-app", usage = "Hyracks Application name", required = true)
- public String app;
-
@Option(name = "-infile-customer-splits", usage = "Comma separated list of file-splits for the CUSTOMER input. A file-split is <node-name>:<path>", required = true)
public String inFileCustomerSplits;
@@ -127,7 +124,7 @@
options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy);
long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(options.app, job,
+ JobId jobId = hcc.startJob(job,
options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
diff --git a/hyracks/hyracks-examples/tpch-example/tpchserver/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchserver/pom.xml
new file mode 100644
index 0000000..94d897a
--- /dev/null
+++ b/hyracks/hyracks-examples/tpch-example/tpchserver/pom.xml
@@ -0,0 +1,89 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
+ <artifactId>tpchserver</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <name>tpchserver</name>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
+ <artifactId>tpch-example</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.3</version>
+ <executions>
+ <execution>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.cc.CCDriver</mainClass>
+ <name>hyrackscc</name>
+ </program>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.nc.NCDriver</mainClass>
+ <name>hyracksnc</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-data-std</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchserver/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-examples/tpch-example/tpchserver/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..cd598d9
--- /dev/null
+++ b/hyracks/hyracks-examples/tpch-example/tpchserver/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,23 @@
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>docs</directory>
+ <outputDirectory>docs</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>