Merged hyracks_dev_next -r 1287 into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@1288 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-hadoop/.classpath b/hyracks/hyracks-dataflow-hadoop/.classpath
deleted file mode 100644
index 1f3c1ff..0000000
--- a/hyracks/hyracks-dataflow-hadoop/.classpath
+++ /dev/null
@@ -1,7 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
-	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
-	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
-	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
-	<classpathentry kind="output" path="target/classes"/>
-</classpath>
diff --git a/hyracks/hyracks-dataflow-hadoop/.project b/hyracks/hyracks-dataflow-hadoop/.project
deleted file mode 100644
index d6edecf..0000000
--- a/hyracks/hyracks-dataflow-hadoop/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
-	<name>hyracks-dataflow-hadoop</name>
-	<comment></comment>
-	<projects>
-	</projects>
-	<buildSpec>
-		<buildCommand>
-			<name>org.eclipse.jdt.core.javabuilder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-		<buildCommand>
-			<name>org.maven.ide.eclipse.maven2Builder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-	</buildSpec>
-	<natures>
-		<nature>org.maven.ide.eclipse.maven2Nature</nature>
-		<nature>org.eclipse.jdt.core.javanature</nature>
-	</natures>
-</projectDescription>
diff --git a/hyracks/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index 0272d6e..0000000
--- a/hyracks/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,274 +0,0 @@
-#Thu Jun 02 13:11:16 PDT 2011
-eclipse.preferences.version=1
-org.eclipse.jdt.core.codeComplete.argumentPrefixes=
-org.eclipse.jdt.core.codeComplete.argumentSuffixes=
-org.eclipse.jdt.core.codeComplete.fieldPrefixes=
-org.eclipse.jdt.core.codeComplete.fieldSuffixes=
-org.eclipse.jdt.core.codeComplete.localPrefixes=
-org.eclipse.jdt.core.codeComplete.localSuffixes=
-org.eclipse.jdt.core.codeComplete.staticFieldPrefixes=
-org.eclipse.jdt.core.codeComplete.staticFieldSuffixes=
-org.eclipse.jdt.core.codeComplete.staticFinalFieldPrefixes=
-org.eclipse.jdt.core.codeComplete.staticFinalFieldSuffixes=
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
-org.eclipse.jdt.core.compiler.compliance=1.6
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
-org.eclipse.jdt.core.compiler.source=1.6
-org.eclipse.jdt.core.formatter.align_type_members_on_columns=false
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=48
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_assignment=0
-org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
-org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
-org.eclipse.jdt.core.formatter.alignment_for_enum_constants=48
-org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16
-org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration=16
-org.eclipse.jdt.core.formatter.blank_lines_after_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_after_package=1
-org.eclipse.jdt.core.formatter.blank_lines_before_field=0
-org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration=0
-org.eclipse.jdt.core.formatter.blank_lines_before_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_before_member_type=1
-org.eclipse.jdt.core.formatter.blank_lines_before_method=1
-org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk=1
-org.eclipse.jdt.core.formatter.blank_lines_before_package=0
-org.eclipse.jdt.core.formatter.blank_lines_between_import_groups=1
-org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations=1
-org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_array_initializer=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block_in_case=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_constant=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_method_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false
-org.eclipse.jdt.core.formatter.comment.format_block_comments=true
-org.eclipse.jdt.core.formatter.comment.format_header=false
-org.eclipse.jdt.core.formatter.comment.format_html=true
-org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=true
-org.eclipse.jdt.core.formatter.comment.format_line_comments=true
-org.eclipse.jdt.core.formatter.comment.format_source_code=true
-org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true
-org.eclipse.jdt.core.formatter.comment.indent_root_tags=true
-org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
-org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=insert
-org.eclipse.jdt.core.formatter.comment.line_length=80
-org.eclipse.jdt.core.formatter.compact_else_if=true
-org.eclipse.jdt.core.formatter.continuation_indentation=2
-org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2
-org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line=false
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header=true
-org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_empty_lines=false
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_block=true
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_body=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=true
-org.eclipse.jdt.core.formatter.indentation.size=4
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_member=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_ellipsis=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_ellipsis=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=insert
-org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.join_lines_in_comments=true
-org.eclipse.jdt.core.formatter.join_wrapped_lines=true
-org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.lineSplit=120
-org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
-org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1
-org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=true
-org.eclipse.jdt.core.formatter.tabulation.char=space
-org.eclipse.jdt.core.formatter.tabulation.size=4
-org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
-org.eclipse.jdt.core.formatter.wrap_before_binary_operator=true
diff --git a/hyracks/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs
deleted file mode 100644
index 9e0c887..0000000
--- a/hyracks/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs
+++ /dev/null
@@ -1,9 +0,0 @@
-#Fri Aug 13 04:07:00 PDT 2010
-activeProfiles=
-eclipse.preferences.version=1
-fullBuildGoals=process-test-resources
-includeModules=false
-resolveWorkspaceProjects=true
-resourceFilterGoals=process-resources resources\:testResources
-skipCompilerPlugin=true
-version=1
diff --git a/hyracks/hyracks-dataflow-hadoop/pom.xml b/hyracks/hyracks-dataflow-hadoop/pom.xml
index 4605153..e3f9b1e 100644
--- a/hyracks/hyracks-dataflow-hadoop/pom.xml
+++ b/hyracks/hyracks-dataflow-hadoop/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-dataflow-hadoop</artifactId>
-  <version>0.1.9-SNAPSHOT</version>
+  <version>0.2.0-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.9-SNAPSHOT</version>
+    <version>0.2.0-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,14 +27,14 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.9-SNAPSHOT</version>
+  		<version>0.2.0-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-common</artifactId>
-  		<version>0.1.9-SNAPSHOT</version>
+  		<version>0.2.0-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
@@ -54,7 +54,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.9-SNAPSHOT</version>
+  		<version>0.2.0-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   </dependencies>
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index fd549b3..0ea56c4 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -34,13 +34,12 @@
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
@@ -56,7 +55,7 @@
         protected OutputCollector<K2, V2> output;
         protected Reporter reporter;
         protected Object mapper;
-        //protected Mapper<K1, V1, K2, V2> mapper;
+        // protected Mapper<K1, V1, K2, V2> mapper;
         protected int partition;
         protected JobConf conf;
         protected IOpenableDataWriter<Object[]> writer;
@@ -96,7 +95,8 @@
                 if (!conf.getUseNewMapper()) {
                     ((org.apache.hadoop.mapred.Mapper) mapper).close();
                 } else {
-                    // do nothing. closing the mapper is handled internally by run method on context. 
+                    // Do nothing: closing the mapper is handled internally by
+                    // the run() method on the context.
                 }
             } catch (IOException ioe) {
                 throw new HyracksDataException(ioe);
@@ -118,6 +118,11 @@
         }
 
         @Override
+        public void fail() throws HyracksDataException {
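+            // Relay the failure downstream so the consuming operator can abort cleanly.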
+            writer.fail();
+        }
+
+        @Override
         public void open() throws HyracksDataException {
             initializeMapper();
             writer.open();
@@ -172,16 +177,18 @@
                 } else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
                     conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath()
                             .toString());
-                    conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
-                            .getStart());
-                    conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
-                            .getLength());
+                    conf.setLong("map.input.start",
+                            ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
+                    conf.setLong("map.input.length",
+                            ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
                 }
             } catch (Exception e) {
                 e.printStackTrace();
-                // we do not throw the exception here as we are setting additional parameters that may not be 
-                // required by the mapper. If they are  indeed required,  the configure method invoked on the mapper
-                // shall report an exception because of the missing parameters. 
+                // We do not throw the exception here, as we are only setting additional
+                // parameters that may not be required by the mapper. If they are indeed
+                // required, the configure method invoked on the mapper will report an
+                // exception because of the missing parameters.
             }
         }
 
@@ -223,7 +230,9 @@
                             data[1] = value;
                             writer.writeData(data);
                         }
-                    };;;
+                    };
 
                     OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
                             .getOutputCommitter(new TaskAttemptContext(conf, new TaskAttemptID()));
@@ -245,7 +254,9 @@
                         public Counter getCounter(Enum<?> arg0) {
                             return null;
                         }
-                    };;;
+                    };
                     context = new org.apache.hadoop.mapreduce.Mapper().new Context(conf, new TaskAttemptID(),
                             newReader, recordWriter, outputCommitter, statusReporter,
                             (org.apache.hadoop.mapreduce.InputSplit) inputSplit);
@@ -315,9 +326,9 @@
         String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
         try {
             if (hadoopClassFactory == null) {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
-                        .forName(mapOutputKeyClassName), (Class<? extends Writable>) Class
-                        .forName(mapOutputValueClassName));
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
+                        (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
             } else {
                 recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
                         (Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputKeyClassName),
@@ -361,13 +372,13 @@
         } else {
             Class inputFormatClass = conf.getInputFormat().getClass();
             InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf, super
-                    .createReporter());
+            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf,
+                    super.createReporter());
         }
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
 
         JobConf conf = getJobConf();
@@ -398,17 +409,17 @@
                             (Class<? extends Writable>) oldReader.createKey().getClass(),
                             (Class<? extends Writable>) oldReader.createValue().getClass());
                 }
-                return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
+                return createSelfReadingMapper(ctx, recordDescriptor, partition);
             } else {
-                return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
-                        .getInputRecordDescriptor(this.odId, 0));
+                return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
+                        recordDescProvider.getInputRecordDescriptor(this.odId, 0));
             }
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
     }
 
-    private IOperatorNodePushable createSelfReadingMapper(final IHyracksStageletContext ctx, IOperatorEnvironment env,
+    private IOperatorNodePushable createSelfReadingMapper(final IHyracksTaskContext ctx,
             final RecordDescriptor recordDescriptor, final int partition) {
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             @Override
@@ -418,9 +429,11 @@
                 try {
                     readMapOp.mapInput();
                 } catch (Exception e) {
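+                    // Signal the failure downstream before rethrowing; the reader is closed in finally.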
+                    writer.fail();
                     throw new HyracksDataException(e);
+                } finally {
+                    readMapOp.close();
                 }
-                readMapOp.close();
             }
         };
     }
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index 915d09b..2841509 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -32,12 +32,11 @@
 import org.apache.hadoop.util.ReflectionUtils;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -143,7 +142,7 @@
 
     @SuppressWarnings("deprecation")
     @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx, IOperatorEnvironment env,
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -192,28 +191,35 @@
                     RecordDescriptor outputRecordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
                             (Class<? extends Writable>) hadoopRecordReader.createKey().getClass(),
                             (Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
-                    int nFields = outputRecordDescriptor.getFields().length;
+                    int nFields = outputRecordDescriptor.getFieldCount();
                     ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
-                    while (hadoopRecordReader.next(key, value)) {
-                        tb.reset();
-                        switch (nFields) {
-                            case 2:
-                                tb.addField(outputRecordDescriptor.getFields()[0], key);
-                            case 1:
-                                tb.addField(outputRecordDescriptor.getFields()[1], value);
-                        }
-                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                            FrameUtils.flushFrame(outBuffer, writer);
-                            appender.reset(outBuffer, true);
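+                    // Writer lifecycle: open() before the first frame, fail() on error, close() in finally.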
+                    writer.open();
+                    try {
+                        while (hadoopRecordReader.next(key, value)) {
+                            tb.reset();
+                            switch (nFields) {
+                                case 2:
+                                    tb.addField(outputRecordDescriptor.getFields()[0], key);
+                                case 1:
+                                    tb.addField(outputRecordDescriptor.getFields()[1], value);
+                            }
                             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                throw new IllegalStateException();
+                                FrameUtils.flushFrame(outBuffer, writer);
+                                appender.reset(outBuffer, true);
+                                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                    throw new IllegalStateException();
+                                }
                             }
                         }
+                        if (appender.getTupleCount() > 0) {
+                            FrameUtils.flushFrame(outBuffer, writer);
+                        }
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    } finally {
+                        writer.close();
                     }
-                    if (appender.getTupleCount() > 0) {
-                        FrameUtils.flushFrame(outBuffer, writer);
-                    }
-                    writer.close();
                     hadoopRecordReader.close();
                 } catch (InstantiationException e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 5857f36..73a8c16 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -23,19 +23,18 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IDataReader;
 import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
@@ -44,7 +43,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.hadoop.data.KeyComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.hadoop.data.RawComparingComparatorFactory;
@@ -348,7 +346,7 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         try {
             if (this.comparatorFactory == null) {
@@ -395,8 +393,9 @@
         RecordDescriptor recordDescriptor = null;
         try {
             if (classFactory == null) {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
-                        .forName(outputKeyClassName), (Class<? extends Writable>) Class.forName(outputValueClassName));
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) Class.forName(outputKeyClassName),
+                        (Class<? extends Writable>) Class.forName(outputValueClassName));
             } else {
                 recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
                         (Class<? extends Writable>) classFactory.loadClass(outputKeyClassName),
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
new file mode 100644
index 0000000..e8b6e8b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopNewPartitionerTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+
+public class HadoopHelper {
+    public static final int KEY_FIELD_INDEX = 0;
+    public static final int VALUE_FIELD_INDEX = 1;
+    public static final int BLOCKID_FIELD_INDEX = 2;
+    private static final int[] KEY_SORT_FIELDS = new int[] { 0 };
+
+    private MarshalledWritable<Configuration> mConfig;
+    private Configuration config;
+    private Job job;
+
+    public HadoopHelper(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+        this.mConfig = mConfig;
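+        // Run with this module's classloader so the Hadoop Configuration resolves user classes.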
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+            config = mConfig.get();
+            config.setClassLoader(getClass().getClassLoader());
+            job = new Job(config);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public RecordDescriptor getMapOutputRecordDescriptor() throws HyracksDataException {
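+        // Three fields: map output key, map output value, and an integer block id (BLOCKID_FIELD_INDEX).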
+        try {
+            return new RecordDescriptor(
+                    new ISerializerDeserializer[] {
+                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                                    .getMapOutputKeyClass()),
+                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                                    .getMapOutputValueClass()), IntegerSerializerDeserializer.INSTANCE });
+
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public RecordDescriptor getMapOutputRecordDescriptorWithoutExtraFields() throws HyracksDataException {
+        try {
+            return new RecordDescriptor(
+                    new ISerializerDeserializer[] {
+                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                                    .getMapOutputKeyClass()),
+                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                                    .getMapOutputValueClass()) });
+
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public TaskAttemptContext createTaskAttemptContext(TaskAttemptID taId) {
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(config.getClassLoader());
+            return new TaskAttemptContext(config, taId);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public JobContext createJobContext() {
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(config.getClassLoader());
+            return new JobContext(config, null);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public <K1, V1, K2, V2> Mapper<K1, V1, K2, V2> getMapper() throws HyracksDataException {
+        try {
+            return (Mapper<K1, V1, K2, V2>) HadoopTools.newInstance(job.getMapperClass());
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        } catch (InstantiationException e) {
+            throw new HyracksDataException(e);
+        } catch (IllegalAccessException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K2, V2, K3, V3> Reducer<K2, V2, K3, V3> getReducer() throws HyracksDataException {
+        try {
+            return (Reducer<K2, V2, K3, V3>) HadoopTools.newInstance(job.getReducerClass());
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        } catch (InstantiationException e) {
+            throw new HyracksDataException(e);
+        } catch (IllegalAccessException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K2, V2> Reducer<K2, V2, K2, V2> getCombiner() throws HyracksDataException {
+        try {
+            return (Reducer<K2, V2, K2, V2>) HadoopTools.newInstance(job.getCombinerClass());
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        } catch (InstantiationException e) {
+            throw new HyracksDataException(e);
+        } catch (IllegalAccessException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K, V> InputFormat<K, V> getInputFormat() throws HyracksDataException {
+        try {
+            return (InputFormat<K, V>) ReflectionUtils.newInstance(job.getInputFormatClass(), config);
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K, V> List<InputSplit> getInputSplits() throws HyracksDataException {
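+        // Compute the splits under this module's classloader; the InputFormat may load user classes.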
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+            InputFormat<K, V> fmt = getInputFormat();
+            JobContext jCtx = new JobContext(config, null);
+            try {
+                return fmt.getSplits(jCtx);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public IBinaryComparatorFactory[] getSortComparatorFactories() {
+        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
+                .getSortComparator().getClass());
+
+        return new IBinaryComparatorFactory[] { comparatorFactory };
+    }
+
+    public IBinaryComparatorFactory[] getGroupingComparatorFactories() {
+        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
+                .getGroupingComparator().getClass());
+
+        return new IBinaryComparatorFactory[] { comparatorFactory };
+    }
+
+    public RawComparator<?> getRawGroupingComparator() {
+        return job.getGroupingComparator();
+    }
+
+    public int getSortFrameLimit(IHyracksCommonContext ctx) {
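+        // Convert the io.sort.mb budget (default 100 MB) into an equivalent number of frames.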
+        int sortMemory = job.getConfiguration().getInt("io.sort.mb", 100);
+        return (int) (((long) sortMemory * 1024 * 1024) / ctx.getFrameSize());
+    }
+
+    public Job getJob() {
+        return job;
+    }
+
+    public MarshalledWritable<Configuration> getMarshalledConfiguration() {
+        return mConfig;
+    }
+
+    public Configuration getConfiguration() {
+        return config;
+    }
+
+    public ITuplePartitionComputerFactory getTuplePartitionComputer() throws HyracksDataException {
+        int nReducers = job.getNumReduceTasks();
+        try {
+            return new HadoopNewPartitionerTuplePartitionComputerFactory<Writable, Writable>(
+                    (Class<? extends Partitioner<Writable, Writable>>) job.getPartitionerClass(),
+                    (ISerializerDeserializer<Writable>) DatatypeHelper
+                            .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputKeyClass()),
+                    (ISerializerDeserializer<Writable>) DatatypeHelper
+                            .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputValueClass()));
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public int[] getSortFields() {
+        return KEY_SORT_FIELDS;
+    }
+
+    public <K> ISerializerDeserializer<K> getMapOutputKeySerializerDeserializer() {
+        return (ISerializerDeserializer<K>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                .getMapOutputKeyClass());
+    }
+
+    public <V> ISerializerDeserializer<V> getMapOutputValueSerializerDeserializer() {
+        return (ISerializerDeserializer<V>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                .getMapOutputValueClass());
+    }
+
+    public FileSystem getFilesystem() throws HyracksDataException {
+        try {
+            return FileSystem.get(config);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K, V> OutputFormat<K, V> getOutputFormat() throws HyracksDataException {
+        try {
+            return (OutputFormat<K, V>) ReflectionUtils.newInstance(job.getOutputFormatClass(), config);
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public boolean hasCombiner() throws HyracksDataException {
+        try {
+            return job.getCombinerClass() != null;
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
new file mode 100644
index 0000000..7e4d67f
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+public class HadoopTools {
+    public static Object newInstance(String className) throws ClassNotFoundException, InstantiationException,
+            IllegalAccessException {
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(HadoopTools.class.getClassLoader());
+            Class<?> clazz = Class.forName(className, true, HadoopTools.class.getClassLoader());
+            return newInstance(clazz);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public static Object newInstance(Class<?> clazz) throws InstantiationException, IllegalAccessException {
+        return clazz.newInstance();
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
new file mode 100644
index 0000000..7c6bf86
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.util.BitSet;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.connectors.PartitionDataWriter;
+
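+/**
+ * An M:N connector that hash-partitions map output with the Hadoop job's own
+ * Partitioner (obtained via HadoopHelper) and, on the receive side, merges the
+ * incoming partitions through a ShuffleFrameReader.
+ */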
+public class HashPartitioningShuffleConnectorDescriptor extends AbstractMToNConnectorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final MarshalledWritable<Configuration> mConfig;
+
+    public HashPartitioningShuffleConnectorDescriptor(JobSpecification spec, MarshalledWritable<Configuration> mConfig) {
+        super(spec);
+        this.mConfig = mConfig;
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        HadoopHelper helper = new HadoopHelper(mConfig);
+        ITuplePartitionComputerFactory tpcf = helper.getTuplePartitionComputer();
+        return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
+    }
+
+    @Override
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            int receiverIndex, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+        BitSet expectedPartitions = new BitSet();
+        expectedPartitions.set(0, nProducerPartitions);
+        NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
+                expectedPartitions);
+        IFrameReader frameReader = new ShuffleFrameReader(ctx, channelReader, mConfig);
+        return new PartitionCollector(ctx, getConnectorId(), receiverIndex, expectedPartitions, frameReader,
+                channelReader);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
new file mode 100644
index 0000000..1545c06
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IInputSplitProvider {
+    public InputSplit next() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
new file mode 100644
index 0000000..73588ab
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IInputSplitProviderFactory extends Serializable {
+    public IInputSplitProvider createInputSplitProvider(int id) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
new file mode 100644
index 0000000..b37084e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
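+/**
+ * A Writable input split describing a file region plus a block id and a
+ * schedule time; host locations are kept only on the sending side (see
+ * readFields below).
+ */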
+public class InputFileSplit extends InputSplit implements Writable {
+    private Path file;
+    private long start;
+    private long length;
+    private int blockId;
+    private String[] hosts;
+    private long scheduleTime;
+
+    public InputFileSplit() {
+    }
+
+    public InputFileSplit(int blockId, Path file, long start, long length, String[] hosts, long scheduleTime) {
+        this.blockId = blockId;
+        this.file = file;
+        this.start = start;
+        this.length = length;
+        this.hosts = hosts;
+        this.scheduleTime = scheduleTime;
+    }
+
+    public int blockId() {
+        return blockId;
+    }
+
+    public long scheduleTime() {
+        return this.scheduleTime;
+    }
+
+    public Path getPath() {
+        return file;
+    }
+
+    /** The position of the first byte in the file to process. */
+    public long getStart() {
+        return start;
+    }
+
+    /** The number of bytes in the file to process. */
+    @Override
+    public long getLength() {
+        return length;
+    }
+
+    @Override
+    public String toString() {
+        return file + ":" + start + "+" + length;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, file.toString());
+        out.writeLong(start);
+        out.writeLong(length);
+        out.writeInt(blockId);
+        out.writeLong(this.scheduleTime);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        file = new Path(Text.readString(in));
+        start = in.readLong();
+        length = in.readLong();
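+        // write() does not serialize host locations, so they are cleared here;
+        // getLocations() then reports an empty array on the receiving side.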
+        hosts = null;
+        this.blockId = in.readInt();
+        this.scheduleTime = in.readLong();
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+        if (this.hosts == null) {
+            return new String[] {};
+        } else {
+            return this.hosts;
+        }
+    }
+
+    public FileSplit toFileSplit() {
+        return new FileSplit(file, start, length, hosts);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
new file mode 100644
index 0000000..a00fa7f0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.util.Progress;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
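+/**
+ * Adapts a list of Hyracks frames holding key/value tuples to Hadoop's
+ * RawKeyValueIterator so that a Reducer or Combiner can consume them directly.
+ */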
+public class KVIterator implements RawKeyValueIterator {
+    private final HadoopHelper helper;
+    private FrameTupleAccessor accessor;
+    private DataInputBuffer kBuffer;
+    private DataInputBuffer vBuffer;
+    private List<ByteBuffer> buffers;
+    private int bSize;
+    private int bPtr;
+    private int tIdx;
+    private boolean eog;
+
+    public KVIterator(IHyracksTaskContext ctx, HadoopHelper helper, RecordDescriptor recordDescriptor) {
+        this.helper = helper;
+        accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        kBuffer = new DataInputBuffer();
+        vBuffer = new DataInputBuffer();
+    }
+
+    void reset(List<ByteBuffer> buffers, int bSize) {
+        this.buffers = buffers;
+        this.bSize = bSize;
+        bPtr = 0;
+        tIdx = 0;
+        eog = false;
+        if (bSize > 0) {
+            accessor.reset(buffers.get(0));
+            tIdx = -1;
+        } else {
+            eog = true;
+        }
+    }
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+        return kBuffer;
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+        return vBuffer;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+        while (true) {
+            if (eog) {
+                return false;
+            }
+            ++tIdx;
+            if (accessor.getTupleCount() <= tIdx) {
+                ++bPtr;
+                if (bPtr >= bSize) {
+                    eog = true;
+                    continue;
+                }
+                tIdx = -1;
+                accessor.reset(buffers.get(bPtr));
+                continue;
+            }
+            kBuffer.reset(accessor.getBuffer().array(),
+                    FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, HadoopHelper.KEY_FIELD_INDEX),
+                    accessor.getFieldLength(tIdx, HadoopHelper.KEY_FIELD_INDEX));
+            vBuffer.reset(accessor.getBuffer().array(),
+                    FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, HadoopHelper.VALUE_FIELD_INDEX),
+                    accessor.getFieldLength(tIdx, HadoopHelper.VALUE_FIELD_INDEX));
+            break;
+        }
+        return true;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
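+    // Progress reporting is not supported; returning null assumes the reducer
+    // context driving this iterator never consults it.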
+    @Override
+    public Progress getProgress() {
+        return null;
+    }
+}
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
new file mode 100644
index 0000000..4a61296
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+
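+/**
+ * Source operator that runs the job's Mapper over the input splits handed out
+ * by an IInputSplitProvider, sorting each split's output block (and running
+ * the combiner, when one is configured) before pushing it downstream.
+ */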
+public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+        extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final int jobId;
+    private final MarshalledWritable<Configuration> config;
+    private final IInputSplitProviderFactory factory;
+
+    public MapperOperatorDescriptor(JobSpecification spec, int jobId, MarshalledWritable<Configuration> config,
+            IInputSplitProviderFactory factory) throws HyracksDataException {
+        super(spec, 0, 1);
+        this.jobId = jobId;
+        this.config = config;
+        this.factory = factory;
+        HadoopHelper helper = new HadoopHelper(config);
+        recordDescriptors[0] = helper.getMapOutputRecordDescriptor();
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        final HadoopHelper helper = new HadoopHelper(config);
+        final Configuration conf = helper.getConfiguration();
+        final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
+        final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
+        final IInputSplitProvider isp = factory.createInputSplitProvider(partition);
+        final TaskAttemptID taId = new TaskAttemptID("foo", jobId, true, partition, 0);
+        final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
+
+        final int framesLimit = helper.getSortFrameLimit(ctx);
+        final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
+
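+        // Buffers map output into frames and feeds an ExternalSortRunGenerator
+        // per input-split block; sortAndFlushBlock() merges the block's runs
+        // and emits the sorted result tagged with the block id.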
+        class SortingRecordWriter extends RecordWriter<K2, V2> {
+            private final ArrayTupleBuilder tb;
+            private final ByteBuffer frame;
+            private final FrameTupleAppender fta;
+            private ExternalSortRunGenerator runGen;
+            private int blockId;
+
+            public SortingRecordWriter() throws HyracksDataException {
+                tb = new ArrayTupleBuilder(2);
+                frame = ctx.allocateFrame();
+                fta = new FrameTupleAppender(ctx.getFrameSize());
+                fta.reset(frame, true);
+            }
+
+            public void initBlock(int blockId) throws HyracksDataException {
+                runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
+                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit);
+                this.blockId = blockId;
+            }
+
+            @Override
+            public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+            }
+
+            @Override
+            public void write(K2 key, V2 value) throws IOException, InterruptedException {
+                DataOutput dos = tb.getDataOutput();
+                tb.reset();
+                key.write(dos);
+                tb.addFieldEndOffset();
+                value.write(dos);
+                tb.addFieldEndOffset();
+                if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    runGen.nextFrame(frame);
+                    fta.reset(frame, true);
+                    if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+            }
+
+            public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
+                if (fta.getTupleCount() > 0) {
+                    runGen.nextFrame(frame);
+                    fta.reset(frame, true);
+                }
+                runGen.close();
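+                // The delegating writer re-emits each sorted tuple with the current
+                // block id appended as a third field, so the shuffle can later split
+                // streams back into per-block runs (see ShuffleFrameReader).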
+                IFrameWriter delegatingWriter = new IFrameWriter() {
+                    private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+                    private final ByteBuffer outFrame = ctx.allocateFrame();
+                    private final FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(),
+                            helper.getMapOutputRecordDescriptorWithoutExtraFields());
+                    private final ArrayTupleBuilder tb = new ArrayTupleBuilder(3);
+
+                    @Override
+                    public void open() throws HyracksDataException {
+                        appender.reset(outFrame, true);
+                    }
+
+                    @Override
+                    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                        fta.reset(buffer);
+                        int n = fta.getTupleCount();
+                        for (int i = 0; i < n; ++i) {
+                            tb.reset();
+                            tb.addField(fta, i, 0);
+                            tb.addField(fta, i, 1);
+                            try {
+                                tb.getDataOutput().writeInt(blockId);
+                            } catch (IOException e) {
+                                throw new HyracksDataException(e);
+                            }
+                            tb.addFieldEndOffset();
+                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                FrameUtils.flushFrame(outFrame, writer);
+                                appender.reset(outFrame, true);
+                                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                    throw new IllegalStateException();
+                                }
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void close() throws HyracksDataException {
+                        if (appender.getTupleCount() > 0) {
+                            FrameUtils.flushFrame(outFrame, writer);
+                        }
+                    }
+
+                    @Override
+                    public void fail() throws HyracksDataException {
+                    }
+                };
+                if (helper.hasCombiner()) {
+                    Reducer<K2, V2, K2, V2> combiner = helper.getCombiner();
+                    TaskAttemptID ctaId = new TaskAttemptID("foo", jobId, true, partition, 0);
+                    TaskAttemptContext ctaskAttemptContext = helper.createTaskAttemptContext(ctaId);
+                    final IFrameWriter outputWriter = delegatingWriter;
+                    RecordWriter<K2, V2> recordWriter = new RecordWriter<K2, V2>() {
+                        private final FrameTupleAppender fta = new FrameTupleAppender(ctx.getFrameSize());
+                        private final ByteBuffer buffer = ctx.allocateFrame();
+                        private final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+
+                        {
+                            fta.reset(buffer, true);
+                            outputWriter.open();
+                        }
+
+                        @Override
+                        public void write(K2 key, V2 value) throws IOException, InterruptedException {
+                            DataOutput dos = tb.getDataOutput();
+                            tb.reset();
+                            key.write(dos);
+                            tb.addFieldEndOffset();
+                            value.write(dos);
+                            tb.addFieldEndOffset();
+                            if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                FrameUtils.flushFrame(buffer, outputWriter);
+                                fta.reset(buffer, true);
+                                if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                    throw new IllegalStateException();
+                                }
+                            }
+                        }
+
+                        @Override
+                        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+                            if (fta.getTupleCount() > 0) {
+                                FrameUtils.flushFrame(buffer, outputWriter);
+                            }
+                            // Always close the delegating writer so any frames it has
+                            // buffered are flushed even when this appender is empty.
+                            outputWriter.close();
+                        }
+                    };
+                    delegatingWriter = new ReduceWriter<K2, V2, K2, V2>(ctx, helper,
+                            new int[] { HadoopHelper.KEY_FIELD_INDEX }, helper.getGroupingComparatorFactories(),
+                            helper.getMapOutputRecordDescriptorWithoutExtraFields(), combiner, recordWriter, ctaId,
+                            ctaskAttemptContext);
+                }
+                IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+                for (int i = 0; i < comparatorFactories.length; ++i) {
+                    comparators[i] = comparatorFactories[i].createBinaryComparator();
+                }
+                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getFrameSorter(),
+                        runGen.getRuns(), new int[] { 0 }, comparators,
+                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit, delegatingWriter);
+                merger.process();
+            }
+        }
+
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            @Override
+            public void initialize() throws HyracksDataException {
+                writer.open();
+                try {
+                    SortingRecordWriter recordWriter = new SortingRecordWriter();
+                    InputSplit split = null;
+                    int blockId = 0;
+                    while ((split = isp.next()) != null) {
+                        try {
+                            RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split,
+                                    taskAttemptContext);
+                            ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+                            try {
+                                Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+                                recordReader.initialize(split, taskAttemptContext);
+                            } finally {
+                                Thread.currentThread().setContextClassLoader(ctxCL);
+                            }
+                            recordWriter.initBlock(blockId);
+                            Mapper<K1, V1, K2, V2>.Context mCtx = mapper.new Context(conf, taId, recordReader,
+                                    recordWriter, null, null, split);
+                            mapper.run(mCtx);
+                            recordReader.close();
+                            recordWriter.sortAndFlushBlock(writer);
+                            ++blockId;
+                        } catch (IOException e) {
+                            throw new HyracksDataException(e);
+                        } catch (InterruptedException e) {
+                            throw new HyracksDataException(e);
+                        }
+                    }
+                } finally {
+                    writer.close();
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
new file mode 100644
index 0000000..be05f22
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 University of California, Irvine
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Writable;
+
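+/**
+ * Wraps a Hadoop Writable so it can cross Java serialization boundaries:
+ * set() marshals the class name plus the Writable's bytes into a byte array,
+ * and get() rebuilds the object via HadoopTools.newInstance(). Illustrative
+ * use:
+ *
+ *   MarshalledWritable<Configuration> mc = new MarshalledWritable<Configuration>();
+ *   mc.set(conf);                  // on the submitting side
+ *   Configuration copy = mc.get(); // on the task side
+ */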
+public class MarshalledWritable<T extends Writable> implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private byte[] bytes;
+
+    public MarshalledWritable() {
+    }
+
+    public void set(T o) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        dos.writeUTF(o.getClass().getName());
+        o.write(dos);
+        dos.close();
+        bytes = baos.toByteArray();
+    }
+
+    public T get() throws Exception {
+        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        DataInputStream dis = new DataInputStream(bais);
+        String className = dis.readUTF();
+        T o = (T) HadoopTools.newInstance(className);
+        o.readFields(dis);
+        return o;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
new file mode 100644
index 0000000..33d58af
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
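+/**
+ * An IFrameWriter that detects group boundaries in sorted input using binary
+ * comparators, accumulates each group's tuples in frames, and runs the
+ * Reducer (or Combiner) over the group through a KVIterator.
+ */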
+public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
+    private final IHyracksTaskContext ctx;
+    private final HadoopHelper helper;
+    private final int[] groupFields;
+    private final FrameTupleAccessor accessor0;
+    private final FrameTupleAccessor accessor1;
+    private final ByteBuffer copyFrame;
+    private final IBinaryComparator[] comparators;
+    private final KVIterator kvi;
+    private final Reducer<K2, V2, K3, V3> reducer;
+    private final RecordWriter<K3, V3> recordWriter;
+    private final TaskAttemptID taId;
+    private final TaskAttemptContext taskAttemptContext;
+
+    private boolean first;
+    private boolean groupStarted;
+    private List<ByteBuffer> group;
+    private int bPtr;
+    private FrameTupleAppender fta;
+    private Counter keyCounter;
+    private Counter valueCounter;
+
+    public ReduceWriter(IHyracksTaskContext ctx, HadoopHelper helper, int[] groupFields,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
+            Reducer<K2, V2, K3, V3> reducer, RecordWriter<K3, V3> recordWriter, TaskAttemptID taId,
+            TaskAttemptContext taskAttemptContext) {
+        this.ctx = ctx;
+        this.helper = helper;
+        this.groupFields = groupFields;
+        accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        copyFrame = ctx.allocateFrame();
+        accessor1.reset(copyFrame);
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.reducer = reducer;
+        this.recordWriter = recordWriter;
+        this.taId = taId;
+        this.taskAttemptContext = taskAttemptContext;
+
+        kvi = new KVIterator(ctx, helper, recordDescriptor);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        first = true;
+        groupStarted = false;
+        group = new ArrayList<ByteBuffer>();
+        bPtr = 0;
+        group.add(ctx.allocateFrame());
+        fta = new FrameTupleAppender(ctx.getFrameSize());
+        keyCounter = new Counter() {
+        };
+        valueCounter = new Counter() {
+        };
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor0.reset(buffer);
+        int nTuples = accessor0.getTupleCount();
+        for (int i = 0; i < nTuples; ++i) {
+            if (first) {
+                groupInit();
+                first = false;
+            } else {
+                if (i == 0) {
+                    switchGroupIfRequired(accessor1, accessor1.getTupleCount() - 1, accessor0, i);
+                } else {
+                    switchGroupIfRequired(accessor0, i - 1, accessor0, i);
+                }
+            }
+            accumulate(accessor0, i);
+        }
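+        // Retain a copy of the current frame so the last tuple seen can be
+        // compared with the first tuple of the next frame at a group boundary.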
+        FrameUtils.copy(buffer, copyFrame);
+    }
+
+    private void accumulate(FrameTupleAccessor accessor, int tIndex) {
+        if (!fta.append(accessor, tIndex)) {
+            ++bPtr;
+            if (group.size() <= bPtr) {
+                group.add(ctx.allocateFrame());
+            }
+            fta.reset(group.get(bPtr), true);
+            if (!fta.append(accessor, tIndex)) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+            reduce();
+            groupInit();
+        }
+    }
+
+    private void groupInit() {
+        groupStarted = true;
+        bPtr = 0;
+        fta.reset(group.get(0), true);
+    }
+
+    private void reduce() throws HyracksDataException {
+        kvi.reset(group, bPtr + 1);
+        try {
+            Reducer<K2, V2, K3, V3>.Context rCtx = reducer.new Context(helper.getConfiguration(), taId, kvi,
+                    keyCounter, valueCounter, recordWriter, null, null,
+                    (RawComparator<K2>) helper.getRawGroupingComparator(), (Class<K2>) helper.getJob()
+                            .getMapOutputKeyClass(), (Class<V2>) helper.getJob().getMapOutputValueClass());
+            reducer.run(rCtx);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        groupStarted = false;
+    }
+
+    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+        for (int i = 0; i < comparators.length; ++i) {
+            int fIdx = groupFields[i];
+            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+            int l1 = a1.getFieldLength(t1Idx, fIdx);
+            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+            int l2 = a2.getFieldLength(t2Idx, fIdx);
+            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (groupStarted) {
+            reduce();
+        }
+        try {
+            recordWriter.close(taskAttemptContext);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
new file mode 100644
index 0000000..bbb0d74
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
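+/**
+ * Sink operator that drives the job's Reducer: frames arriving from the
+ * shuffle are grouped by a ReduceWriter and written out through the job's
+ * configured OutputFormat.
+ */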
+public class ReducerOperatorDescriptor<K2 extends Writable, V2 extends Writable, K3 extends Writable, V3 extends Writable>
+        extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final int jobId;
+
+    private MarshalledWritable<Configuration> mConfig;
+
+    public ReducerOperatorDescriptor(JobSpecification spec, int jobId, MarshalledWritable<Configuration> mConfig) {
+        super(spec, 1, 0);
+        this.jobId = jobId;
+        this.mConfig = mConfig;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        final HadoopHelper helper = new HadoopHelper(mConfig);
+        final Reducer<K2, V2, K3, V3> reducer = helper.getReducer();
+        final RecordDescriptor recordDescriptor = helper.getMapOutputRecordDescriptor();
+        final int[] groupFields = helper.getSortFields();
+        IBinaryComparatorFactory[] groupingComparators = helper.getGroupingComparatorFactories();
+
+        final TaskAttemptID taId = new TaskAttemptID("foo", jobId, false, partition, 0);
+        final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
+        final RecordWriter<K3, V3> recordWriter;
+        try {
+            recordWriter = helper.<K3, V3> getOutputFormat().getRecordWriter(taskAttemptContext);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+
+        final ReduceWriter<K2, V2, K3, V3> rw = new ReduceWriter<K2, V2, K3, V3>(ctx, helper, groupFields,
+                groupingComparators, recordDescriptor, reducer, recordWriter, taId, taskAttemptContext);
+
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            @Override
+            public void open() throws HyracksDataException {
+                rw.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                rw.nextFrame(buffer);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                rw.close();
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
new file mode 100644
index 0000000..8e03eae
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+
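+/**
+ * Reads shuffled map output from all sender channels, spills each sender's
+ * per-block segment to a run file, and serves the merged, fully sorted stream
+ * through nextFrame().
+ */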
+public class ShuffleFrameReader implements IFrameReader {
+    private final IHyracksTaskContext ctx;
+    private final NonDeterministicChannelReader channelReader;
+    private final HadoopHelper helper;
+    private final RecordDescriptor recordDescriptor;
+    private List<RunFileWriter> runFileWriters;
+    private RunFileReader reader;
+
+    public ShuffleFrameReader(IHyracksTaskContext ctx, NonDeterministicChannelReader channelReader,
+            MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+        this.ctx = ctx;
+        this.channelReader = channelReader;
+        helper = new HadoopHelper(mConfig);
+        this.recordDescriptor = helper.getMapOutputRecordDescriptor();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        channelReader.open();
+        int nSenders = channelReader.getSenderPartitionCount();
+        runFileWriters = new ArrayList<RunFileWriter>();
+        RunInfo[] infos = new RunInfo[nSenders];
+        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        IInputChannel[] channels = channelReader.getChannels();
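+        // Drain every sender channel; within a sender's stream, a change in the
+        // block id field closes the current run and starts a new one, so each
+        // (sender, block) segment lands in its own run file.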
+        while (true) {
+            int entry = channelReader.findNextSender();
+            if (entry < 0) {
+                break;
+            }
+            RunInfo info = infos[entry];
+            IInputChannel channel = channels[entry];
+            ByteBuffer netBuffer = channel.getNextBuffer();
+            accessor.reset(netBuffer);
+            int nTuples = accessor.getTupleCount();
+            for (int i = 0; i < nTuples; ++i) {
+                int tBlockId = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        FrameUtils.getAbsoluteFieldStartOffset(accessor, i, HadoopHelper.BLOCKID_FIELD_INDEX));
+                if (info == null) {
+                    info = new RunInfo();
+                    info.reset(tBlockId);
+                    infos[entry] = info;
+                } else if (info.blockId != tBlockId) {
+                    info.close();
+                    info.reset(tBlockId);
+                }
+                info.write(accessor, i);
+            }
+            channel.recycleBuffer(netBuffer);
+        }
+        for (int i = 0; i < infos.length; ++i) {
+            RunInfo info = infos[i];
+            if (info != null) {
+                info.close();
+            }
+        }
+        infos = null;
+
+        FileReference outFile = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
+        int framesLimit = helper.getSortFrameLimit(ctx);
+        IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
+        IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        List<IFrameReader> runs = new LinkedList<IFrameReader>();
+        for (RunFileWriter rfw : runFileWriters) {
+            runs.add(rfw.createReader());
+        }
+        RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
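+        // Every tuple was already spilled to a run file, so no in-memory frame
+        // sorter is handed to the merger (null is assumed to be acceptable when
+        // only run readers are supplied).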
+        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators,
+                recordDescriptor, framesLimit, rfw);
+        merger.process();
+
+        reader = rfw.createReader();
+        reader.open();
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        return reader.nextFrame(buffer);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        reader.close();
+    }
+
+    private class RunInfo {
+        private final ByteBuffer buffer;
+        private final FrameTupleAppender fta;
+
+        private FileReference file;
+        private RunFileWriter rfw;
+        private int blockId;
+
+        public RunInfo() {
+            buffer = ctx.allocateFrame();
+            fta = new FrameTupleAppender(ctx.getFrameSize());
+        }
+
+        public void reset(int blockId) throws HyracksDataException {
+            this.blockId = blockId;
+            fta.reset(buffer, true);
+            try {
+                file = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
+                rfw = new RunFileWriter(file, ctx.getIOManager());
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        public void write(FrameTupleAccessor accessor, int tIdx) throws HyracksDataException {
+            if (!fta.append(accessor, tIdx)) {
+                flush();
+                if (!fta.append(accessor, tIdx)) {
+                    throw new IllegalStateException();
+                }
+            }
+        }
+
+        public void close() throws HyracksDataException {
+            flush();
+            rfw.close();
+            runFileWriters.add(rfw);
+        }
+
+        private void flush() throws HyracksDataException {
+            if (fta.getTupleCount() <= 0) {
+                return;
+            }
+            buffer.limit(buffer.capacity());
+            buffer.position(0);
+            rfw.nextFrame(buffer);
+            fta.reset(buffer, true);
+        }
+    }
+}
\ No newline at end of file