Merged hyracks_dev_next -r 1287 into trunk

Summary of what this merge brings to hyracks-dataflow-hadoop:
- Bump the module and all intra-project dependency versions from
  0.1.9-SNAPSHOT to 0.2.0-SNAPSHOT.
- Migrate operator descriptors from IHyracksStageletContext to
  IHyracksTaskContext and drop the IOperatorEnvironment parameter from
  createPushRuntime().
- Propagate failures through the dataflow: implement fail() on pushables,
  call writer.fail() before rethrowing, and close writers in finally blocks.
- Add new org.apache.hadoop.mapreduce bridge classes (HadoopHelper,
  HadoopTools, HashPartitioningShuffleConnectorDescriptor,
  IInputSplitProvider, IInputSplitProviderFactory, InputFileSplit, ...).
- Remove checked-in Eclipse metadata (.classpath, .project, .settings).
git-svn-id: https://hyracks.googlecode.com/svn/trunk@1288 123451ca-8445-de46-9d55-352943316053
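
Note for downstream operator authors: the central API change in this merge is
the createPushRuntime() signature (IHyracksStageletContext becomes
IHyracksTaskContext, and the IOperatorEnvironment parameter goes away),
together with the expectation that runtimes report errors through the writer's
fail() callback before closing it. Below is a minimal sketch of an operator on
the new API; ExampleSourceOperatorDescriptor is hypothetical and for
illustration only, while the signature and the fail()/finally pattern are
taken from the hunks in this patch.

import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

public class ExampleSourceOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
    private static final long serialVersionUID = 1L;

    public ExampleSourceOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc) {
        super(spec, 0, 1); // zero inputs, one output
        recordDescriptors[0] = rDesc;
    }

    @Override
    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
            throws HyracksDataException {
        return new AbstractUnaryOutputSourceOperatorNodePushable() {
            @Override
            public void initialize() throws HyracksDataException {
                writer.open();
                try {
                    // produce frames and push them via writer.nextFrame(...) here
                } catch (Exception e) {
                    // 0.2.0 contract: tell the consumer the producer failed, then rethrow
                    writer.fail();
                    throw new HyracksDataException(e);
                } finally {
                    // close exactly once, on both the success and the failure path
                    writer.close();
                }
            }
        };
    }
}
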
diff --git a/hyracks/hyracks-dataflow-hadoop/.classpath b/hyracks/hyracks-dataflow-hadoop/.classpath
deleted file mode 100644
index 1f3c1ff..0000000
--- a/hyracks/hyracks-dataflow-hadoop/.classpath
+++ /dev/null
@@ -1,7 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
- <classpathentry kind="src" output="target/classes" path="src/main/java"/>
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
- <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
- <classpathentry kind="output" path="target/classes"/>
-</classpath>
diff --git a/hyracks/hyracks-dataflow-hadoop/.project b/hyracks/hyracks-dataflow-hadoop/.project
deleted file mode 100644
index d6edecf..0000000
--- a/hyracks/hyracks-dataflow-hadoop/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
- <name>hyracks-dataflow-hadoop</name>
- <comment></comment>
- <projects>
- </projects>
- <buildSpec>
- <buildCommand>
- <name>org.eclipse.jdt.core.javabuilder</name>
- <arguments>
- </arguments>
- </buildCommand>
- <buildCommand>
- <name>org.maven.ide.eclipse.maven2Builder</name>
- <arguments>
- </arguments>
- </buildCommand>
- </buildSpec>
- <natures>
- <nature>org.maven.ide.eclipse.maven2Nature</nature>
- <nature>org.eclipse.jdt.core.javanature</nature>
- </natures>
-</projectDescription>
diff --git a/hyracks/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index 0272d6e..0000000
--- a/hyracks/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,274 +0,0 @@
-#Thu Jun 02 13:11:16 PDT 2011
-eclipse.preferences.version=1
-org.eclipse.jdt.core.codeComplete.argumentPrefixes=
-org.eclipse.jdt.core.codeComplete.argumentSuffixes=
-org.eclipse.jdt.core.codeComplete.fieldPrefixes=
-org.eclipse.jdt.core.codeComplete.fieldSuffixes=
-org.eclipse.jdt.core.codeComplete.localPrefixes=
-org.eclipse.jdt.core.codeComplete.localSuffixes=
-org.eclipse.jdt.core.codeComplete.staticFieldPrefixes=
-org.eclipse.jdt.core.codeComplete.staticFieldSuffixes=
-org.eclipse.jdt.core.codeComplete.staticFinalFieldPrefixes=
-org.eclipse.jdt.core.codeComplete.staticFinalFieldSuffixes=
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
-org.eclipse.jdt.core.compiler.compliance=1.6
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
-org.eclipse.jdt.core.compiler.source=1.6
-org.eclipse.jdt.core.formatter.align_type_members_on_columns=false
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=48
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_assignment=0
-org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
-org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
-org.eclipse.jdt.core.formatter.alignment_for_enum_constants=48
-org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16
-org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration=16
-org.eclipse.jdt.core.formatter.blank_lines_after_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_after_package=1
-org.eclipse.jdt.core.formatter.blank_lines_before_field=0
-org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration=0
-org.eclipse.jdt.core.formatter.blank_lines_before_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_before_member_type=1
-org.eclipse.jdt.core.formatter.blank_lines_before_method=1
-org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk=1
-org.eclipse.jdt.core.formatter.blank_lines_before_package=0
-org.eclipse.jdt.core.formatter.blank_lines_between_import_groups=1
-org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations=1
-org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_array_initializer=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block_in_case=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_constant=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_method_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false
-org.eclipse.jdt.core.formatter.comment.format_block_comments=true
-org.eclipse.jdt.core.formatter.comment.format_header=false
-org.eclipse.jdt.core.formatter.comment.format_html=true
-org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=true
-org.eclipse.jdt.core.formatter.comment.format_line_comments=true
-org.eclipse.jdt.core.formatter.comment.format_source_code=true
-org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true
-org.eclipse.jdt.core.formatter.comment.indent_root_tags=true
-org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
-org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=insert
-org.eclipse.jdt.core.formatter.comment.line_length=80
-org.eclipse.jdt.core.formatter.compact_else_if=true
-org.eclipse.jdt.core.formatter.continuation_indentation=2
-org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2
-org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line=false
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header=true
-org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_empty_lines=false
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_block=true
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_body=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=true
-org.eclipse.jdt.core.formatter.indentation.size=4
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_member=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_ellipsis=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_ellipsis=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=insert
-org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.join_lines_in_comments=true
-org.eclipse.jdt.core.formatter.join_wrapped_lines=true
-org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.lineSplit=120
-org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
-org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1
-org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=true
-org.eclipse.jdt.core.formatter.tabulation.char=space
-org.eclipse.jdt.core.formatter.tabulation.size=4
-org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
-org.eclipse.jdt.core.formatter.wrap_before_binary_operator=true
diff --git a/hyracks/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs
deleted file mode 100644
index 9e0c887..0000000
--- a/hyracks/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs
+++ /dev/null
@@ -1,9 +0,0 @@
-#Fri Aug 13 04:07:00 PDT 2010
-activeProfiles=
-eclipse.preferences.version=1
-fullBuildGoals=process-test-resources
-includeModules=false
-resolveWorkspaceProjects=true
-resourceFilterGoals=process-resources resources\:testResources
-skipCompilerPlugin=true
-version=1
diff --git a/hyracks/hyracks-dataflow-hadoop/pom.xml b/hyracks/hyracks-dataflow-hadoop/pom.xml
index 4605153..e3f9b1e 100644
--- a/hyracks/hyracks-dataflow-hadoop/pom.xml
+++ b/hyracks/hyracks-dataflow-hadoop/pom.xml
@@ -2,12 +2,12 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-hadoop</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
</parent>
<build>
@@ -27,14 +27,14 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -54,7 +54,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index fd549b3..0ea56c4 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -34,13 +34,12 @@
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
@@ -56,7 +55,7 @@
protected OutputCollector<K2, V2> output;
protected Reporter reporter;
protected Object mapper;
- //protected Mapper<K1, V1, K2, V2> mapper;
+ // protected Mapper<K1, V1, K2, V2> mapper;
protected int partition;
protected JobConf conf;
protected IOpenableDataWriter<Object[]> writer;
@@ -96,7 +95,8 @@
if (!conf.getUseNewMapper()) {
((org.apache.hadoop.mapred.Mapper) mapper).close();
} else {
- // do nothing. closing the mapper is handled internally by run method on context.
+ // do nothing. closing the mapper is handled internally by
+ // run method on context.
}
} catch (IOException ioe) {
throw new HyracksDataException(ioe);
@@ -118,6 +118,11 @@
}
@Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
public void open() throws HyracksDataException {
initializeMapper();
writer.open();
@@ -172,16 +177,18 @@
} else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath()
.toString());
- conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
- .getStart());
- conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
- .getLength());
+ conf.setLong("map.input.start",
+ ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
+ conf.setLong("map.input.length",
+ ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
}
} catch (Exception e) {
e.printStackTrace();
- // we do not throw the exception here as we are setting additional parameters that may not be
- // required by the mapper. If they are indeed required, the configure method invoked on the mapper
- // shall report an exception because of the missing parameters.
+ // we do not throw the exception here as we are setting
+ // additional parameters that may not be
+ // required by the mapper. If they are indeed required, the
+ // configure method invoked on the mapper
+ // shall report an exception because of the missing parameters.
}
}
@@ -223,7 +230,9 @@
data[1] = value;
writer.writeData(data);
}
- };;;
+ };
+ ;
+ ;
OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
.getOutputCommitter(new TaskAttemptContext(conf, new TaskAttemptID()));
@@ -245,7 +254,9 @@
public Counter getCounter(Enum<?> arg0) {
return null;
}
- };;;
+ };
+ ;
+ ;
context = new org.apache.hadoop.mapreduce.Mapper().new Context(conf, new TaskAttemptID(),
newReader, recordWriter, outputCommitter, statusReporter,
(org.apache.hadoop.mapreduce.InputSplit) inputSplit);
@@ -315,9 +326,9 @@
String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
try {
if (hadoopClassFactory == null) {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
- .forName(mapOutputKeyClassName), (Class<? extends Writable>) Class
- .forName(mapOutputValueClassName));
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
+ (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
} else {
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
(Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputKeyClassName),
@@ -361,13 +372,13 @@
} else {
Class inputFormatClass = conf.getInputFormat().getClass();
InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
- return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf, super
- .createReporter());
+ return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf,
+ super.createReporter());
}
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
JobConf conf = getJobConf();
@@ -398,17 +409,17 @@
(Class<? extends Writable>) oldReader.createKey().getClass(),
(Class<? extends Writable>) oldReader.createValue().getClass());
}
- return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
+ return createSelfReadingMapper(ctx, recordDescriptor, partition);
} else {
- return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
- .getInputRecordDescriptor(this.odId, 0));
+ return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
+ recordDescProvider.getInputRecordDescriptor(this.odId, 0));
}
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
- private IOperatorNodePushable createSelfReadingMapper(final IHyracksStageletContext ctx, IOperatorEnvironment env,
+ private IOperatorNodePushable createSelfReadingMapper(final IHyracksTaskContext ctx,
final RecordDescriptor recordDescriptor, final int partition) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
@@ -418,9 +429,11 @@
try {
readMapOp.mapInput();
} catch (Exception e) {
+ writer.fail();
throw new HyracksDataException(e);
+ } finally {
+ readMapOp.close();
}
- readMapOp.close();
}
};
}
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index 915d09b..2841509 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -32,12 +32,11 @@
import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -143,7 +142,7 @@
@SuppressWarnings("deprecation")
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx, IOperatorEnvironment env,
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -192,28 +191,35 @@
RecordDescriptor outputRecordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
(Class<? extends Writable>) hadoopRecordReader.createKey().getClass(),
(Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
- int nFields = outputRecordDescriptor.getFields().length;
+ int nFields = outputRecordDescriptor.getFieldCount();
ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
- while (hadoopRecordReader.next(key, value)) {
- tb.reset();
- switch (nFields) {
- case 2:
- tb.addField(outputRecordDescriptor.getFields()[0], key);
- case 1:
- tb.addField(outputRecordDescriptor.getFields()[1], value);
- }
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(outBuffer, writer);
- appender.reset(outBuffer, true);
+ writer.open();
+ try {
+ while (hadoopRecordReader.next(key, value)) {
+ tb.reset();
+ switch (nFields) {
+ case 2:
+ tb.addField(outputRecordDescriptor.getFields()[0], key);
+ case 1:
+ tb.addField(outputRecordDescriptor.getFields()[1], value);
+ }
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
+ FrameUtils.flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
}
}
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outBuffer, writer);
+ }
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
}
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outBuffer, writer);
- }
- writer.close();
hadoopRecordReader.close();
} catch (InstantiationException e) {
throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 5857f36..73a8c16 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -23,19 +23,18 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IDataReader;
import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
@@ -44,7 +43,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.hadoop.data.KeyComparatorFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.data.RawComparingComparatorFactory;
@@ -348,7 +346,7 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
try {
if (this.comparatorFactory == null) {
@@ -395,8 +393,9 @@
RecordDescriptor recordDescriptor = null;
try {
if (classFactory == null) {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
- .forName(outputKeyClassName), (Class<? extends Writable>) Class.forName(outputValueClassName));
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) Class.forName(outputKeyClassName),
+ (Class<? extends Writable>) Class.forName(outputValueClassName));
} else {
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
(Class<? extends Writable>) classFactory.loadClass(outputKeyClassName),
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
new file mode 100644
index 0000000..e8b6e8b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopNewPartitionerTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+
+public class HadoopHelper {
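+    // Map-output tuples are laid out as [key, value, blockId]; sorting uses the key field only.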
+ public static final int KEY_FIELD_INDEX = 0;
+ public static final int VALUE_FIELD_INDEX = 1;
+ public static final int BLOCKID_FIELD_INDEX = 2;
+ private static final int[] KEY_SORT_FIELDS = new int[] { 0 };
+
+ private MarshalledWritable<Configuration> mConfig;
+ private Configuration config;
+ private Job job;
+
+ public HadoopHelper(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+ this.mConfig = mConfig;
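+        // Temporarily swap in this module's classloader so the unmarshalled
+        // Configuration and the Job resolve user classes from the right loader.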
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ config = mConfig.get();
+ config.setClassLoader(getClass().getClassLoader());
+ job = new Job(config);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+ public RecordDescriptor getMapOutputRecordDescriptor() throws HyracksDataException {
+ try {
+ return new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+ .getMapOutputKeyClass()),
+ DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+ .getMapOutputValueClass()), IntegerSerializerDeserializer.INSTANCE });
+
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public RecordDescriptor getMapOutputRecordDescriptorWithoutExtraFields() throws HyracksDataException {
+ try {
+ return new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+ .getMapOutputKeyClass()),
+ DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+ .getMapOutputValueClass()) });
+
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public TaskAttemptContext createTaskAttemptContext(TaskAttemptID taId) {
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(config.getClassLoader());
+ return new TaskAttemptContext(config, taId);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+ public JobContext createJobContext() {
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(config.getClassLoader());
+ return new JobContext(config, null);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+ public <K1, V1, K2, V2> Mapper<K1, V1, K2, V2> getMapper() throws HyracksDataException {
+ try {
+ return (Mapper<K1, V1, K2, V2>) HadoopTools.newInstance(job.getMapperClass());
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ } catch (InstantiationException e) {
+ throw new HyracksDataException(e);
+ } catch (IllegalAccessException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K2, V2, K3, V3> Reducer<K2, V2, K3, V3> getReducer() throws HyracksDataException {
+ try {
+ return (Reducer<K2, V2, K3, V3>) HadoopTools.newInstance(job.getReducerClass());
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ } catch (InstantiationException e) {
+ throw new HyracksDataException(e);
+ } catch (IllegalAccessException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K2, V2> Reducer<K2, V2, K2, V2> getCombiner() throws HyracksDataException {
+ try {
+ return (Reducer<K2, V2, K2, V2>) HadoopTools.newInstance(job.getCombinerClass());
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ } catch (InstantiationException e) {
+ throw new HyracksDataException(e);
+ } catch (IllegalAccessException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K, V> InputFormat<K, V> getInputFormat() throws HyracksDataException {
+ try {
+ return (InputFormat<K, V>) ReflectionUtils.newInstance(job.getInputFormatClass(), config);
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K, V> List<InputSplit> getInputSplits() throws HyracksDataException {
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ InputFormat<K, V> fmt = getInputFormat();
+ JobContext jCtx = new JobContext(config, null);
+ try {
+ return fmt.getSplits(jCtx);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+ public IBinaryComparatorFactory[] getSortComparatorFactories() {
+ WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
+ .getSortComparator().getClass());
+
+ return new IBinaryComparatorFactory[] { comparatorFactory };
+ }
+
+ public IBinaryComparatorFactory[] getGroupingComparatorFactories() {
+ WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
+ .getGroupingComparator().getClass());
+
+ return new IBinaryComparatorFactory[] { comparatorFactory };
+ }
+
+ public RawComparator<?> getRawGroupingComparator() {
+ return job.getGroupingComparator();
+ }
+
+ public int getSortFrameLimit(IHyracksCommonContext ctx) {
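+        // Translate Hadoop's io.sort.mb budget (in MB) into an equivalent number of Hyracks frames.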
+ int sortMemory = job.getConfiguration().getInt("io.sort.mb", 100);
+ return (int) (((long) sortMemory * 1024 * 1024) / ctx.getFrameSize());
+ }
+
+ public Job getJob() {
+ return job;
+ }
+
+ public MarshalledWritable<Configuration> getMarshalledConfiguration() {
+ return mConfig;
+ }
+
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ public ITuplePartitionComputerFactory getTuplePartitionComputer() throws HyracksDataException {
+ int nReducers = job.getNumReduceTasks();
+ try {
+ return new HadoopNewPartitionerTuplePartitionComputerFactory<Writable, Writable>(
+ (Class<? extends Partitioner<Writable, Writable>>) job.getPartitionerClass(),
+ (ISerializerDeserializer<Writable>) DatatypeHelper
+ .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputKeyClass()),
+ (ISerializerDeserializer<Writable>) DatatypeHelper
+ .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputValueClass()));
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public int[] getSortFields() {
+ return KEY_SORT_FIELDS;
+ }
+
+ public <K> ISerializerDeserializer<K> getMapOutputKeySerializerDeserializer() {
+ return (ISerializerDeserializer<K>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+ .getMapOutputKeyClass());
+ }
+
+ public <V> ISerializerDeserializer<V> getMapOutputValueSerializerDeserializer() {
+ return (ISerializerDeserializer<V>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+ .getMapOutputValueClass());
+ }
+
+ public FileSystem getFilesystem() throws HyracksDataException {
+ try {
+ return FileSystem.get(config);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K, V> OutputFormat<K, V> getOutputFormat() throws HyracksDataException {
+ try {
+ return (OutputFormat<K, V>) ReflectionUtils.newInstance(job.getOutputFormatClass(), config);
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public boolean hasCombiner() throws HyracksDataException {
+ try {
+ return job.getCombinerClass() != null;
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
new file mode 100644
index 0000000..7e4d67f
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+public class HadoopTools {
+ public static Object newInstance(String className) throws ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
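+        // Resolve the class through HadoopTools' own classloader rather than the caller's context loader.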
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(HadoopTools.class.getClassLoader());
+ Class<?> clazz = Class.forName(className, true, HadoopTools.class.getClassLoader());
+ return newInstance(clazz);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+ public static Object newInstance(Class<?> clazz) throws InstantiationException, IllegalAccessException {
+ return clazz.newInstance();
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
new file mode 100644
index 0000000..7c6bf86
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.util.BitSet;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.connectors.PartitionDataWriter;
+
+public class HashPartitioningShuffleConnectorDescriptor extends AbstractMToNConnectorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private final MarshalledWritable<Configuration> mConfig;
+
+ public HashPartitioningShuffleConnectorDescriptor(JobSpecification spec, MarshalledWritable<Configuration> mConfig) {
+ super(spec);
+ this.mConfig = mConfig;
+ }
+
+ @Override
+ public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+ IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
+ HadoopHelper helper = new HadoopHelper(mConfig);
+ ITuplePartitionComputerFactory tpcf = helper.getTuplePartitionComputer();
+ return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
+ }
+
+ @Override
+ public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+ int receiverIndex, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ BitSet expectedPartitions = new BitSet();
+ expectedPartitions.set(0, nProducerPartitions);
+ NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
+ expectedPartitions);
+ IFrameReader frameReader = new ShuffleFrameReader(ctx, channelReader, mConfig);
+ return new PartitionCollector(ctx, getConnectorId(), receiverIndex, expectedPartitions, frameReader,
+ channelReader);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
new file mode 100644
index 0000000..1545c06
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
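+/**
+ * Iterator-style source of the Hadoop {@link InputSplit}s assigned to one
+ * mapper partition. {@link #next()} returns <code>null</code> once the splits
+ * are exhausted, which is how {@link MapperOperatorDescriptor} detects end of
+ * input.
+ */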
+public interface IInputSplitProvider {
+ public InputSplit next() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
new file mode 100644
index 0000000..73588ab
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
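+/**
+ * Serializable factory, shipped as part of the job specification, that creates
+ * an {@link IInputSplitProvider} for a given partition id at task start-up.
+ */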
+public interface IInputSplitProviderFactory extends Serializable {
+ public IInputSplitProvider createInputSplitProvider(int id) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
new file mode 100644
index 0000000..b37084e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
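+/**
+ * A {@link Writable} input split that augments the information in Hadoop's
+ * {@link FileSplit} with a block id and a scheduling timestamp. Host locations
+ * are not serialized: after {@link #readFields}, {@link #getLocations()}
+ * reports no locality.
+ */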
+public class InputFileSplit extends InputSplit implements Writable {
+ private Path file;
+ private long start;
+ private long length;
+ private int blockId;
+ private String[] hosts;
+ private long scheduleTime;
+
+ public InputFileSplit() {
+ }
+
+    public InputFileSplit(int blockId, Path file, long start, long length, String[] hosts, long scheduleTime) {
+        this.blockId = blockId;
+        this.file = file;
+        this.start = start;
+        this.length = length;
+        this.hosts = hosts;
+        this.scheduleTime = scheduleTime;
+    }
+
+ public int blockId() {
+ return blockId;
+ }
+
+ public long scheduleTime() {
+ return this.scheduleTime;
+ }
+
+ public Path getPath() {
+ return file;
+ }
+
+ /** The position of the first byte in the file to process. */
+ public long getStart() {
+ return start;
+ }
+
+ /** The number of bytes in the file to process. */
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public String toString() {
+ return file + ":" + start + "+" + length;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, file.toString());
+ out.writeLong(start);
+ out.writeLong(length);
+ out.writeInt(blockId);
+ out.writeLong(this.scheduleTime);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ file = new Path(Text.readString(in));
+ start = in.readLong();
+ length = in.readLong();
+ hosts = null;
+ this.blockId = in.readInt();
+ this.scheduleTime = in.readLong();
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ if (this.hosts == null) {
+ return new String[] {};
+ } else {
+ return this.hosts;
+ }
+ }
+
+ public FileSplit toFileSplit() {
+ return new FileSplit(file, start, length, hosts);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
new file mode 100644
index 0000000..a00fa7f0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.util.Progress;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
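+/**
+ * Adapts a list of Hyracks frames holding (key, value) tuples to Hadoop's
+ * {@link RawKeyValueIterator} so that a reducer can consume them.
+ * {@link #reset(List, int)} rebinds the iterator to a new group of buffers;
+ * progress is not tracked, so {@link #getProgress()} returns <code>null</code>.
+ */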
+public class KVIterator implements RawKeyValueIterator {
+ private final HadoopHelper helper;
+ private FrameTupleAccessor accessor;
+ private DataInputBuffer kBuffer;
+ private DataInputBuffer vBuffer;
+ private List<ByteBuffer> buffers;
+ private int bSize;
+ private int bPtr;
+ private int tIdx;
+ private boolean eog;
+
+ public KVIterator(IHyracksTaskContext ctx, HadoopHelper helper, RecordDescriptor recordDescriptor) {
+ this.helper = helper;
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ kBuffer = new DataInputBuffer();
+ vBuffer = new DataInputBuffer();
+ }
+
+ void reset(List<ByteBuffer> buffers, int bSize) {
+ this.buffers = buffers;
+ this.bSize = bSize;
+ bPtr = 0;
+ tIdx = 0;
+ eog = false;
+ if (bSize > 0) {
+ accessor.reset(buffers.get(0));
+ tIdx = -1;
+ } else {
+ eog = true;
+ }
+ }
+
+ @Override
+ public DataInputBuffer getKey() throws IOException {
+ return kBuffer;
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException {
+ return vBuffer;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ while (true) {
+ if (eog) {
+ return false;
+ }
+ ++tIdx;
+ if (accessor.getTupleCount() <= tIdx) {
+ ++bPtr;
+ if (bPtr >= bSize) {
+ eog = true;
+ continue;
+ }
+ tIdx = -1;
+ accessor.reset(buffers.get(bPtr));
+ continue;
+ }
+            kBuffer.reset(accessor.getBuffer().array(),
+                    FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, HadoopHelper.KEY_FIELD_INDEX),
+                    accessor.getFieldLength(tIdx, HadoopHelper.KEY_FIELD_INDEX));
+            vBuffer.reset(accessor.getBuffer().array(),
+                    FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, HadoopHelper.VALUE_FIELD_INDEX),
+                    accessor.getFieldLength(tIdx, HadoopHelper.VALUE_FIELD_INDEX));
+ break;
+ }
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
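+        // Nothing to release; the frame buffers are owned by the caller.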
+
+ }
+
+ @Override
+ public Progress getProgress() {
+ return null;
+ }
+}
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
new file mode 100644
index 0000000..4a61296
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+
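+/**
+ * Source operator that runs a Hadoop {@link Mapper} over the input splits of a
+ * partition. Map output is pushed through a sorting {@link RecordWriter} that
+ * builds external sort runs per input block, optionally applies the job's
+ * combiner, and merges the runs before emitting frames downstream.
+ */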
+public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+ extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private final int jobId;
+ private final MarshalledWritable<Configuration> config;
+ private final IInputSplitProviderFactory factory;
+
+ public MapperOperatorDescriptor(JobSpecification spec, int jobId, MarshalledWritable<Configuration> config,
+ IInputSplitProviderFactory factory) throws HyracksDataException {
+ super(spec, 0, 1);
+ this.jobId = jobId;
+ this.config = config;
+ this.factory = factory;
+ HadoopHelper helper = new HadoopHelper(config);
+ recordDescriptors[0] = helper.getMapOutputRecordDescriptor();
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+ final HadoopHelper helper = new HadoopHelper(config);
+ final Configuration conf = helper.getConfiguration();
+ final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
+ final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
+ final IInputSplitProvider isp = factory.createInputSplitProvider(partition);
+ final TaskAttemptID taId = new TaskAttemptID("foo", jobId, true, partition, 0);
+ final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
+
+ final int framesLimit = helper.getSortFrameLimit(ctx);
+ final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
+
+ class SortingRecordWriter extends RecordWriter<K2, V2> {
+ private final ArrayTupleBuilder tb;
+ private final ByteBuffer frame;
+ private final FrameTupleAppender fta;
+ private ExternalSortRunGenerator runGen;
+ private int blockId;
+
+ public SortingRecordWriter() throws HyracksDataException {
+ tb = new ArrayTupleBuilder(2);
+ frame = ctx.allocateFrame();
+ fta = new FrameTupleAppender(ctx.getFrameSize());
+ fta.reset(frame, true);
+ }
+
+ public void initBlock(int blockId) throws HyracksDataException {
+ runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
+ helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit);
+ this.blockId = blockId;
+ }
+
+ @Override
+ public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
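+                // No-op: flushing is done by sortAndFlushBlock(), called once per input block.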
+ }
+
+ @Override
+ public void write(K2 key, V2 value) throws IOException, InterruptedException {
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ key.write(dos);
+ tb.addFieldEndOffset();
+ value.write(dos);
+ tb.addFieldEndOffset();
+ if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ runGen.nextFrame(frame);
+ fta.reset(frame, true);
+ if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
+ if (fta.getTupleCount() > 0) {
+ runGen.nextFrame(frame);
+ fta.reset(frame, true);
+ }
+ runGen.close();
+ IFrameWriter delegatingWriter = new IFrameWriter() {
+ private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ private final ByteBuffer outFrame = ctx.allocateFrame();
+ private final FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(),
+ helper.getMapOutputRecordDescriptorWithoutExtraFields());
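+                // Three fields per output tuple: key, value, and the block id appended below.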
+ private final ArrayTupleBuilder tb = new ArrayTupleBuilder(3);
+
+ @Override
+ public void open() throws HyracksDataException {
+ appender.reset(outFrame, true);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ fta.reset(buffer);
+ int n = fta.getTupleCount();
+ for (int i = 0; i < n; ++i) {
+ tb.reset();
+ tb.addField(fta, i, 0);
+ tb.addField(fta, i, 1);
+ try {
+ tb.getDataOutput().writeInt(blockId);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ tb.addFieldEndOffset();
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+                        // Nothing to clean up on failure.
+ }
+ };
+ if (helper.hasCombiner()) {
+ Reducer<K2, V2, K2, V2> combiner = helper.getCombiner();
+ TaskAttemptID ctaId = new TaskAttemptID("foo", jobId, true, partition, 0);
+                    TaskAttemptContext ctaskAttemptContext = helper.createTaskAttemptContext(ctaId);
+ final IFrameWriter outputWriter = delegatingWriter;
+ RecordWriter<K2, V2> recordWriter = new RecordWriter<K2, V2>() {
+ private final FrameTupleAppender fta = new FrameTupleAppender(ctx.getFrameSize());
+ private final ByteBuffer buffer = ctx.allocateFrame();
+ private final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+
+ {
+ fta.reset(buffer, true);
+ outputWriter.open();
+ }
+
+ @Override
+ public void write(K2 key, V2 value) throws IOException, InterruptedException {
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ key.write(dos);
+ tb.addFieldEndOffset();
+ value.write(dos);
+ tb.addFieldEndOffset();
+ if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(buffer, outputWriter);
+ fta.reset(buffer, true);
+ if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+                        if (fta.getTupleCount() > 0) {
+                            FrameUtils.flushFrame(buffer, outputWriter);
+                        }
+                        // Close the downstream writer even when there is nothing left to flush.
+                        outputWriter.close();
+                    }
+ };
+ delegatingWriter = new ReduceWriter<K2, V2, K2, V2>(ctx, helper,
+ new int[] { HadoopHelper.KEY_FIELD_INDEX }, helper.getGroupingComparatorFactories(),
+ helper.getMapOutputRecordDescriptorWithoutExtraFields(), combiner, recordWriter, ctaId,
+ ctaskAttemptContext);
+ }
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getFrameSorter(),
+ runGen.getRuns(), new int[] { 0 }, comparators,
+ helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit, delegatingWriter);
+ merger.process();
+ }
+ }
+
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ writer.open();
+ try {
+ SortingRecordWriter recordWriter = new SortingRecordWriter();
+ InputSplit split = null;
+ int blockId = 0;
+ while ((split = isp.next()) != null) {
+ try {
+ RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split,
+ taskAttemptContext);
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ recordReader.initialize(split, taskAttemptContext);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ recordWriter.initBlock(blockId);
+ Mapper<K1, V1, K2, V2>.Context mCtx = mapper.new Context(conf, taId, recordReader,
+ recordWriter, null, null, split);
+ mapper.run(mCtx);
+ recordReader.close();
+ recordWriter.sortAndFlushBlock(writer);
+ ++blockId;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
new file mode 100644
index 0000000..be05f22
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 University of California, Irvine
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Writable;
+
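+/**
+ * Serializable envelope for a Hadoop {@link Writable} (typically a
+ * <code>Configuration</code>), letting it travel inside a Java-serialized
+ * Hyracks job specification. A minimal usage sketch:
+ *
+ * <pre>
+ * MarshalledWritable&lt;Configuration&gt; mConfig = new MarshalledWritable&lt;Configuration&gt;();
+ * mConfig.set(conf); // snapshot the Writable as bytes
+ * Configuration copy = mConfig.get(); // reconstruct it on the remote side
+ * </pre>
+ */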
+public class MarshalledWritable<T extends Writable> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private byte[] bytes;
+
+ public MarshalledWritable() {
+ }
+
+ public void set(T o) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeUTF(o.getClass().getName());
+ o.write(dos);
+ dos.close();
+ bytes = baos.toByteArray();
+ }
+
+    @SuppressWarnings("unchecked")
+    public T get() throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ DataInputStream dis = new DataInputStream(bais);
+ String className = dis.readUTF();
+ T o = (T) HadoopTools.newInstance(className);
+ o.readFields(dis);
+ return o;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
new file mode 100644
index 0000000..33d58af
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
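+/**
+ * Frame writer that scans sorted (key, value) tuples, detects group boundaries
+ * on the configured group fields, and invokes a Hadoop {@link Reducer} once per
+ * group. Tuples of the current group are accumulated into a list of frames and
+ * replayed through a {@link KVIterator} when the group is reduced.
+ */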
+public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
+ private final IHyracksTaskContext ctx;
+ private final HadoopHelper helper;
+ private final int[] groupFields;
+ private final FrameTupleAccessor accessor0;
+ private final FrameTupleAccessor accessor1;
+ private final ByteBuffer copyFrame;
+ private final IBinaryComparator[] comparators;
+ private final KVIterator kvi;
+ private final Reducer<K2, V2, K3, V3> reducer;
+ private final RecordWriter<K3, V3> recordWriter;
+ private final TaskAttemptID taId;
+ private final TaskAttemptContext taskAttemptContext;
+
+ private boolean first;
+ private boolean groupStarted;
+ private List<ByteBuffer> group;
+ private int bPtr;
+ private FrameTupleAppender fta;
+ private Counter keyCounter;
+ private Counter valueCounter;
+
+ public ReduceWriter(IHyracksTaskContext ctx, HadoopHelper helper, int[] groupFields,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
+ Reducer<K2, V2, K3, V3> reducer, RecordWriter<K3, V3> recordWriter, TaskAttemptID taId,
+ TaskAttemptContext taskAttemptContext) {
+ this.ctx = ctx;
+ this.helper = helper;
+ this.groupFields = groupFields;
+ accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ copyFrame = ctx.allocateFrame();
+ accessor1.reset(copyFrame);
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ this.reducer = reducer;
+ this.recordWriter = recordWriter;
+ this.taId = taId;
+ this.taskAttemptContext = taskAttemptContext;
+
+ kvi = new KVIterator(ctx, helper, recordDescriptor);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ first = true;
+ groupStarted = false;
+ group = new ArrayList<ByteBuffer>();
+ bPtr = 0;
+ group.add(ctx.allocateFrame());
+ fta = new FrameTupleAppender(ctx.getFrameSize());
+ keyCounter = new Counter() {
+ };
+ valueCounter = new Counter() {
+ };
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor0.reset(buffer);
+ int nTuples = accessor0.getTupleCount();
+ for (int i = 0; i < nTuples; ++i) {
+ if (first) {
+ groupInit();
+ first = false;
+ } else {
+ if (i == 0) {
+ switchGroupIfRequired(accessor1, accessor1.getTupleCount() - 1, accessor0, i);
+ } else {
+ switchGroupIfRequired(accessor0, i - 1, accessor0, i);
+ }
+ }
+ accumulate(accessor0, i);
+ }
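+        // Keep a copy of this frame so the last tuple seen can be compared with
+        // the first tuple of the next frame (accessor1 is bound to copyFrame).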
+ FrameUtils.copy(buffer, copyFrame);
+ }
+
+ private void accumulate(FrameTupleAccessor accessor, int tIndex) {
+ if (!fta.append(accessor, tIndex)) {
+ ++bPtr;
+ if (group.size() <= bPtr) {
+ group.add(ctx.allocateFrame());
+ }
+ fta.reset(group.get(bPtr), true);
+ if (!fta.append(accessor, tIndex)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+ FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+ if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+ reduce();
+ groupInit();
+ }
+ }
+
+ private void groupInit() {
+ groupStarted = true;
+ bPtr = 0;
+ fta.reset(group.get(0), true);
+ }
+
+ private void reduce() throws HyracksDataException {
+ kvi.reset(group, bPtr + 1);
+ try {
+ Reducer<K2, V2, K3, V3>.Context rCtx = reducer.new Context(helper.getConfiguration(), taId, kvi,
+ keyCounter, valueCounter, recordWriter, null, null,
+ (RawComparator<K2>) helper.getRawGroupingComparator(), (Class<K2>) helper.getJob()
+ .getMapOutputKeyClass(), (Class<V2>) helper.getJob().getMapOutputValueClass());
+ reducer.run(rCtx);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ groupStarted = false;
+ }
+
+ private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+ for (int i = 0; i < comparators.length; ++i) {
+ int fIdx = groupFields[i];
+ int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+ int l1 = a1.getFieldLength(t1Idx, fIdx);
+ int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+ int l2 = a2.getFieldLength(t2Idx, fIdx);
+ if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (groupStarted) {
+ reduce();
+ }
+ try {
+ recordWriter.close(taskAttemptContext);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
new file mode 100644
index 0000000..bbb0d74
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
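+/**
+ * Sink operator that drives a Hadoop {@link Reducer}. Incoming frames, already
+ * sorted by the shuffle connector, are forwarded to a {@link ReduceWriter}; the
+ * reduced output is written through the record writer obtained from the job's
+ * output format.
+ */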
+public class ReducerOperatorDescriptor<K2 extends Writable, V2 extends Writable, K3 extends Writable, V3 extends Writable>
+ extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private final int jobId;
+
+ private MarshalledWritable<Configuration> mConfig;
+
+ public ReducerOperatorDescriptor(JobSpecification spec, int jobId, MarshalledWritable<Configuration> mConfig) {
+ super(spec, 1, 0);
+ this.jobId = jobId;
+ this.mConfig = mConfig;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ final HadoopHelper helper = new HadoopHelper(mConfig);
+ final Reducer<K2, V2, K3, V3> reducer = helper.getReducer();
+ final RecordDescriptor recordDescriptor = helper.getMapOutputRecordDescriptor();
+ final int[] groupFields = helper.getSortFields();
+ IBinaryComparatorFactory[] groupingComparators = helper.getGroupingComparatorFactories();
+
+ final TaskAttemptID taId = new TaskAttemptID("foo", jobId, false, partition, 0);
+ final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
+ final RecordWriter recordWriter;
+ try {
+ recordWriter = helper.getOutputFormat().getRecordWriter(taskAttemptContext);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+
+ final ReduceWriter<K2, V2, K3, V3> rw = new ReduceWriter<K2, V2, K3, V3>(ctx, helper, groupFields,
+ groupingComparators, recordDescriptor, reducer, recordWriter, taId, taskAttemptContext);
+
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+ @Override
+ public void open() throws HyracksDataException {
+ rw.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ rw.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ rw.close();
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
new file mode 100644
index 0000000..8e03eae
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+
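+/**
+ * Frame reader for the receive side of the shuffle. {@link #open()} drains all
+ * sender channels, splitting the received tuples into per-block sort runs, then
+ * merges the runs with an {@link ExternalSortRunMerger} into a single sorted
+ * run file that is subsequently served frame by frame.
+ */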
+public class ShuffleFrameReader implements IFrameReader {
+ private final IHyracksTaskContext ctx;
+ private final NonDeterministicChannelReader channelReader;
+ private final HadoopHelper helper;
+ private final RecordDescriptor recordDescriptor;
+ private List<RunFileWriter> runFileWriters;
+ private RunFileReader reader;
+
+ public ShuffleFrameReader(IHyracksTaskContext ctx, NonDeterministicChannelReader channelReader,
+ MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+ this.ctx = ctx;
+ this.channelReader = channelReader;
+ helper = new HadoopHelper(mConfig);
+ this.recordDescriptor = helper.getMapOutputRecordDescriptor();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ channelReader.open();
+ int nSenders = channelReader.getSenderPartitionCount();
+ runFileWriters = new ArrayList<RunFileWriter>();
+ RunInfo[] infos = new RunInfo[nSenders];
+ FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ IInputChannel[] channels = channelReader.getChannels();
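+        // Drain every sender channel; a tuple carrying a new block id closes the
+        // sender's current run and starts a new one.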
+ while (true) {
+ int entry = channelReader.findNextSender();
+ if (entry < 0) {
+ break;
+ }
+ RunInfo info = infos[entry];
+ IInputChannel channel = channels[entry];
+ ByteBuffer netBuffer = channel.getNextBuffer();
+ accessor.reset(netBuffer);
+ int nTuples = accessor.getTupleCount();
+ for (int i = 0; i < nTuples; ++i) {
+ int tBlockId = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ FrameUtils.getAbsoluteFieldStartOffset(accessor, i, HadoopHelper.BLOCKID_FIELD_INDEX));
+ if (info == null) {
+ info = new RunInfo();
+ info.reset(tBlockId);
+ infos[entry] = info;
+ } else if (info.blockId != tBlockId) {
+ info.close();
+ info.reset(tBlockId);
+ }
+ info.write(accessor, i);
+ }
+ channel.recycleBuffer(netBuffer);
+ }
+ for (int i = 0; i < infos.length; ++i) {
+ RunInfo info = infos[i];
+ if (info != null) {
+ info.close();
+ }
+ }
+ infos = null;
+
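+        // Merge all per-block runs into a single sorted run file for the reducer.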
+ FileReference outFile = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
+ int framesLimit = helper.getSortFrameLimit(ctx);
+ IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ List<IFrameReader> runs = new LinkedList<IFrameReader>();
+ for (RunFileWriter rfw : runFileWriters) {
+ runs.add(rfw.createReader());
+ }
+ RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators,
+ recordDescriptor, framesLimit, rfw);
+ merger.process();
+
+ reader = rfw.createReader();
+ reader.open();
+ }
+
+ @Override
+ public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ return reader.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ reader.close();
+ }
+
+ private class RunInfo {
+ private final ByteBuffer buffer;
+ private final FrameTupleAppender fta;
+
+ private FileReference file;
+ private RunFileWriter rfw;
+ private int blockId;
+
+ public RunInfo() {
+ buffer = ctx.allocateFrame();
+ fta = new FrameTupleAppender(ctx.getFrameSize());
+ }
+
+ public void reset(int blockId) throws HyracksDataException {
+ this.blockId = blockId;
+ fta.reset(buffer, true);
+ try {
+ file = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
+ rfw = new RunFileWriter(file, ctx.getIOManager());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public void write(FrameTupleAccessor accessor, int tIdx) throws HyracksDataException {
+ if (!fta.append(accessor, tIdx)) {
+ flush();
+ if (!fta.append(accessor, tIdx)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ public void close() throws HyracksDataException {
+ flush();
+ rfw.close();
+ runFileWriters.add(rfw);
+ }
+
+ private void flush() throws HyracksDataException {
+ if (fta.getTupleCount() <= 0) {
+ return;
+ }
+ buffer.limit(buffer.capacity());
+ buffer.position(0);
+ rfw.nextFrame(buffer);
+ fta.reset(buffer, true);
+ }
+ }
+}
\ No newline at end of file