Added onlineaggregation example
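
This adds three Maven modules under hyracks-examples/onlineaggregation-example:

* onlineaggregationhelper - the Hadoop-style MapperOperatorDescriptor and ReducerOperatorDescriptor, the HashPartitioningShuffleConnectorDescriptor with its ShuffleFrameReader, and supporting glue (HadoopHelper, HadoopTools, MarshalledWritable, IInputSplitProvider/Factory).
* onlineaggregationclient - a command-line driver (WordCountMain) packaged with the appassembler plugin.
* onlineaggregationapp - bundles the helper and its dependencies into an application zip for deployment.

Map output tuples carry a block id alongside each key/value pair; the mapper sorts and flushes its output one block at a time and the shuffle/reduce side regroups it by block, which is what the online-aggregation machinery builds on.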
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_online_aggregation@195 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/onlineaggregation-example/.project b/hyracks-examples/onlineaggregation-example/.project
new file mode 100644
index 0000000..04d3146
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>onlineaggregation-example</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ </natures>
+</projectDescription>
diff --git a/hyracks-examples/onlineaggregation-example/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/onlineaggregation-example/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..53f6f77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Thu Nov 11 13:12:17 PST 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.classpath b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.classpath
new file mode 100644
index 0000000..3f62785
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.classpath
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.4"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.project b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.project
new file mode 100644
index 0000000..4c98f7a
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>onlineaggregationapp</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ </natures>
+</projectDescription>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.eclipse.jdt.core.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..5ee0b93
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Thu Nov 11 13:12:17 PST 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.4
+org.eclipse.jdt.core.compiler.compliance=1.4
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.4
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..53f6f77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Thu Nov 11 13:12:17 PST 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/pom.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/pom.xml
new file mode 100644
index 0000000..6e95f3c
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/pom.xml
@@ -0,0 +1,58 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks.examples.onlineaggregation</groupId>
+ <artifactId>onlineaggregationapp</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
+ <artifactId>onlineaggregation-example</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>target/application/lib</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/app-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks.examples.onlineaggregation</groupId>
+ <artifactId>onlineaggregationhelper</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/src/main/assembly/app-assembly.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/src/main/assembly/app-assembly.xml
new file mode 100644
index 0000000..43ace6c
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/src/main/assembly/app-assembly.xml
@@ -0,0 +1,13 @@
+<assembly>
+ <id>app-assembly</id>
+ <formats>
+ <format>zip</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/application/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.classpath b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" output="target/classes" path="src/main/java"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.project b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.project
new file mode 100644
index 0000000..38c4876
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>onlineaggregationclient</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ </natures>
+</projectDescription>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.eclipse.jdt.core.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..96d3d9f
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Thu Nov 11 13:12:17 PST 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..53f6f77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Thu Nov 11 13:12:17 PST 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
new file mode 100644
index 0000000..ec9b303
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
@@ -0,0 +1,80 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks.examples.onlineaggregation</groupId>
+ <artifactId>onlineaggregationclient</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
+ <artifactId>onlineaggregation-example</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks.examples.onlineaggregation</groupId>
+ <artifactId>onlineaggregationhelper</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.WordCountMain</mainClass>
+ <name>onlineaggregationclient</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/assembly/binary-assembly.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.classpath b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" output="target/classes" path="src/main/java"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.project b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.project
new file mode 100644
index 0000000..832bfd9
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>onlineaggregationhelper</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ </natures>
+</projectDescription>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.eclipse.jdt.core.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..96d3d9f
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Thu Nov 11 13:12:17 PST 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..53f6f77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Thu Nov 11 13:12:17 PST 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/pom.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/pom.xml
new file mode 100644
index 0000000..3c8dab4
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/pom.xml
@@ -0,0 +1,47 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks.examples.onlineaggregation</groupId>
+ <artifactId>onlineaggregationhelper</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
+ <artifactId>onlineaggregation-example</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-hadoop</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java
new file mode 100644
index 0000000..a16c63d
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopNewPartitionerTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+
+public class HadoopHelper {
+ public static final int KEY_FIELD_INDEX = 0;
+ public static final int VALUE_FIELD_INDEX = 1;
+ public static final int BLOCKID_FIELD_INDEX = 2;
+ private static final int[] KEY_SORT_FIELDS = new int[] { 0 };
+
+ private MarshalledWritable<Configuration> mConfig;
+ private Configuration config;
+ private Job job;
+
+ public HadoopHelper(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+ this.mConfig = mConfig;
+ try {
+ config = mConfig.get();
+ job = new Job(config);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public RecordDescriptor getMapOutputRecordDescriptor() throws HyracksDataException {
+ try {
+ return new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+ .getMapOutputKeyClass()),
+ DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+ .getMapOutputValueClass()), IntegerSerializerDeserializer.INSTANCE });
+
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K1, V1, K2, V2> Mapper<K1, V1, K2, V2> getMapper() throws HyracksDataException {
+ try {
+ return (Mapper<K1, V1, K2, V2>) HadoopTools.newInstance(job.getMapperClass());
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ } catch (InstantiationException e) {
+ throw new HyracksDataException(e);
+ } catch (IllegalAccessException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K2, V2, K3, V3> Reducer<K2, V2, K3, V3> getReducer() throws HyracksDataException {
+ try {
+ return (Reducer<K2, V2, K3, V3>) HadoopTools.newInstance(job.getReducerClass());
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ } catch (InstantiationException e) {
+ throw new HyracksDataException(e);
+ } catch (IllegalAccessException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K, V> InputFormat<K, V> getInputFormat() throws HyracksDataException {
+ try {
+ return (InputFormat<K, V>) ReflectionUtils.newInstance(job.getInputFormatClass(), config);
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public IBinaryComparatorFactory[] getSortComparatorFactories() {
+ WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
+ .getSortComparator().getClass());
+
+ return new IBinaryComparatorFactory[] { comparatorFactory };
+ }
+
+ public IBinaryComparatorFactory[] getGroupingComparatorFactories() {
+ WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
+ .getGroupingComparator().getClass());
+
+ return new IBinaryComparatorFactory[] { comparatorFactory };
+ }
+
+ public RawComparator<?> getRawGroupingComparator() {
+ return job.getGroupingComparator();
+ }
+
+ public int getSortFrameLimit(IHyracksContext ctx) {
+ int sortMemory = job.getConfiguration().getInt("io.sort.mb", 100);
+ return (int) (((long) sortMemory * 1024 * 1024) / ctx.getFrameSize());
+ }
+
+ public Job getJob() {
+ return job;
+ }
+
+ public MarshalledWritable<Configuration> getMarshalledConfiguration() {
+ return mConfig;
+ }
+
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ public ITuplePartitionComputerFactory getTuplePartitionComputer() throws HyracksDataException {
+ int nReducers = job.getNumReduceTasks();
+ try {
+ return new HadoopNewPartitionerTuplePartitionComputerFactory<Writable, Writable>(
+ (Class<? extends Partitioner<Writable, Writable>>) job.getPartitionerClass(),
+ (ISerializerDeserializer<Writable>) DatatypeHelper
+ .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputKeyClass()),
+ (ISerializerDeserializer<Writable>) DatatypeHelper
+ .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputValueClass()));
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public int[] getSortFields() {
+ return KEY_SORT_FIELDS;
+ }
+
+ public <K> ISerializerDeserializer<K> getMapOutputKeySerializerDeserializer() {
+ return (ISerializerDeserializer<K>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+ .getMapOutputKeyClass());
+ }
+
+ public <V> ISerializerDeserializer<V> getMapOutputValueSerializerDeserializer() {
+ return (ISerializerDeserializer<V>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+ .getMapOutputValueClass());
+ }
+
+ public FileSystem getFilesystem() throws HyracksDataException {
+ try {
+ return FileSystem.get(config);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K, V> OutputFormat<K, V> getOutputFormat() throws HyracksDataException {
+ try {
+ return (OutputFormat<K, V>) ReflectionUtils.newInstance(job.getOutputFormatClass(), config);
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
\ No newline at end of file
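
HadoopHelper is the bridge between a marshalled Hadoop job configuration and the Hyracks-side metadata the operators need. A minimal query sketch, illustrative only; it assumes an mConfig that already holds a marshalled Configuration (see MarshalledWritable further down):

    package edu.uci.ics.hyracks.examples.onlineaggregation;

    import org.apache.hadoop.conf.Configuration;

    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

    public class HadoopHelperSketch {
        public static void describe(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
            HadoopHelper helper = new HadoopHelper(mConfig);

            // Layout of map output tuples: [key, value, blockId].
            RecordDescriptor mapOutputDesc = helper.getMapOutputRecordDescriptor();

            // Sort/group metadata derived from the job's comparators.
            IBinaryComparatorFactory[] sortCmp = helper.getSortComparatorFactories();
            IBinaryComparatorFactory[] groupCmp = helper.getGroupingComparatorFactories();
            int[] sortFields = helper.getSortFields();

            // Key-hash partitioner used by the shuffle connector.
            ITuplePartitionComputerFactory partitionerFactory = helper.getTuplePartitionComputer();
        }
    }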
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopTools.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopTools.java
new file mode 100644
index 0000000..f32a9f7
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopTools.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+public class HadoopTools {
+ public static Object newInstance(String className) throws ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(HadoopTools.class.getClassLoader());
+ Class<?> clazz = Class.forName(className, true, HadoopTools.class.getClassLoader());
+ return newInstance(clazz);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+ public static Object newInstance(Class<?> clazz) throws InstantiationException, IllegalAccessException {
+ return clazz.newInstance();
+ }
+}
\ No newline at end of file
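
HadoopTools exists so that user classes named in the job configuration are resolved through this helper jar's classloader (briefly swapping the thread context classloader) rather than whatever classloader the caller runs under. A tiny usage sketch with a hypothetical class name:

    package edu.uci.ics.hyracks.examples.onlineaggregation;

    public class HadoopToolsSketch {
        public static Object loadUserClass() throws Exception {
            // "org.example.WordCountMapper" is a hypothetical class name; HadoopTools resolves
            // and instantiates it through the helper jar's own classloader.
            return HadoopTools.newInstance("org.example.WordCountMapper");
        }
    }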
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HashPartitioningShuffleConnectorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HashPartitioningShuffleConnectorDescriptor.java
new file mode 100644
index 0000000..b36b371
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HashPartitioningShuffleConnectorDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.HashDataWriter;
+
+public class HashPartitioningShuffleConnectorDescriptor extends AbstractConnectorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private final MarshalledWritable<Configuration> mConfig;
+
+ public HashPartitioningShuffleConnectorDescriptor(JobSpecification spec, MarshalledWritable<Configuration> mConfig) {
+ super(spec);
+ this.mConfig = mConfig;
+ }
+
+ @Override
+ public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
+ HadoopHelper helper = new HadoopHelper(mConfig);
+ ITuplePartitionComputerFactory tpcf = helper.getTuplePartitionComputer();
+ return new HashDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
+ }
+
+ @Override
+ public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
+ return new ShuffleFrameReader(ctx, demux, mConfig);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java
new file mode 100644
index 0000000..6c8c8e9
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+public interface IInputSplitProvider {
+ public boolean next();
+
+ public InputSplit getInputSplit();
+
+ public int getBlockId();
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
new file mode 100644
index 0000000..dad5ec5
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+public interface IInputSplitProviderFactory {
+ public IInputSplitProvider createInputSplitProvider();
+}
\ No newline at end of file
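
The mapper operator pulls its work through these two interfaces instead of asking Hadoop directly, so a deployment decides how splits are handed out and how they map to blocks. A minimal in-memory sketch, with illustrative names only (not part of this commit); it iterates a precomputed split list and uses the position of each split as its block id. Note that in a real job the factory travels inside the serialized operator descriptor, so it would also need a serializable representation of the splits:

    package edu.uci.ics.hyracks.examples.onlineaggregation;

    import java.util.List;

    import org.apache.hadoop.mapreduce.InputSplit;

    // Illustrative only: hands out a fixed, precomputed list of splits, one block per split.
    public class ListInputSplitProviderFactory implements IInputSplitProviderFactory {
        private final List<InputSplit> splits;

        public ListInputSplitProviderFactory(List<InputSplit> splits) {
            this.splits = splits;
        }

        @Override
        public IInputSplitProvider createInputSplitProvider() {
            return new IInputSplitProvider() {
                private int cursor = -1;

                @Override
                public boolean next() {
                    // Advance to the next split; returns false once the list is exhausted.
                    return ++cursor < splits.size();
                }

                @Override
                public InputSplit getInputSplit() {
                    return splits.get(cursor);
                }

                @Override
                public int getBlockId() {
                    // The block id drives the per-block sort/flush in MapperOperatorDescriptor.
                    return cursor;
                }
            };
        }
    }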
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
new file mode 100644
index 0000000..10f343f
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+
+public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+ extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private final MarshalledWritable<Configuration> config;
+ private final IInputSplitProviderFactory isProviderFactory;
+
+ public MapperOperatorDescriptor(JobSpecification spec, MarshalledWritable<Configuration> config,
+ IInputSplitProviderFactory isProviderFactory) throws HyracksDataException {
+ super(spec, 0, 1);
+ this.config = config;
+ this.isProviderFactory = isProviderFactory;
+ HadoopHelper helper = new HadoopHelper(config);
+ recordDescriptors[0] = helper.getMapOutputRecordDescriptor();
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ HadoopHelper helper = new HadoopHelper(config);
+ final Configuration conf = helper.getConfiguration();
+ final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
+ final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
+ final IInputSplitProvider isp = isProviderFactory.createInputSplitProvider();
+ final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(conf, null);
+
+ final int framesLimit = helper.getSortFrameLimit(ctx);
+ final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
+
+ class SortingRecordWriter extends RecordWriter<K2, V2> {
+ private final ArrayTupleBuilder tb;
+ private final ByteBuffer frame;
+ private final FrameTupleAppender fta;
+ private ExternalSortRunGenerator runGen;
+ private int blockId;
+
+ public SortingRecordWriter() throws HyracksDataException {
+ tb = new ArrayTupleBuilder(3);
+ frame = ctx.getResourceManager().allocateFrame();
+ fta = new FrameTupleAppender(ctx);
+ fta.reset(frame, true);
+ }
+
+ public void initBlock(int blockId) throws HyracksDataException {
+ runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
+ recordDescriptors[0], framesLimit);
+ // Open the run generator before feeding frames to it (IFrameWriter contract).
+ runGen.open();
+ this.blockId = blockId;
+ }
+
+ @Override
+ public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+ if (fta.getTupleCount() > 0) {
+ runGen.nextFrame(frame);
+ fta.reset(frame, true);
+ }
+ }
+
+ @Override
+ public void write(K2 key, V2 value) throws IOException, InterruptedException {
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ key.write(dos);
+ tb.addFieldEndOffset();
+ value.write(dos);
+ tb.addFieldEndOffset();
+ dos.writeInt(blockId);
+ tb.addFieldEndOffset();
+ if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ runGen.nextFrame(frame);
+ fta.reset(frame, true);
+ if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
+ // Push any partially filled frame into the run generator and close it so the
+ // in-memory data is sorted before merging.
+ if (fta.getTupleCount() > 0) {
+ runGen.nextFrame(frame);
+ fta.reset(frame, true);
+ }
+ runGen.close();
+ IFrameWriter delegatingWriter = new IFrameWriter() {
+ @Override
+ public void open() throws HyracksDataException {
+ // do nothing
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ writer.nextFrame(buffer);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ writer.flush();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // do nothing
+ }
+ };
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getFrameSorter(),
+ runGen.getRuns(), new int[] { 0 }, comparatorFactories, recordDescriptors[0], framesLimit,
+ delegatingWriter);
+ merger.process();
+ }
+ }
+
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ writer.open();
+ try {
+ SortingRecordWriter recordWriter = new SortingRecordWriter();
+ while (isp.next()) {
+ InputSplit is = isp.getInputSplit();
+ int blockId = isp.getBlockId();
+ try {
+ RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(is, taskAttemptContext);
+ recordWriter.initBlock(blockId);
+ Mapper<K1, V1, K2, V2>.Context mCtx = mapper.new Context(conf, null, recordReader,
+ recordWriter, null, null, is);
+ mapper.run(mCtx);
+ recordReader.close();
+ recordWriter.sortAndFlushBlock(writer);
+ writer.flush();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MarshalledWritable.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MarshalledWritable.java
new file mode 100644
index 0000000..4455c2b
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MarshalledWritable.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 University of California, Irvine
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Writable;
+
+public class MarshalledWritable<T extends Writable> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private byte[] bytes;
+
+ public MarshalledWritable() {
+ }
+
+ public void set(T o) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeUTF(o.getClass().getName());
+ o.write(dos);
+ dos.close();
+ bytes = baos.toByteArray();
+ }
+
+ public T get() throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ DataInputStream dis = new DataInputStream(bais);
+ String className = dis.readUTF();
+ T o = (T) HadoopTools.newInstance(className);
+ o.readFields(dis);
+ return o;
+ }
+}
\ No newline at end of file
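
MarshalledWritable is what lets a Hadoop Configuration (a Writable, not a java.io.Serializable) ride inside the Serializable Hyracks descriptors. A short round-trip sketch, assuming a standard Hadoop Job object:

    package edu.uci.ics.hyracks.examples.onlineaggregation;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class MarshalledWritableSketch {
        public static MarshalledWritable<Configuration> marshal(Job job) throws Exception {
            // Configuration implements Writable, so it can be captured as a byte[] and shipped
            // inside the Serializable operator and connector descriptors.
            MarshalledWritable<Configuration> mConfig = new MarshalledWritable<Configuration>();
            mConfig.set(job.getConfiguration());

            // On the receiving side, get() re-creates the Configuration and repopulates it
            // from the marshalled bytes (this is what HadoopHelper does internally).
            Configuration conf = mConfig.get();
            return mConfig;
        }
    }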
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java
new file mode 100644
index 0000000..8bedd02
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java
@@ -0,0 +1,253 @@
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progress;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class ReducerOperatorDescriptor<K2, V2, K3, V3> extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private MarshalledWritable<Configuration> mConfig;
+
+ public ReducerOperatorDescriptor(JobSpecification spec, MarshalledWritable<Configuration> mConfig) {
+ super(spec, 1, 0);
+ this.mConfig = mConfig;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ final HadoopHelper helper = new HadoopHelper(mConfig);
+ final Reducer<K2, V2, K3, V3> reducer = helper.getReducer();
+ final RecordDescriptor recordDescriptor = helper.getMapOutputRecordDescriptor();
+ final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx, recordDescriptor);
+ final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx, recordDescriptor);
+ final ByteBuffer copyFrame = ctx.getResourceManager().allocateFrame();
+ accessor1.reset(copyFrame);
+ final int[] groupFields = helper.getSortFields();
+ IBinaryComparatorFactory[] groupingComparators = helper.getGroupingComparatorFactories();
+ final IBinaryComparator[] comparators = new IBinaryComparator[groupingComparators.length];
+ for (int i = 0; i < comparators.length; ++i) {
+ comparators[i] = groupingComparators[i].createBinaryComparator();
+ }
+
+ class KVIterator implements RawKeyValueIterator {
+ private FrameTupleAccessor accessor;
+ private DataInputBuffer kBuffer;
+ private DataInputBuffer vBuffer;
+ private List<ByteBuffer> buffers;
+ private int bSize;
+ private int bPtr;
+ private int tIdx;
+ private boolean eog;
+
+ public KVIterator() {
+ accessor = new FrameTupleAccessor(ctx, recordDescriptor);
+ kBuffer = new DataInputBuffer();
+ vBuffer = new DataInputBuffer();
+ }
+
+ void reset(List<ByteBuffer> buffers, int bSize) {
+ this.buffers = buffers;
+ this.bSize = bSize;
+ bPtr = 0;
+ tIdx = 0;
+ eog = false;
+ if (bSize > 0) {
+ accessor.reset(buffers.get(0));
+ tIdx = -1;
+ } else {
+ eog = true;
+ }
+ }
+
+ @Override
+ public DataInputBuffer getKey() throws IOException {
+ return kBuffer;
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException {
+ return vBuffer;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ while (true) {
+ if (eog) {
+ return false;
+ }
+ ++tIdx;
+ if (accessor.getTupleCount() <= tIdx) { // current frame exhausted; move to the next buffer
+ ++bPtr;
+ if (bPtr >= bSize) {
+ eog = true;
+ continue;
+ }
+ tIdx = -1;
+ accessor.reset(buffers.get(bPtr));
+ continue;
+ }
+ kBuffer.reset(accessor.getBuffer().array(),
+ FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, HadoopHelper.KEY_FIELD_INDEX),
+ accessor.getFieldLength(tIdx, HadoopHelper.KEY_FIELD_INDEX));
+ vBuffer.reset(accessor.getBuffer().array(),
+ FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, HadoopHelper.VALUE_FIELD_INDEX),
+ accessor.getFieldLength(tIdx, HadoopHelper.VALUE_FIELD_INDEX));
+ break;
+ }
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public Progress getProgress() {
+ return null;
+ }
+ }
+
+ final KVIterator kvi = new KVIterator();
+ final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(helper.getConfiguration(), null);
+ final RecordWriter<K3, V3> recordWriter;
+ try {
+ recordWriter = (RecordWriter<K3, V3>) helper.getOutputFormat().getRecordWriter(taskAttemptContext);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+ private boolean first;
+ private boolean groupStarted;
+ private List<ByteBuffer> group;
+ private int bPtr;
+ private FrameTupleAppender fta;
+
+ @Override
+ public void open() throws HyracksDataException {
+ first = true;
+ groupStarted = false;
+ group = new ArrayList<ByteBuffer>();
+ bPtr = 0;
+ group.add(ctx.getResourceManager().allocateFrame());
+ fta = new FrameTupleAppender(ctx);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor0.reset(buffer);
+ int nTuples = accessor0.getTupleCount();
+ for (int i = 0; i < nTuples; ++i) {
+ if (first) {
+ groupInit();
+ first = false;
+ } else {
+ if (i == 0) {
+ switchGroupIfRequired(accessor1, accessor1.getTupleCount() - 1, accessor0, i);
+ } else {
+ switchGroupIfRequired(accessor0, i - 1, accessor0, i);
+ }
+ }
+ accumulate(accessor0, i);
+ }
+ FrameUtils.copy(buffer, copyFrame);
+ }
+
+ private void accumulate(FrameTupleAccessor accessor, int tIndex) {
+ if (!fta.append(accessor, tIndex)) {
+ ++bPtr;
+ if (group.size() <= bPtr) {
+ group.add(ctx.getResourceManager().allocateFrame());
+ }
+ fta.reset(group.get(bPtr), true);
+ if (!fta.append(accessor, tIndex)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+ FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+ if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+ reduce();
+ groupInit();
+ }
+ }
+
+ private void groupInit() {
+ groupStarted = true;
+ bPtr = 0;
+ fta.reset(group.get(0), true);
+ }
+
+ private void reduce() throws HyracksDataException {
+ kvi.reset(group, bPtr + 1);
+ try {
+ Reducer<K2, V2, K3, V3>.Context rCtx = reducer.new Context(helper.getConfiguration(), null, kvi,
+ null, null, recordWriter, null, null,
+ (RawComparator<K2>) helper.getRawGroupingComparator(), (Class<K2>) helper.getJob()
+ .getMapOutputKeyClass(), (Class<V2>) helper.getJob().getMapOutputValueClass());
+ reducer.run(rCtx);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ groupStarted = false;
+ }
+
+ private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+ for (int i = 0; i < comparators.length; ++i) {
+ int fIdx = groupFields[i];
+ int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength()
+ + a1.getFieldStartOffset(t1Idx, fIdx);
+ int l1 = a1.getFieldLength(t1Idx, fIdx);
+ int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength()
+ + a2.getFieldStartOffset(t2Idx, fIdx);
+ int l2 = a2.getFieldLength(t2Idx, fIdx);
+ if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (groupStarted) {
+ reduce();
+ }
+ try {
+ // Close the Hadoop RecordWriter so the output format can finalize its output.
+ recordWriter.close(taskAttemptContext);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
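
With the mapper, shuffle connector, and reducer in place, a job is assembled in the usual Hyracks way: map output tuples [key, value, blockId] flow through the hash-partitioning shuffle into the reducer, which runs the user's Reducer once per key group. A rough wiring sketch only, assuming the JobSpecification connect/addRoot calls from hyracks-api; partition-constraint setup and any result plumbing are left out, and this is not necessarily how the onlineaggregationclient driver assembles its job:

    package edu.uci.ics.hyracks.examples.onlineaggregation;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.api.job.JobSpecification;

    public class OnlineAggregationJobSketch {
        // Rough sketch: map -> hash-partitioning shuffle -> reduce for a word-count-style job.
        // The Writable type parameters are illustrative; splitFactory is any
        // IInputSplitProviderFactory, e.g. the list-based sketch above.
        public static JobSpecification createJob(MarshalledWritable<Configuration> mConfig,
                IInputSplitProviderFactory splitFactory) throws HyracksDataException {
            JobSpecification spec = new JobSpecification();

            MapperOperatorDescriptor<LongWritable, Text, Text, IntWritable> mapper =
                    new MapperOperatorDescriptor<LongWritable, Text, Text, IntWritable>(spec, mConfig, splitFactory);

            ReducerOperatorDescriptor<Text, IntWritable, Text, IntWritable> reducer =
                    new ReducerOperatorDescriptor<Text, IntWritable, Text, IntWritable>(spec, mConfig);

            // The connector hash-partitions map output by key using the job's Partitioner and,
            // on the receive side, merges it back into per-block sorted order (ShuffleFrameReader).
            HashPartitioningShuffleConnectorDescriptor shuffle =
                    new HashPartitioningShuffleConnectorDescriptor(spec, mConfig);

            spec.connect(shuffle, mapper, 0, reducer, 0);
            spec.addRoot(reducer);
            return spec;
        }
    }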
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java
new file mode 100644
index 0000000..e03d0c9
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunFileWriter;
+
+public class ShuffleFrameReader implements IFrameReader {
+ private final IHyracksContext ctx;
+ private final IConnectionDemultiplexer demux;
+ private final HadoopHelper helper;
+ private final RecordDescriptor recordDescriptor;
+ private List<File> runs;
+ private Map<File, Integer> file2BlockIdMap;
+ private Map<Integer, File> blockId2FileMap;
+ private int lastReadSender;
+ private RunFileReader reader;
+
+ public ShuffleFrameReader(IHyracksContext ctx, IConnectionDemultiplexer demux,
+ MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+ this.ctx = ctx;
+ this.demux = demux;
+ helper = new HadoopHelper(mConfig);
+ this.recordDescriptor = helper.getMapOutputRecordDescriptor();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ runs = new LinkedList<File>();
+ file2BlockIdMap = new HashMap<File, Integer>();
+ blockId2FileMap = new HashMap<Integer, File>();
+ int nSenders = demux.getSenderCount();
+ RunInfo[] infos = new RunInfo[nSenders];
+ FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptor);
+ while (true) {
+ IConnectionEntry entry = demux.findNextReadyEntry(lastReadSender);
+ lastReadSender = (Integer) entry.getAttachment();
+ ByteBuffer netBuffer = entry.getReadBuffer();
+ accessor.reset(netBuffer);
+ int tupleCount = accessor.getTupleCount();
+ if (tupleCount == 0) {
+ int openEntries = demux.closeEntry(lastReadSender);
+ netBuffer.clear();
+ demux.unreadyEntry(lastReadSender);
+ if (openEntries == 0) {
+ break;
+ }
+ } else {
+ RunInfo info = infos[lastReadSender];
+ int nTuples = accessor.getTupleCount();
+ for (int i = 0; i < nTuples; ++i) {
+ int tBlockId = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ FrameUtils.getAbsoluteFieldStartOffset(accessor, i, HadoopHelper.BLOCKID_FIELD_INDEX));
+ if (info == null) {
+ info = new RunInfo();
+ info.reset(tBlockId);
+ infos[lastReadSender] = info;
+ } else if (info.blockId != tBlockId) {
+ info.close();
+ info.reset(tBlockId);
+ }
+ info.write(accessor, i);
+ }
+ netBuffer.clear();
+ demux.unreadyEntry(lastReadSender);
+ }
+ }
+ for (int i = 0; i < infos.length; ++i) {
+ RunInfo info = infos[i];
+ if (info != null) {
+ info.close();
+ }
+ }
+ infos = null;
+
+ File outFile;
+ try {
+ outFile = ctx.getResourceManager().createFile(ShuffleFrameReader.class.getName(), ".run");
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ int framesLimit = helper.getSortFrameLimit(ctx);
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 },
+ helper.getSortComparatorFactories(), recordDescriptor, framesLimit, new RunFileWriter(outFile));
+ merger.process();
+
+ try {
+ reader = new RunFileReader(outFile);
+ } catch (FileNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ reader.open();
+ }
+
+ @Override
+ public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ return reader.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ reader.close();
+ }
+
+ private class RunInfo {
+ private final ByteBuffer buffer;
+ private final FrameTupleAppender fta;
+
+ private File file;
+ private RandomAccessFile raf;
+ private FileChannel channel;
+ private int blockId;
+
+ public RunInfo() {
+ buffer = ctx.getResourceManager().allocateFrame();
+ fta = new FrameTupleAppender(ctx);
+ }
+
+ public void reset(int blockId) throws HyracksDataException {
+ this.blockId = blockId;
+ fta.reset(buffer, true);
+ try {
+ file = ctx.getResourceManager().createFile(ShuffleFrameReader.class.getName(), ".run");
+ raf = new RandomAccessFile(file, "rw");
+ channel = raf.getChannel();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public void write(FrameTupleAccessor accessor, int tIdx) throws HyracksDataException {
+ if (!fta.append(accessor, tIdx)) {
+ flush();
+ if (!fta.append(accessor, tIdx)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ public void close() throws HyracksDataException {
+ flush();
+ try {
+ channel.close();
+ raf.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ runs.add(file);
+ file2BlockIdMap.put(file, blockId);
+ blockId2FileMap.put(blockId, file);
+ }
+
+ private void flush() throws HyracksDataException {
+ if (fta.getTupleCount() <= 0) {
+ return;
+ }
+ buffer.limit(buffer.capacity());
+ buffer.position(0);
+ try {
+ channel.write(buffer);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ fta.reset(buffer, true);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/pom.xml b/hyracks-examples/onlineaggregation-example/pom.xml
new file mode 100644
index 0000000..f14e5ac
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/pom.xml
@@ -0,0 +1,19 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
+ <artifactId>onlineaggregation-example</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-examples</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ </parent>
+
+ <modules>
+ <module>onlineaggregationhelper</module>
+ <module>onlineaggregationclient</module>
+ <module>onlineaggregationapp</module>
+ </modules>
+</project>