initial checkin of the genomix project, from Hongzhi's code
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2652 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/HyracksCodeFormatProfile.xml b/genomix/HyracksCodeFormatProfile.xml
new file mode 100644
index 0000000..2cde66d
--- /dev/null
+++ b/genomix/HyracksCodeFormatProfile.xml
@@ -0,0 +1,279 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<profiles version="11">
+<profile kind="CodeFormatterProfile" name="HyracksCodeFormatProfile" version="11">
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.disabling_tag" value="@formatter:off"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_field" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.use_on_off_tags" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_ellipsis" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_multiple_fields" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_conditional_expression" value="80"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_array_initializer" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_package" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_package" value="0"/>
+<setting id="org.eclipse.jdt.core.compiler.source" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.join_wrapped_lines" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_member_type" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.align_type_members_on_columns" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_parameter_description" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="120"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.enabling_tag" value="@formatter:on"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_assignment" value="0"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.assertIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="space"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_body" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_method" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_method_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_switch" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.enumIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_ellipsis" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_method_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.compact_else_if" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_constant" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_root_tags" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_empty_lines" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block_in_case" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.compiler.compliance" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_binary_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode" value="enabled"/>
+<setting id="org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_label" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant" value="48"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="9999"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_import_groups" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_before_binary_operator" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_block" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.join_lines_in_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_compact_if" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_imports" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_html" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_source_code" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.compiler.codegen.targetPlatform" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_member" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_header" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_enum_constants" value="49"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_imports" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line" value="false"/>
+</profile>
+</profiles>
diff --git a/genomix/pom.xml b/genomix/pom.xml
new file mode 100644
index 0000000..299a208
--- /dev/null
+++ b/genomix/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>genomix</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <name>genomix</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.genomix.Driver</mainClass>
+ <name>genomix</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.7.2</version>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <argLine>-enableassertions -Xmx512m -XX:MaxPermSize=300m
+ -Dfile.encoding=UTF-8
+ -Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>.</directory>
+ <includes>
+ <include>teststore*</include>
+ <include>edu*</include>
+ <include>actual*</include>
+ <include>build*</include>
+ <include>expect*</include>
+ <include>ClusterController*</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <distributionManagement>
+ <repository>
+ <id>hyracks-releases</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/repositories/hyracks-releases/</url>
+ </repository>
+ <snapshotRepository>
+ <id>hyracks-snapshots</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/repositories/hyracks-snapshots/</url>
+ </snapshotRepository>
+ </distributionManagement>
+
+ <repositories>
+ <repository>
+ <id>hyracks-public</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public/</url>
+ </repository>
+ <repository>
+ <id>jboss-public</id>
+ <url>https://repository.jboss.org/nexus/content/groups/public/</url>
+ </repository>
+ </repositories>
+
+ <pluginRepositories>
+ <pluginRepository>
+ <id>hyracks-public</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public/</url>
+ <releases>
+ <updatePolicy>always</updatePolicy>
+ </releases>
+ </pluginRepository>
+ </pluginRepositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-data-std</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.kenai.nbpwr</groupId>
+ <artifactId>org-apache-commons-io</artifactId>
+ <version>1.3.1-201002241208</version>
+ <type>nbm</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
+ <artifactId>hyracks-integration-tests</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-ipc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/genomix/src/main/assembly/binary-assembly.xml b/genomix/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/genomix/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/genomix/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..bb868ce
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.genomix.data.normalizers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+
+public class Integer64NormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+ private static final long serialVersionUID = 8735044913496854551L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer() {
+ private static final int POSTIVE_LONG_MASK = (3 << 30);
+ private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+ private static final int NEGATIVE_LONG_MASK = (0 << 30);
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ long value = Integer64SerializerDeserializer.getLong(bytes, start);
+ int highValue = (int) (value >> 32);
+ if (highValue > 0) {
+ /** * larger than Integer.MAX */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= POSTIVE_LONG_MASK;
+ return highNmk;
+ } else if (highValue == 0) {
+ /** * smaller than Integer.MAX but >=0 */
+ int lowNmk = (int) value;
+ lowNmk >>= 2;
+ lowNmk |= NON_NEGATIVE_INT_MASK;
+ return lowNmk;
+ } else {
+ /** * less than 0: have not optimized for that */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= NEGATIVE_LONG_MASK;
+ return highNmk;
+ }
+ }
+
+ private int getKey(int value) {
+ return value ^ Integer.MIN_VALUE;
+ }
+ };
+ }
+}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java b/genomix/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
new file mode 100644
index 0000000..90bd12f
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.genomix.data.partition;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+
+public class KmerHashPartitioncomputerFactory implements ITuplePartitionComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public KmerHashPartitioncomputerFactory() {
+ }
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
+ if (nParts == 1) {
+ return 0;
+ }
+ int startOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
+ int slotLength = accessor.getFieldSlotsLength();
+
+ ByteBuffer buf = accessor.getBuffer();
+ buf.position(startOffset + fieldOffset + slotLength);
+ long l = accessor.getBuffer().getLong();
+ return (int) (l % nParts);
+ }
+ };
+ }
+}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java b/genomix/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
new file mode 100644
index 0000000..41dcb07
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.genomix.data.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ByteSerializerDeserializer implements ISerializerDeserializer<Byte> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer();
+
+ private ByteSerializerDeserializer() {
+ }
+
+ @Override
+ public Byte deserialize(DataInput in) throws HyracksDataException {
+ try {
+ return in.readByte();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void serialize(Byte instance, DataOutput out) throws HyracksDataException {
+ try {
+ out.writeByte(instance.intValue());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public static byte getByte(byte[] bytes, int offset) {
+ return bytes[offset];
+ }
+
+ public static void putByte(byte val, byte[] bytes, int offset) {
+ bytes[offset] = val;
+ }
+
+}
\ No newline at end of file
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
new file mode 100644
index 0000000..e7a8e6c
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -0,0 +1,241 @@
+package edu.uci.ics.genomix.dataflow;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class FileScanDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private int k;
+
+ public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k) {
+ super(spec, 0, 1);
+ // TODO Auto-generated constructor stub
+ this.k = k;
+ //recordDescriptors[0] = news RecordDescriptor(
+ // new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+ recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {
+ Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+ }
+
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+
+ final int temp = partition;
+
+ // TODO Auto-generated method stub
+ return (IOperatorNodePushable) new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private ArrayTupleBuilder tupleBuilder;
+ private ByteBuffer outputBuffer;
+ private FrameTupleAppender outputAppender;
+ private long window;
+
+ @Override
+ public void initialize() {
+
+ window = 0;
+ for (int i = 0; i < k; i++) {
+ window <<= 2;
+ window |= 3;
+ }
+
+ tupleBuilder = new ArrayTupleBuilder(2);
+ outputBuffer = ctx.allocateFrame();
+ outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputAppender.reset(outputBuffer, true);
+ try {// one try with multiple catch?
+ //FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ // FileScanDescriptor.class.getSimpleName());
+ //writer = new RunFileWriter(file, ctx.getIOManager());
+ writer.open();
+ // read the file
+ InputStream filenames;
+ /*File roots = new File("G:\\data");
+ for (File file : roots.listFiles())
+ System.out.println(file);
+ String s = "G:" + File.separator + "data"
+ + File.separator + "filename.txt";*/
+
+ String s = "g:\\data\\filename" + String.valueOf(temp) + ".txt";
+
+ filenames = new FileInputStream(s);
+ // filenames = new FileInputStream("filename.txt");
+
+ String line;
+ BufferedReader reader = new BufferedReader(new InputStreamReader(filenames));
+ line = reader.readLine();
+ while (line != null) {
+ BufferedReader readsfile = new BufferedReader(new InputStreamReader(new FileInputStream(line)));
+ String read = readsfile.readLine();
+ while (read != null) {
+ read = readsfile.readLine();
+ SplitReads(read.getBytes());
+ //read.getBytes();
+
+ read = readsfile.readLine();
+ read = readsfile.readLine();
+
+ read = readsfile.readLine();
+ }
+ line = reader.readLine();
+ readsfile.close();
+ }
+ reader.close();
+ filenames.close();
+ if (outputAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ outputAppender = null;
+ outputBuffer = null;
+ // sort code for external sort here?
+ writer.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private long CompressKmer(byte[] array, int start) {
+ // a: 00; c: 01; G: 10; T: 11
+ long l = 0;
+ for (int i = start; i < start + k; i++) {
+ l <<= 2;
+ switch (array[i]) {
+ case 'A':
+ case 'a':
+ l |= 0;
+ break;
+ case 'C':
+ case 'c':
+ l |= 1;
+ break;
+ case 'G':
+ case 'g':
+ l |= 2;
+ break;
+ case 'T':
+ case 't':
+ l |= 3;
+ break;
+ }
+ }
+ return l;
+ }
+
+ private byte GetBitmap(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 1;
+ break;
+ case 'C':
+ case 'c':
+ r = 2;
+ break;
+ case 'G':
+ case 'g':
+ r = 4;
+ break;
+ case 'T':
+ case 't':
+ r = 8;
+ break;
+ }
+ return r;
+ }
+
+ private byte ConvertSymbol(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 0;
+ break;
+ case 'C':
+ case 'c':
+ r = 1;
+ break;
+ case 'G':
+ case 'g':
+ r = 2;
+ break;
+ case 'T':
+ case 't':
+ r = 3;
+ break;
+ }
+ return r;
+ }
+
+ private void SplitReads(byte[] array) {
+ try {
+ long l = 0;
+
+ byte pre = 0, next = 0;
+ byte r;
+
+ for (int i = 0; i < array.length - k + 1; i++) {
+ if (0 == i) {
+ l = CompressKmer(array, i);
+ } else {
+ l <<= 2;
+ l &= window;
+ l |= ConvertSymbol(array[i + k - 1]);
+ pre = GetBitmap(array[i - 1]);
+ }
+ if (i + k != array.length) {
+ next = GetBitmap(array[i + k]);
+ }
+
+ r = 0;
+ r |= pre;
+ r <<= 4;
+ r |= next;
+
+ /*System.out.print(l);
+ System.out.print(' ');
+ System.out.print(r);
+ System.out.println();*/
+
+ tupleBuilder.reset();
+
+ tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);
+
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
+ 0, tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record size is too large.");
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+
+ }
+}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
new file mode 100644
index 0000000..abcb54e
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
@@ -0,0 +1,159 @@
+package edu.uci.ics.genomix.dataflow;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorter;
+
/**
 * Two-activity operator: a split phase that produces sorted k-mer runs and a
 * merge phase that merges those runs into one sorted stream.
 * Output schema: [long kmer, byte bitmap, int count].
 * The phases communicate through a {@link SplitTaskState} registered on the
 * task context, keyed by the split activity's task id.
 */
public class GenKmerDescriptor extends AbstractOperatorDescriptor {

    private static final long serialVersionUID = 1L;
    private static final int SPLIT_ACTIVITY_ID = 0;
    private static final int MERGE_ACTIVITY_ID = 1;
    // maximum number of frames the sort/merge may hold in memory at once
    private final int framesLimit;
    // k-mer length
    private final int k;

    public GenKmerDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int k) {
        super(spec, 1, 1);
        this.framesLimit = framesLimit;
        this.k = k;

        recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {
                Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
                IntegerSerializerDeserializer.INSTANCE });
    }

    /**
     * Wires split -> merge with a blocking edge: the merge activity may only
     * start after the split activity has finished producing its runs.
     */
    @Override
    public void contributeActivities(IActivityGraphBuilder builder) {
        SplitActivity sa = new SplitActivity(new ActivityId(odId, SPLIT_ACTIVITY_ID));
        MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
        builder.addActivity(this, sa);
        builder.addSourceEdge(0, sa, 0);

        builder.addActivity(this, ma);
        builder.addTargetEdge(0, ma, 0);

        builder.addBlockingEdge(sa, ma);
    }

    /** Split phase: consumes raw reads and generates sorted runs of k-mers. */
    private class SplitActivity extends AbstractActivityNode {
        /**
         *
         */
        private static final long serialVersionUID = 1L;

        public SplitActivity(ActivityId activityID) {
            super(activityID);
        }

        // NOTE(review): missing @Override annotation; add when code changes are allowed.
        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
                throws HyracksDataException {
            // Input is a single UTF8 string field per tuple (the read);
            // the pushable sorts generated k-mers within framesLimit frames.
            KmerSplitOperatorNodePushable op = new KmerSplitOperatorNodePushable(ctx, k, new RecordDescriptor(
                    new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE }), framesLimit,
                    new TaskId(this.id, partition));
            return op;
        }
    }

    /**
     * State handed from split to merge: the list of sorted run readers.
     * Not actually serializable across nodes (toBytes/fromBytes are no-ops);
     * both activities must run on the same node controller.
     */
    public static class SplitTaskState extends AbstractStateObject {
        List<IFrameReader> runs;

        public SplitTaskState() {
        }

        public SplitTaskState(JobId jobId, TaskId taskId, List<IFrameReader> runs) {
            super(jobId, taskId);
            this.runs = runs;
        }

        @Override
        public void toBytes(DataOutput out) throws IOException {

        }

        @Override
        public void fromBytes(DataInput in) throws IOException {

        }
    }

    /** Merge phase: merges the split phase's sorted runs into the output writer. */
    private class MergeActivity extends AbstractActivityNode {

        /**
         *
         */
        private static final long serialVersionUID = 1L;

        public MergeActivity(ActivityId id) {
            super(id);
        }

        @Override
        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                throws HyracksDataException {
            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                @Override
                public void initialize() throws HyracksDataException {
                    // Fetch the runs published by the split activity for this partition.
                    SplitTaskState state = (SplitTaskState) ctx.getStateObject(new TaskId(new ActivityId(
                            getOperatorId(), SPLIT_ACTIVITY_ID), partition));

                    // Sort/merge on field 0 (the long k-mer).
                    IBinaryComparator[] comparators = new IBinaryComparator[1];
                    IBinaryComparatorFactory cf = PointableBinaryComparatorFactory.of(LongPointable.FACTORY);
                    comparators[0] = cf.createBinaryComparator();

                    FrameSorter frameSorter = new FrameSorter(
                            ctx,
                            new int[] { 0 },
                            new Integer64NormalizedKeyComputerFactory(),
                            new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
                            recordDescriptors[0]);

                    ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, state.runs,
                            new int[] { 0 }, comparators, recordDescriptors[0], framesLimit, writer);
                    merger.process();
                }
            };
            return op;
        }
    }
}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
new file mode 100644
index 0000000..1f6731d
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
@@ -0,0 +1,254 @@
+package edu.uci.ics.genomix.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.dataflow.GenKmerDescriptor.SplitTaskState;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class KmerSplitOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+
+ private final int k;
+ private long window;
+
+ private final SplitFrame frameSorter;
+
+ private FrameTupleAccessor accessor;
+ private ArrayTupleBuilder tupleBuilder;
+ private TaskId MytaskId;
+ private IHyracksTaskContext ctx;
+
+ public KmerSplitOperatorNodePushable(IHyracksTaskContext ctx, int k, RecordDescriptor rd_in, int buffer_size,
+ TaskId taskid) {
+
+ tupleBuilder = new ArrayTupleBuilder(3);
+ this.k = k;
+
+ RecordDescriptor rd = new RecordDescriptor(new ISerializerDeserializer[] {
+ Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+
+ int[] sortFields = { 0 };
+ frameSorter = new SplitFrame(ctx, sortFields, new Integer64NormalizedKeyComputerFactory(),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) }, rd,
+ buffer_size);
+
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), rd_in);
+
+ new FrameTupleAccessor(ctx.getFrameSize(), rd);
+ new FrameTupleAppender(ctx.getFrameSize());
+
+ ByteBuffer.allocate(ctx.getFrameSize());
+
+ //initialize the window
+ window = 0;
+ for (int i = 0; i < k; i++) {
+ window <<= 2;
+ window |= 3;
+ }
+
+ MytaskId = taskid;
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ // TODO Auto-generated method stub
+ //writer.open();
+
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ ByteBuffer temp_buf = accessor.getBuffer();
+ for (int i = 0; i < tupleCount; i++) {
+ int tupleStartOffset = accessor.getTupleStartOffset(i);
+ int fieldStartOffset = accessor.getFieldStartOffset(i, 0);
+ int loadLength = accessor.getFieldLength(i, 0);
+ //int loadLength = temp_buf.getInt(tupleStartOffset);
+ byte[] read = new byte[loadLength];
+ int slotLength = accessor.getFieldSlotsLength();
+ //temp_buf.position(tupleStartOffset+fieldStartOffset + accessor.getFieldSlotsLength());
+ int pos = tupleStartOffset + fieldStartOffset + slotLength;
+ //temp_buf
+ try {
+ temp_buf.position(pos);
+ temp_buf.get(read, 0, loadLength);
+ SplitReads(read);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ tupleBuilder.reset();
+
+ }
+ }
+
+ private long CompressKmer(byte[] array, int start, int k) {
+ // a: 00; c: 01; G: 10; T: 11
+ long l = 0;
+ for (int i = start; i < start + k; i++) {
+ l <<= 2;
+ switch (array[start + i]) {
+ case 'A':
+ case 'a':
+ l |= 0;
+ break;
+ case 'C':
+ case 'c':
+ l |= 1;
+ break;
+ case 'G':
+ case 'g':
+ l |= 2;
+ break;
+ case 'T':
+ case 't':
+ l |= 3;
+ break;
+ }
+ }
+ return l;
+ }
+
+ private byte GetBitmap(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 1;
+ break;
+ case 'C':
+ case 'c':
+ r = 2;
+ break;
+ case 'G':
+ case 'g':
+ r = 4;
+ break;
+ case 'T':
+ case 't':
+ r = 8;
+ break;
+ }
+ return r;
+ }
+
+ private byte ConvertSymbol(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 0;
+ break;
+ case 'C':
+ case 'c':
+ r = 1;
+ break;
+ case 'G':
+ case 'g':
+ r = 2;
+ break;
+ case 'T':
+ case 't':
+ r = 3;
+ break;
+ }
+ return r;
+ }
+
+ private void SplitReads(byte[] array) {
+ try {
+ long l = 0;
+
+ byte pre = 0, next = 0;
+ byte r;
+
+ for (int i = 2; i < array.length - k + 1; i++) {
+ if (2 == i) {
+ l = CompressKmer(array, i, k);
+ } else {
+ l <<= 2;
+ l &= window;
+ l |= ConvertSymbol(array[i + k - 1]);
+ pre = GetBitmap(array[i - 1]);
+ }
+ if (i + k != array.length) {
+ next = GetBitmap(array[i + k]);
+ }
+
+ r = 0;
+ r |= pre;
+ r <<= 4;
+ r |= next;
+
+ /*System.out.print(l);
+ System.out.print(' ');
+ System.out.print(r);
+ System.out.println();*/
+
+ frameSorter.insertKmer(l, r);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+ try {
+ frameSorter.processLastFrame();
+ SplitTaskState state = new SplitTaskState(ctx.getJobletContext().getJobId(), MytaskId,
+ frameSorter.GetRuns());
+ ctx.setStateObject(state);
+
+ //writer.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public List<IFrameReader> GetRuns() {
+ return frameSorter.GetRuns();
+ }
+
+ //for debug
+ /* private void DumpBlock(ByteBuffer f){
+
+ int n = f.array().length/13;
+
+ for(int i = 0 ; i < n ; i++){
+ long t = LongPointable.getLong(f.array(), 13 * i);
+ System.out.print(t);
+ System.out.print(' ');
+ }
+ System.out.println();
+ }*/
+}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
new file mode 100644
index 0000000..179c983
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
@@ -0,0 +1,484 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+public class SplitFrame {
+
+ private static int HASH_SIZE = 4096;
+ private final SerializableHashTable table;
+ private final TuplePointer tempPointer;
+ private ArrayTupleBuilder tupleBuilder;
+ private final int buf_size;
+
+ private final IHyracksTaskContext ctx;
+ private final int[] sortFields;
+ private final INormalizedKeyComputer nkc;
+ private final IBinaryComparator[] comparators;
+ private final ByteBuffer[] buffers;
+
+ private final FrameTupleAccessor fta1;
+ private final FrameTupleAccessor fta2;
+
+ private final FrameTupleAppender appender;
+
+ private final ByteBuffer outFrame;
+
+ private int dataFrameCount;
+ private int[] tPointers;
+ private int tupleCount;
+ private final List<IFrameReader> runs;
+ private int flushCount;
+ private RecordDescriptor recordDescriptor;
+
+ private int FrameTupleCount;
+
+ /**
+ * In-memory aggregator/sorter for (kmer, adjacency-byte, count) tuples.
+ * Buffers up to buf_size frames, deduplicating kmers via a hash table,
+ * and spills sorted runs to disk when the buffers fill up.
+ *
+ * @param ctx task context used for frame allocation and run files
+ * @param sortFields field indices to sort on (field 0 is the packed kmer)
+ * @param firstKeyNormalizerFactory optional normalized-key computer (may be null)
+ * @param comparatorFactories one comparator per sort field
+ * @param recordDescriptor tuple layout: {long kmer, byte bitmap, int count}
+ * @param buf_size number of in-memory frames before a run is spilled
+ */
+ public SplitFrame(IHyracksTaskContext ctx, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, int buf_size) {
+ this.ctx = ctx;
+ this.sortFields = sortFields;
+ this.recordDescriptor = recordDescriptor;
+
+ nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ outFrame = ctx.allocateFrame();
+ table = new SerializableHashTable(HASH_SIZE, ctx);
+ dataFrameCount = 0;
+
+ tempPointer = new TuplePointer();
+ // 3 fields per tuple: kmer (long), adjacency bitmap (byte), count (int).
+ tupleBuilder = new ArrayTupleBuilder(3);
+ this.buf_size = buf_size;
+ buffers = new ByteBuffer[buf_size];
+ for (int i = 0; i < buf_size; i++) {
+ buffers[i] = ByteBuffer.allocate(ctx.getFrameSize());
+ }
+ appender.reset(buffers[0], true);
+ flushCount = 0;
+ runs = new LinkedList<IFrameReader>();
+ FrameTupleCount = 0;
+ }
+
+ /**
+ * Rewinds to an empty in-memory state, reusing frame 0 for appends.
+ * NOTE(review): the hash table is NOT cleared here — confirm callers
+ * never reuse this object for a fresh key space.
+ */
+ public void reset() {
+ dataFrameCount = 0;
+ tupleCount = 0;
+ appender.reset(buffers[0], true);
+ }
+
+ /** Returns the number of fully-filled in-memory frames. */
+ public int getFrameCount() {
+ return dataFrameCount;
+ }
+
+ /**
+ * Probes the hash table for a tuple whose long key (field 0) equals
+ * {@code entry}. Walks the bucket chain at entry % HASH_SIZE; on return,
+ * {@code dataPointer} holds the (frame, tuple) location of the match, or
+ * negative indices if the key is absent.
+ */
+ private void SearchHashTable(long entry, TuplePointer dataPointer) {
+ int offset = 0;
+ int tp = (int) (entry % HASH_SIZE);
+ if (tp < 0) {
+ // Java's % keeps the dividend's sign; fold negatives into range.
+ tp = -tp;
+ }
+ do {
+ table.getTuplePointer(tp, offset, dataPointer);// what is the offset mean?
+ if (dataPointer.frameIndex < 0 || dataPointer.tupleIndex < 0)
+ break;
+ int bIndex = dataPointer.frameIndex;
+ int tIndex = dataPointer.tupleIndex;
+ fta1.reset(buffers[bIndex]);
+
+ /* System.out.print("a:");
+ System.out.print(tIndex);
+ System.out.print(" b");
+ System.out.print(fta1.getTupleCount());
+ System.out.println();*/
+
+ // Locate the start of field 0 (the long key) inside the frame and
+ // compare it against the probe key.
+ int tupleOffset = fta1.getTupleStartOffset(tIndex);
+ int fieldOffset = fta1.getFieldStartOffset(tIndex, 0);
+ int slotLength = fta1.getFieldSlotsLength();
+ int pos = tupleOffset + fieldOffset + slotLength;
+ long l = buffers[bIndex].getLong(pos);
+ if (l == entry) {
+ break;
+ }
+ offset += 1;
+ } while (true);
+ }
+
+ /**
+ * Records (frame_id, tuple_id) in the bucket for entry % HASH_SIZE.
+ * No duplicate check is done here — callers must probe first.
+ */
+ private void InsertHashTable(long entry, int frame_id, int tuple_id) {
+
+ tempPointer.frameIndex = frame_id;
+ tempPointer.tupleIndex = tuple_id;
+
+ //System.out.print(frame_id);
+ //System.out.print(' ');
+ //System.out.println(tuple_id);
+
+ int tp = (int) (entry % HASH_SIZE);
+ if (tp < 0) {
+ tp = -tp;
+ }
+ table.insert(tp, tempPointer);
+
+ }
+
+ /**
+ * Inserts or merges one (kmer, adjacency) observation.
+ *
+ * If the kmer already exists, its adjacency byte (field 1) is OR-ed with
+ * {@code r} and its count (field 2) is incremented in place. Otherwise a
+ * new tuple {l, r, 1} is appended; when the current frame is full the
+ * next buffer is used, and when all buf_size buffers are full the frames
+ * are sorted and spilled to a run file.
+ *
+ * NOTE(review): after a spill the hash table is never cleared, so stale
+ * pointers into the now-reused frames remain and later probes may match
+ * the wrong tuple — confirm this is handled elsewhere.
+ */
+ public void insertKmer(long l, byte r) {
+ try {
+ SearchHashTable(l, tempPointer);
+ if (tempPointer.frameIndex != -1 && tempPointer.tupleIndex != -1) {
+ // Existing kmer: patch the adjacency byte and bump the count
+ // directly inside the frame bytes.
+ fta1.reset(buffers[tempPointer.frameIndex]);
+ int tStart = fta1.getTupleStartOffset(tempPointer.tupleIndex);
+ int f0StartRel = fta1.getFieldStartOffset(tempPointer.tupleIndex, 1);
+ int slotLength = fta1.getFieldSlotsLength();
+ int pos = f0StartRel + tStart + slotLength;
+
+ // Field 1 (byte) lives at pos; field 2 (int count) at pos + 1.
+ buffers[tempPointer.frameIndex].array()[pos] |= r;
+ buffers[tempPointer.frameIndex].position(pos + 1);
+ int temp_int = buffers[tempPointer.frameIndex].getInt();
+ temp_int += 1;
+ buffers[tempPointer.frameIndex].position(pos + 1);
+ buffers[tempPointer.frameIndex].putInt(temp_int);
+ } else {
+ // New kmer: build the tuple {kmer, bitmap, count=1}.
+ tupleBuilder.reset();
+ tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);
+ tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, 1);
+
+ /*System.out.print(l);
+ System.out.print(' ');
+ System.out.print(r);
+ System.out.println();*/
+ boolean b = appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize());
+
+ if (!b) {
+ // Current frame is full: move to the next buffer, or spill a
+ // sorted run to disk when all buffers are exhausted.
+ dataFrameCount++;
+ FrameTupleCount = 0;
+ if (dataFrameCount < buf_size) {
+ appender.reset(buffers[dataFrameCount], true);
+ } else {
+ sortFrames();
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalSortRunGenerator.class.getSimpleName());
+ RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
+ writer.open();
+ try {
+ flushCount += 1;
+ flushFrames(writer);
+ } finally {
+ writer.close();
+ }
+ runs.add(writer.createReader());
+ dataFrameCount = 0;
+ appender.reset(buffers[dataFrameCount], true);
+ }
+ // Retry in the fresh frame; a single record must always fit.
+ boolean tb = appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize());
+ if (!tb) {
+ throw new HyracksDataException(
+ "Failed to copy an record into a frame: the record size is too large");
+ }
+ }
+ // Register the new tuple's location for future in-place merges.
+ InsertHashTable(l, dataFrameCount, FrameTupleCount);
+ FrameTupleCount += 1;
+ }
+ } catch (HyracksDataException e) {
+ // NOTE(review): the error is swallowed, silently dropping the kmer —
+ // consider propagating.
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Builds the tPointers index (one quad {frameIdx, tupleStart, tupleEnd,
+ * normalizedKey} per tuple) over the filled frames and quicksorts it.
+ *
+ * NOTE(review): only buffers[0 .. dataFrameCount-1] are indexed; the
+ * partially-filled frame currently held by the appender (index
+ * dataFrameCount) is excluded, so its tuples are neither sorted nor
+ * flushed — confirm this does not lose the tail of the data when called
+ * from processLastFrame().
+ */
+ public void sortFrames() {
+ int nBuffers = dataFrameCount;
+ tupleCount = 0;
+ for (int i = 0; i < nBuffers; ++i) {
+ fta1.reset(buffers[i]);
+ tupleCount += fta1.getTupleCount();
+ }
+ int sfIdx = sortFields[0];
+ // Reuse the pointer array when it is already large enough.
+ tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;
+ int ptr = 0;
+ for (int i = 0; i < nBuffers; ++i) {
+ fta1.reset(buffers[i]);
+ int tCount = fta1.getTupleCount();
+ byte[] array = fta1.getBuffer().array();
+ for (int j = 0; j < tCount; ++j) {
+ int tStart = fta1.getTupleStartOffset(j);
+ int tEnd = fta1.getTupleEndOffset(j);
+ tPointers[ptr * 4] = i;
+ tPointers[ptr * 4 + 1] = tStart;
+ tPointers[ptr * 4 + 2] = tEnd;
+ int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
+ int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
+ int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
+ // Cache the normalized key so most comparisons avoid touching
+ // the frame bytes.
+ tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
+ ++ptr;
+ }
+ }
+ if (tupleCount > 0) {
+ sort(tPointers, 0, tupleCount);
+ }
+
+ // NOTE(review): debug dump left in the production path — remove or gate.
+ DumpAllBuffers();
+ //point the pointer to the first one
+ dataFrameCount = 0;
+ }
+
+ /**
+ * Streams the tuples to {@code writer} in tPointers (sorted) order,
+ * re-packing them through outFrame; must be called after sortFrames().
+ *
+ * @throws HyracksDataException on write failure
+ * @throws IllegalStateException if a single tuple cannot fit in a frame
+ */
+ public void flushFrames(IFrameWriter writer) throws HyracksDataException {
+ appender.reset(outFrame, true);
+ for (int ptr = 0; ptr < tupleCount; ++ptr) {
+ int i = tPointers[ptr * 4];
+ int tStart = tPointers[ptr * 4 + 1];
+ int tEnd = tPointers[ptr * 4 + 2];
+ ByteBuffer buffer = buffers[i];
+ fta1.reset(buffer);
+ if (!appender.append(fta1, tStart, tEnd)) {
+ // outFrame is full: flush it and retry in the emptied frame.
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ if (!appender.append(fta1, tStart, tEnd)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ }
+
+ /**
+ * Recursive three-way (Bentley–McIlroy style) quicksort over the
+ * tPointers quads in [offset, offset + length): elements equal to the
+ * middle pivot are gathered at both ends and swapped to the center, so
+ * duplicate keys do not degrade the recursion.
+ */
+ private void sort(int[] tPointers, int offset, int length) {
+ // Pivot = middle element, described by its (frame, tupleStart, normKey).
+ int m = offset + (length >> 1);
+ int mi = tPointers[m * 4];
+ int mj = tPointers[m * 4 + 1];
+ int mv = tPointers[m * 4 + 3];
+
+ int a = offset;
+ int b = a;
+ int c = offset + length - 1;
+ int d = c;
+ while (true) {
+ while (b <= c) {
+ int cmp = compare(tPointers, b, mi, mj, mv);
+ if (cmp > 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, a++, b);
+ }
+ ++b;
+ }
+ while (c >= b) {
+ int cmp = compare(tPointers, c, mi, mj, mv);
+ if (cmp < 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, c, d--);
+ }
+ --c;
+ }
+ if (b > c)
+ break;
+ swap(tPointers, b++, c--);
+ }
+
+ // Move the pivot-equal runs from the ends back to the middle.
+ int s;
+ int n = offset + length;
+ s = Math.min(a - offset, b - a);
+ vecswap(tPointers, offset, b - s, s);
+ s = Math.min(d - c, n - d - 1);
+ vecswap(tPointers, b, n - s, s);
+
+ if ((s = b - a) > 1) {
+ sort(tPointers, offset, s);
+ }
+ if ((s = d - c) > 1) {
+ sort(tPointers, n - s, s);
+ }
+ }
+
+ /** Swaps the 4-int quads at logical positions a and b. */
+ private void swap(int x[], int a, int b) {
+ for (int i = 0; i < 4; ++i) {
+ int t = x[a * 4 + i];
+ x[a * 4 + i] = x[b * 4 + i];
+ x[b * 4 + i] = t;
+ }
+ }
+
+ /** Swaps n consecutive quads starting at logical positions a and b. */
+ private void vecswap(int x[], int a, int b, int n) {
+ for (int i = 0; i < n; i++, a++, b++) {
+ swap(x, a, b);
+ }
+ }
+
+ /**
+ * Compares tuple tp1 against a pivot described by its coordinates
+ * (tp2i = frame index, tp2j = tuple start offset, tp2v = normalized key).
+ * The cached normalized keys are compared first (as unsigned ints); only
+ * on a tie are the actual sort fields compared byte-wise.
+ */
+ private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {
+ int i1 = tPointers[tp1 * 4];
+ int j1 = tPointers[tp1 * 4 + 1];
+ int v1 = tPointers[tp1 * 4 + 3];
+ if (v1 != tp2v) {
+ // Unsigned comparison of the normalized keys.
+ return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
+ }
+ int i2 = tp2i;
+ int j2 = tp2j;
+ ByteBuffer buf1 = buffers[i1];
+ ByteBuffer buf2 = buffers[i2];
+ byte[] b1 = buf1.array();
+ byte[] b2 = buf2.array();
+ fta1.reset(buf1);
+ fta2.reset(buf2);
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = sortFields[f];
+ // Field boundaries are read from the tuple's field-slot table,
+ // which holds one 4-byte end offset per field.
+ int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);
+ int f1End = buf1.getInt(j1 + fIdx * 4);
+ int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
+ int l1 = f1End - f1Start;
+ int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);
+ int f2End = buf2.getInt(j2 + fIdx * 4);
+ int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
+ int l2 = f2End - f2Start;
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ /** No-op: the frame buffers are plain heap arrays with nothing to release. */
+ public void close() {
+ //this.buffers.clear();
+ }
+
+ /** Returns how many sorted runs have been flushed to disk so far. */
+ public int getFlushCount() {
+ return flushCount;
+ }
+
+ /*public void AddRuns(RunFileWriter r){
+ try {
+ runs.add(r.createReader());
+ } catch (HyracksDataException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }*/
+
+ /** Returns the readers for all spilled runs (live list, not a copy). */
+ public List<IFrameReader> GetRuns() {
+ return runs;
+ }
+
+ /**
+ * Debug helper: prints every tuple in the filled frames as
+ * "kmer bitmap count" (fields 0/1/2 read as long/byte/int).
+ * NOTE(review): currently invoked from sortFrames() and
+ * processLastFrame() on the hot path — remove or gate behind a flag.
+ */
+ private void DumpAllBuffers() {
+ FrameTupleAccessor tfa = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+
+ for (int i = 0; i < dataFrameCount; i++) {
+ tfa.reset(buffers[i]);
+ int t = tfa.getTupleCount();
+ for (int j = 0; j < t; j++) {
+ int tupleOffset = tfa.getTupleStartOffset(j);
+
+ int r = tfa.getFieldStartOffset(j, 0);
+ int pos = tupleOffset + r + tfa.getFieldSlotsLength();
+ long l = buffers[i].getLong(pos);
+ System.out.print(l);
+ System.out.print(' ');
+
+ r = tfa.getFieldStartOffset(j, 1);
+ pos = tupleOffset + r + tfa.getFieldSlotsLength();
+ byte b = buffers[i].array()[pos];
+ System.out.print(b);
+ System.out.print(' ');
+
+ r = tfa.getFieldStartOffset(j, 2);
+ pos = tupleOffset + r + tfa.getFieldSlotsLength();
+ int o = buffers[i].getInt(pos);
+ System.out.print(o);
+ System.out.print(' ');
+
+ System.out.println();
+ }
+ }
+ System.out.println("---------------------------------");
+ }
+
+ //functions for dubugging
+ // private void DumpBuffer(byte[] f) {
+ // int n = f.length;
+ //
+ // int count = 0;
+ // for (int i = 0; i < n; i++) {
+ // if (i % 13 == 0) {
+ // if (count != 0) {
+ // System.out.print(")(");
+ // } else {
+ // System.out.print("(");
+ // }
+ // System.out.print(count);
+ // System.out.print(':');
+ // count += 1;
+ // }
+ // System.out.print(f[i]);
+ // System.out.print(' ');
+ // }
+ // System.out.println(')');
+ // }
+
+ /**
+ * Final flush: sorts whatever frames remain in memory and spills them as
+ * one last run, appending its reader to {@code runs}.
+ *
+ * NOTE(review): sortFrames() already calls DumpAllBuffers(), and by the
+ * time the second call below runs dataFrameCount has been reset to 0, so
+ * it prints nothing — both debug calls should go.
+ */
+ public void processLastFrame() {
+ sortFrames();
+ FileReference file;
+
+ DumpAllBuffers();
+ try {
+ file = ctx.getJobletContext().createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());
+ RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
+ writer.open();
+ try {
+ flushCount += 1;
+ flushFrames(writer);
+ } finally {
+ writer.close();
+ }
+ runs.add(writer.createReader());
+ } catch (HyracksDataException e) {
+ // NOTE(review): the final run is silently lost on failure —
+ // consider propagating.
+ e.printStackTrace();
+ }
+ //frameSorter.AddRuns((RunFileWriter) writer);
+
+ }
+}
\ No newline at end of file
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
new file mode 100644
index 0000000..eeeecea
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -0,0 +1,460 @@
+package edu.uci.ics.genomix.dataflow;
+
+import java.io.File;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;
+import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
+import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+
+public class Tester {
+
+ private static final Logger LOGGER = Logger.getLogger(Tester.class.getName());
+ public static final String NC1_ID = "nc1";
+ public static final String NC2_ID = "nc2";
+
+ private static ClusterControllerService cc;
+ private static NodeControllerService nc1;
+ private static NodeControllerService nc2;
+ private static IHyracksClientConnection hcc;
+
+ //private static final boolean DEBUG = true;
+
+ /**
+ * Entry point: boots an in-process CC + two NCs, then runs the k-mer
+ * counting job and prints wall-clock timing to stderr.
+ *
+ * Usage: Tester &lt;file&gt; &lt;k&gt; &lt;page_num&gt;
+ */
+ public static void main(String[] args) throws Exception {
+
+ LOGGER.setLevel(Level.OFF);
+
+ init();
+
+ // Fail fast with a usage message instead of an
+ // ArrayIndexOutOfBoundsException on missing arguments.
+ if (args.length < 3) {
+ System.err.println("usage: Tester <file> <k> <page_num>");
+ return;
+ }
+
+ // Named 'conn' (not 'hcc') so it does not shadow the static field
+ // 'hcc' that init() already initializes.
+ IHyracksClientConnection conn = new HyracksConnection("127.0.0.1", 39000);
+
+ int k, page_num;
+ String file_name = args[0];
+ k = Integer.parseInt(args[1]);
+ page_num = Integer.parseInt(args[2]);
+
+ JobSpecification job = createJob(file_name, k, page_num);
+
+ long start = System.currentTimeMillis();
+ JobId jobId = conn.startJob("test", job);
+ conn.waitForCompletion(jobId);
+ long end = System.currentTimeMillis();
+ System.err.println(start + " " + end + " " + (end - start));
+ }
+
+ // private static FileSplit[] parseFileSplits(String fileSplits) {
+ // String[] splits = fileSplits.split(",");
+ // FileSplit[] fSplits = new FileSplit[splits.length];
+ // for (int i = 0; i < splits.length; ++i) {
+ // String s = splits[i].trim();
+ // int idx = s.indexOf(':');
+ // if (idx < 0) {
+ // throw new IllegalArgumentException("File split " + s + " not well formed");
+ // }
+ // fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+ // }
+ // return fSplits;
+ // }
+
+ /**
+ * Starts an in-process mini-cluster on 127.0.0.1: one cluster controller
+ * (client port 39000, cluster port 39001) and two node controllers
+ * (nc1, nc2), then opens the static client connection {@code hcc} and
+ * registers the "test" application.
+ *
+ * @throws Exception if any service fails to start
+ */
+ public static void init() throws Exception {
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clientNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetPort = 39000;
+ ccConfig.clusterNetIpAddress = "127.0.0.1";
+ ccConfig.clusterNetPort = 39001;
+ ccConfig.profileDumpPeriod = 10000;
+ // CC state lives in a throwaway directory under target/.
+ File outDir = new File("target/ClusterController");
+ outDir.mkdirs();
+ File ccRoot = File.createTempFile(Tester.class.getName(), ".data", outDir);
+ ccRoot.delete();
+ ccRoot.mkdir();
+ ccConfig.ccRoot = ccRoot.getAbsolutePath();
+ cc = new ClusterControllerService(ccConfig);
+ cc.start();
+
+ NCConfig ncConfig1 = new NCConfig();
+ ncConfig1.ccHost = "localhost";
+ ncConfig1.ccPort = 39001;
+ ncConfig1.clusterNetIPAddress = "127.0.0.1";
+ ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.nodeId = NC1_ID;
+ nc1 = new NodeControllerService(ncConfig1);
+ nc1.start();
+
+ NCConfig ncConfig2 = new NCConfig();
+ ncConfig2.ccHost = "localhost";
+ ncConfig2.ccPort = 39001;
+ ncConfig2.clusterNetIPAddress = "127.0.0.1";
+ ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.nodeId = NC2_ID;
+ nc2 = new NodeControllerService(ncConfig2);
+ nc2.start();
+
+ hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+ hcc.createApplication("test", null);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+ }
+ }
+
+ /**
+ * Builds the k-mer counting job:
+ * file scan -> local group-by (merge aggregate) -> hash repartition ->
+ * global group-by (distributed merge aggregate) -> printer.
+ *
+ * NOTE(review): 'filename' and 'page_num' are accepted but never used in
+ * this method (the scan operator is constructed with k only) — confirm
+ * whether FileScanDescriptor should receive them.
+ *
+ * NOTE(review): roughly 200 lines of commented-out TPC-H sample wiring
+ * were removed from this method; recover from VCS history if needed.
+ *
+ * @param filename input file path (currently unused)
+ * @param k        k-mer length passed to the scanner
+ * @param page_num currently unused
+ * @return the assembled job specification
+ */
+ private static JobSpecification createJob(String filename, int k, int page_num) throws HyracksDataException {
+ JobSpecification spec = new JobSpecification();
+
+ // NOTE(review): 128-byte frames are unusually small — confirm intended.
+ spec.setFrameSize(128);
+
+ FileScanDescriptor scan = new FileScanDescriptor(spec, k);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID);
+
+ // Tuple layout shared by all operators: {long kmer, byte bitmap, int count}.
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 10;
+ int tableSize = 128;
+
+ // Node-local pre-aggregation keyed on the packed kmer (field 0).
+ ExternalGroupOperatorDescriptor single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
+ new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
+ new MergeKmerAggregateFactory(),
+ outputRec, new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(LongPointable.FACTORY) }), tableSize), true);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID);
+
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(readfileConn, scan, 0, single_grouper, 0);
+
+ // Repartition by kmer hash so each global group lands on one node.
+ IConnectorDescriptor conn_partition = new MToNPartitioningConnectorDescriptor(spec,
+ new KmerHashPartitioncomputerFactory());
+
+ // Global aggregation merging the per-node partial results.
+ ExternalGroupOperatorDescriptor cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
+ new Integer64NormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),
+ new DistributedMergeLmerAggregateFactory(),
+ outputRec, new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(LongPointable.FACTORY) }), tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID);
+ spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID);
+
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(printConn, cross_grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+
+ return spec;
+ }
+
+ // private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
+ // String[] parts = new String[splits.length];
+ // for (int i = 0; i < splits.length; ++i) {
+ // parts[i] = splits[i].getNodeName();
+ // }
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
+ // }
+
+ /**
+ * Serializable factory producing {@link JoinComparator}s that compare
+ * field pos0 of the left tuple against field pos1 of the right tuple.
+ */
+ static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final IBinaryComparatorFactory bFactory;
+ private final int pos0;
+ private final int pos1;
+
+ public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
+ this.bFactory = bFactory;
+ this.pos0 = pos0;
+ this.pos1 = pos1;
+ }
+
+ @Override
+ public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+ return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
+ }
+ }
+
+ /**
+ * Compares one field from each of two frame tuples using the wrapped
+ * binary comparator: field0 of the left accessor vs field1 of the right.
+ */
+ static class JoinComparator implements ITuplePairComparator {
+
+ private final IBinaryComparator bComparator;
+ private final int field0; // field index in the left tuple
+ private final int field1; // field index in the right tuple
+
+ public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
+ this.bComparator = bComparator;
+ this.field0 = field0;
+ this.field1 = field1;
+ }
+
+ @Override
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+ // Absolute start of each tuple's data area (tuple start plus the
+ // field-slot table length).
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+ int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+ int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+ int fLen1 = fEnd1 - fStart1;
+
+ // The original ended with "if (c != 0) return c; return 0;",
+ // which is just "return c;" — collapsed to a direct return.
+ return bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+ .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+ }
+ }
+
+}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
new file mode 100644
index 0000000..2c70a6e
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -0,0 +1,160 @@
+package edu.uci.ics.genomix.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+/**
+ * Aggregator factory that merges partial (bitmap, count) aggregation results
+ * produced on other nodes. Input tuples are expected to carry an adjacency
+ * bitmap byte in field 1 and a partial count int in field 2; the aggregate
+ * state ORs the bitmaps together and sums the counts.
+ */
+public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public DistributedMergeLmerAggregateFactory() {
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new IAggregatorDescriptor() {
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ // State is kept inline in the output frame (see aggregate()), so
+ // only a placeholder object is returned here.
+ // TODO Auto-generated method stub
+ return new AggregateState(new Object() {
+ });
+ }
+
+ /**
+ * Seeds the aggregate from the first tuple of a group: copies the
+ * bitmap byte (field 1) and the partial count int (field 2) into
+ * the state tuple being built.
+ */
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ byte bitmap = 0;
+ int count = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+ bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+
+ // Field 2 holds the partial count from the upstream aggregator.
+ tupleOffset = accessor.getTupleStartOffset(tIndex);
+ fieldStart = accessor.getFieldStartOffset(tIndex, 2);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+
+ count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), offset);
+
+ // Emit state as [byte bitmap][int count] — the same layout all
+ // other callbacks below assume.
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+
+ }
+
+ /**
+ * Folds one incoming tuple into the in-frame state tuple: ORs the
+ * incoming bitmap and adds the incoming partial count, writing the
+ * merged values back in place over the state tuple's bytes.
+ */
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ byte bitmap = 0;
+ int count = 0;
+
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+ int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+ bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+
+ tupleOffset = accessor.getTupleStartOffset(tIndex);
+ fieldStart = accessor.getFieldStartOffset(tIndex, 2);
+ offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), offset);
+
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+ int stateoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + statefieldStart;
+
+ byte[] data = stateAccessor.getBuffer().array();
+
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ // NOTE(review): getChar() reads 2 bytes, but init() wrote the
+ // bitmap with writeByte() (1 byte) — this looks like it should be
+ // buf.get(stateoffset). Confirm against the state tuple layout.
+ bitmap |= buf.getChar(stateoffset);
+ // NOTE(review): stateoffset + 1 assumes the int count field is
+ // stored immediately after the 1-byte bitmap field in tuple data.
+ count += buf.getInt(stateoffset + 1);
+ buf.put(stateoffset, bitmap);
+ buf.putInt(stateoffset + 1, count);
+ }
+
+ /**
+ * Copies the current (bitmap, count) state into a partial-result
+ * tuple, preserving the [byte][int] layout for downstream merging.
+ */
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ byte bitmap;
+ int count;
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ byte[] data = accessor.getBuffer().array();
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
+
+ int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;
+ bitmap = ByteSerializerDeserializer.getByte(data, offset);
+
+ // offset + 1: count int assumed byte-adjacent to the bitmap field.
+ count = IntegerSerializerDeserializer.getInt(data, offset + 1);
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+
+ }
+
+ /**
+ * Emits the final (bitmap, count) pair for a group; identical layout
+ * and logic to outputPartialResult for this aggregator.
+ */
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ byte bitmap;
+ int count;
+
+ byte[] data = accessor.getBuffer().array();
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
+ int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;
+
+ bitmap = ByteSerializerDeserializer.getByte(data, offset);
+ // offset + 1: count int assumed byte-adjacent to the bitmap field.
+ count = IntegerSerializerDeserializer.getInt(data, offset + 1);
+
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ };
+ }
+
+}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
new file mode 100644
index 0000000..b6d3492
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -0,0 +1,151 @@
+package edu.uci.ics.genomix.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+/**
+ * Aggregator factory for the first-level k-mer merge: for each group it ORs
+ * the adjacency bitmap byte (input field 1) and counts the number of input
+ * tuples seen (count += 1 per tuple, unlike the distributed variant which
+ * sums a pre-aggregated count field).
+ */
+public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public MergeKmerAggregateFactory() {
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new IAggregatorDescriptor() {
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ // State lives inline in the output frame (see aggregate()), so
+ // only a placeholder object is returned here.
+ // TODO Auto-generated method stub
+ return new AggregateState(new Object() {
+ });
+ }
+
+ /**
+ * Seeds the aggregate from the first tuple of a group: takes the
+ * bitmap byte from field 1 and starts the occurrence count at 1.
+ */
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ byte bitmap = 0;
+ int count = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+ bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+ count += 1;
+ // Emit state as [byte bitmap][int count] — the same layout all
+ // other callbacks below assume.
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+
+ }
+
+ /**
+ * Folds one incoming tuple into the in-frame state tuple: ORs the
+ * incoming bitmap, increments the count by one, and writes the
+ * merged values back in place over the state tuple's bytes.
+ */
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ byte bitmap = 0;
+ int count = 0;
+
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+ int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+
+ bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+ count += 1;
+
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+ int stateoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + statefieldStart;
+
+ byte[] data = stateAccessor.getBuffer().array();
+
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ // NOTE(review): getChar() reads 2 bytes, but init() wrote the
+ // bitmap with writeByte() (1 byte) — this looks like it should be
+ // buf.get(stateoffset). Confirm against the state tuple layout.
+ bitmap |= buf.getChar(stateoffset);
+ // NOTE(review): stateoffset + 1 assumes the int count field is
+ // stored immediately after the 1-byte bitmap field in tuple data.
+ count += buf.getInt(stateoffset + 1);
+ buf.put(stateoffset, bitmap);
+ buf.putInt(stateoffset + 1, count);
+ }
+
+ /**
+ * Copies the current (bitmap, count) state into a partial-result
+ * tuple, preserving the [byte][int] layout for downstream merging.
+ */
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ byte bitmap;
+ int count;
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ byte[] data = accessor.getBuffer().array();
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
+
+ int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;
+ bitmap = ByteSerializerDeserializer.getByte(data, offset);
+
+ // offset + 1: count int assumed byte-adjacent to the bitmap field.
+ count = IntegerSerializerDeserializer.getInt(data, offset + 1);
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+
+ }
+
+ /**
+ * Emits the final (bitmap, count) pair for a group; identical layout
+ * and logic to outputPartialResult for this aggregator.
+ */
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ byte bitmap;
+ int count;
+
+ byte[] data = accessor.getBuffer().array();
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
+ int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;
+
+ bitmap = ByteSerializerDeserializer.getByte(data, offset);
+ // offset + 1: count int assumed byte-adjacent to the bitmap field.
+ count = IntegerSerializerDeserializer.getInt(data, offset + 1);
+
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ };
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index f9ec81c..39a574a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,4 +1,4 @@
-
+<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
@@ -67,5 +67,6 @@
<module>hyracks</module>
<module>algebricks</module>
<module>pregelix</module>
+ <module>genomix</module>
</modules>
-</project>
+</project>
\ No newline at end of file