initial checkin of the genomix project, from Hongzhi's code

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2652 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/HyracksCodeFormatProfile.xml b/genomix/HyracksCodeFormatProfile.xml
new file mode 100644
index 0000000..2cde66d
--- /dev/null
+++ b/genomix/HyracksCodeFormatProfile.xml
@@ -0,0 +1,279 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<profiles version="11">
+<profile kind="CodeFormatterProfile" name="HyracksCodeFormatProfile" version="11">
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.disabling_tag" value="@formatter:off"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_field" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.use_on_off_tags" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_ellipsis" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_multiple_fields" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_conditional_expression" value="80"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_array_initializer" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_package" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_package" value="0"/>
+<setting id="org.eclipse.jdt.core.compiler.source" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.join_wrapped_lines" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_member_type" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.align_type_members_on_columns" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_parameter_description" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="120"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.enabling_tag" value="@formatter:on"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_assignment" value="0"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.assertIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="space"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_body" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_method" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_method_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_switch" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.enumIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_ellipsis" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_method_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.compact_else_if" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_constant" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_root_tags" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_empty_lines" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block_in_case" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.compiler.compliance" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_binary_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode" value="enabled"/>
+<setting id="org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_label" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant" value="48"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="9999"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_import_groups" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_before_binary_operator" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_block" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.join_lines_in_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_compact_if" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_imports" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_html" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_source_code" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.compiler.codegen.targetPlatform" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_member" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_header" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_enum_constants" value="49"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_imports" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line" value="false"/>
+</profile>
+</profiles>
diff --git a/genomix/pom.xml b/genomix/pom.xml
new file mode 100644
index 0000000..299a208
--- /dev/null
+++ b/genomix/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<groupId>edu.uci.ics.hyracks</groupId>
+	<artifactId>genomix</artifactId>
+	<version>0.2.3-SNAPSHOT</version>
+	<packaging>pom</packaging>
+	<name>genomix</name>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>appassembler-maven-plugin</artifactId>
+				<executions>
+					<execution>
+						<configuration>
+							<programs>
+								<program>
+									<mainClass>edu.uci.ics.genomix.Driver</mainClass>
+									<name>genomix</name>
+								</program>
+							</programs>
+							<repositoryLayout>flat</repositoryLayout>
+							<repositoryName>lib</repositoryName>
+						</configuration>
+						<phase>package</phase>
+						<goals>
+							<goal>assemble</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.2-beta-5</version>
+				<executions>
+					<execution>
+						<configuration>
+							<descriptors>
+								<descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+						<phase>package</phase>
+						<goals>
+							<goal>attached</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.7.2</version>
+				<configuration>
+					<forkMode>pertest</forkMode>
+					<argLine>-enableassertions -Xmx512m -XX:MaxPermSize=300m
+						-Dfile.encoding=UTF-8
+						-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+					<includes>
+						<include>**/*TestSuite.java</include>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<configuration>
+					<filesets>
+						<fileset>
+							<directory>.</directory>
+							<includes>
+								<include>teststore*</include>
+								<include>edu*</include>
+								<include>actual*</include>
+								<include>build*</include>
+								<include>expect*</include>
+								<include>ClusterController*</include>
+							</includes>
+						</fileset>
+					</filesets>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<distributionManagement>
+		<repository>
+			<id>hyracks-releases</id>
+			<url>http://obelix.ics.uci.edu/nexus/content/repositories/hyracks-releases/</url>
+		</repository>
+		<snapshotRepository>
+			<id>hyracks-snapshots</id>
+			<url>http://obelix.ics.uci.edu/nexus/content/repositories/hyracks-snapshots/</url>
+		</snapshotRepository>
+	</distributionManagement>
+
+	<repositories>
+		<repository>
+			<id>hyracks-public</id>
+			<url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public/</url>
+		</repository>
+		<repository>
+			<id>jboss-public</id>
+			<url>https://repository.jboss.org/nexus/content/groups/public/</url>
+		</repository>
+	</repositories>
+
+	<pluginRepositories>
+		<pluginRepository>
+			<id>hyracks-public</id>
+			<url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public/</url>
+			<releases>
+				<updatePolicy>always</updatePolicy>
+			</releases>
+		</pluginRepository>
+	</pluginRepositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-std</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-api</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-common</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-data-std</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-cc</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-nc</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.kenai.nbpwr</groupId>
+			<artifactId>org-apache-commons-io</artifactId>
+			<version>1.3.1-201002241208</version>
+			<type>nbm</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks.examples</groupId>
+			<artifactId>hyracks-integration-tests</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-ipc</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
\ No newline at end of file
diff --git a/genomix/src/main/assembly/binary-assembly.xml b/genomix/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/genomix/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/genomix/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..bb868ce
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.genomix.data.normalizers;

+

+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;

+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

+

+public class Integer64NormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {

+    private static final long serialVersionUID = 8735044913496854551L;

+

+    @Override

+    public INormalizedKeyComputer createNormalizedKeyComputer() {

+        return new INormalizedKeyComputer() {

+            private static final int POSTIVE_LONG_MASK = (3 << 30);

+            private static final int NON_NEGATIVE_INT_MASK = (2 << 30);

+            private static final int NEGATIVE_LONG_MASK = (0 << 30);

+

+            @Override

+            public int normalize(byte[] bytes, int start, int length) {

+                long value = Integer64SerializerDeserializer.getLong(bytes, start);

+                int highValue = (int) (value >> 32);

+                if (highValue > 0) {

+                    /** * larger than Integer.MAX */

+                    int highNmk = getKey(highValue);

+                    highNmk >>= 2;

+                    highNmk |= POSTIVE_LONG_MASK;

+                    return highNmk;

+                } else if (highValue == 0) {

+                    /** * smaller than Integer.MAX but >=0 */

+                    int lowNmk = (int) value;

+                    lowNmk >>= 2;

+                    lowNmk |= NON_NEGATIVE_INT_MASK;

+                    return lowNmk;

+                } else {

+                    /** * less than 0: have not optimized for that */

+                    int highNmk = getKey(highValue);

+                    highNmk >>= 2;

+                    highNmk |= NEGATIVE_LONG_MASK;

+                    return highNmk;

+                }

+            }

+

+            private int getKey(int value) {

+                return value ^ Integer.MIN_VALUE;

+            }

+        };

+    }

+}

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java b/genomix/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
new file mode 100644
index 0000000..90bd12f
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.genomix.data.partition;

+

+import java.nio.ByteBuffer;

+

+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;

+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;

+

+public class KmerHashPartitioncomputerFactory implements ITuplePartitionComputerFactory {

+

+    private static final long serialVersionUID = 1L;

+

+    public KmerHashPartitioncomputerFactory() {

+    }

+

+    @Override

+    public ITuplePartitionComputer createPartitioner() {

+        return new ITuplePartitionComputer() {

+            @Override

+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {

+                if (nParts == 1) {

+                    return 0;

+                }

+                int startOffset = accessor.getTupleStartOffset(tIndex);

+                int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);

+                int slotLength = accessor.getFieldSlotsLength();

+

+                ByteBuffer buf = accessor.getBuffer();

+                buf.position(startOffset + fieldOffset + slotLength);

+                long l = accessor.getBuffer().getLong();

+                return (int) (l % nParts);

+            }

+        };

+    }

+}

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java b/genomix/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
new file mode 100644
index 0000000..41dcb07
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.genomix.data.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ByteSerializerDeserializer implements ISerializerDeserializer<Byte> {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer();
+
+    private ByteSerializerDeserializer() {
+    }
+
+    @Override
+    public Byte deserialize(DataInput in) throws HyracksDataException {
+        try {
+            return in.readByte();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void serialize(Byte instance, DataOutput out) throws HyracksDataException {
+        try {
+            out.writeByte(instance.intValue());
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static byte getByte(byte[] bytes, int offset) {
+        return bytes[offset];
+    }
+
+    public static void putByte(byte val, byte[] bytes, int offset) {
+        bytes[offset] = val;
+    }
+
+}
\ No newline at end of file
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
new file mode 100644
index 0000000..e7a8e6c
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -0,0 +1,241 @@
+package edu.uci.ics.genomix.dataflow;

+

+import java.io.BufferedReader;

+import java.io.FileInputStream;

+import java.io.InputStream;

+import java.io.InputStreamReader;

+import java.nio.ByteBuffer;

+

+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;

+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;

+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;

+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

+

+public class FileScanDescriptor extends AbstractSingleActivityOperatorDescriptor {

+

+    private static final long serialVersionUID = 1L;

+    private int k;

+

+    public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k) {

+        super(spec, 0, 1);

+        // TODO Auto-generated constructor stub

+        this.k = k;

+        //recordDescriptors[0] = news RecordDescriptor(

+        //		new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });

+        recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {

+                Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });

+    }

+

+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,

+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {

+

+        final int temp = partition;

+

+        // TODO Auto-generated method stub

+        return (IOperatorNodePushable) new AbstractUnaryOutputSourceOperatorNodePushable() {

+            private ArrayTupleBuilder tupleBuilder;

+            private ByteBuffer outputBuffer;

+            private FrameTupleAppender outputAppender;

+            private long window;

+

+            @Override

+            public void initialize() {

+

+                window = 0;

+                for (int i = 0; i < k; i++) {

+                    window <<= 2;

+                    window |= 3;

+                }

+

+                tupleBuilder = new ArrayTupleBuilder(2);

+                outputBuffer = ctx.allocateFrame();

+                outputAppender = new FrameTupleAppender(ctx.getFrameSize());

+                outputAppender.reset(outputBuffer, true);

+                try {// one try with multiple catch?

+                     //FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(

+                     //        FileScanDescriptor.class.getSimpleName());

+                     //writer = new RunFileWriter(file, ctx.getIOManager());

+                    writer.open();

+                    // read the file

+                    InputStream filenames;

+                    /*File roots = new File("G:\\data");

+                    for (File file : roots.listFiles())

+                    	System.out.println(file);

+                    String s = "G:" + File.separator + "data"

+                    		+ File.separator + "filename.txt";*/

+

+                    String s = "g:\\data\\filename" + String.valueOf(temp) + ".txt";

+

+                    filenames = new FileInputStream(s);

+                    // filenames = new FileInputStream("filename.txt");

+

+                    String line;

+                    BufferedReader reader = new BufferedReader(new InputStreamReader(filenames));

+                    line = reader.readLine();

+                    while (line != null) {

+                        BufferedReader readsfile = new BufferedReader(new InputStreamReader(new FileInputStream(line)));

+                        String read = readsfile.readLine();

+                        while (read != null) {

+                            read = readsfile.readLine();

+                            SplitReads(read.getBytes());

+                            //read.getBytes();

+

+                            read = readsfile.readLine();

+                            read = readsfile.readLine();

+

+                            read = readsfile.readLine();

+                        }

+                        line = reader.readLine();

+                        readsfile.close();

+                    }

+                    reader.close();

+                    filenames.close();

+                    if (outputAppender.getTupleCount() > 0) {

+                        FrameUtils.flushFrame(outputBuffer, writer);

+                    }

+                    outputAppender = null;

+                    outputBuffer = null;

+                    // sort code for external sort here?

+                    writer.close();

+                } catch (Exception e) {

+                    // TODO Auto-generated catch block

+                    throw new IllegalStateException(e);

+                }

+            }

+

+            private long CompressKmer(byte[] array, int start) {

+                // a: 00; c: 01; G: 10; T: 11

+                long l = 0;

+                for (int i = start; i < start + k; i++) {

+                    l <<= 2;

+                    switch (array[i]) {

+                        case 'A':

+                        case 'a':

+                            l |= 0;

+                            break;

+                        case 'C':

+                        case 'c':

+                            l |= 1;

+                            break;

+                        case 'G':

+                        case 'g':

+                            l |= 2;

+                            break;

+                        case 'T':

+                        case 't':

+                            l |= 3;

+                            break;

+                    }

+                }

+                return l;

+            }

+

+            private byte GetBitmap(byte t) {

+                byte r = 0;

+                switch (t) {

+                    case 'A':

+                    case 'a':

+                        r = 1;

+                        break;

+                    case 'C':

+                    case 'c':

+                        r = 2;

+                        break;

+                    case 'G':

+                    case 'g':

+                        r = 4;

+                        break;

+                    case 'T':

+                    case 't':

+                        r = 8;

+                        break;

+                }

+                return r;

+            }

+

+            private byte ConvertSymbol(byte t) {

+                byte r = 0;

+                switch (t) {

+                    case 'A':

+                    case 'a':

+                        r = 0;

+                        break;

+                    case 'C':

+                    case 'c':

+                        r = 1;

+                        break;

+                    case 'G':

+                    case 'g':

+                        r = 2;

+                        break;

+                    case 'T':

+                    case 't':

+                        r = 3;

+                        break;

+                }

+                return r;

+            }

+

+            private void SplitReads(byte[] array) {

+                try {

+                    long l = 0;

+

+                    byte pre = 0, next = 0;

+                    byte r;

+

+                    for (int i = 0; i < array.length - k + 1; i++) {

+                        if (0 == i) {

+                            l = CompressKmer(array, i);

+                        } else {

+                            l <<= 2;

+                            l &= window;

+                            l |= ConvertSymbol(array[i + k - 1]);

+                            pre = GetBitmap(array[i - 1]);

+                        }

+                        if (i + k != array.length) {

+                            next = GetBitmap(array[i + k]);

+                        }

+

+                        r = 0;

+                        r |= pre;

+                        r <<= 4;

+                        r |= next;

+

+                        /*System.out.print(l);

+                        System.out.print(' ');

+                        System.out.print(r);

+                        System.out.println();*/

+

+                        tupleBuilder.reset();

+

+                        tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);

+                        tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);

+

+                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,

+                                tupleBuilder.getSize())) {

+                            FrameUtils.flushFrame(outputBuffer, writer);

+                            outputAppender.reset(outputBuffer, true);

+                            if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),

+                                    0, tupleBuilder.getSize())) {

+                                throw new IllegalStateException(

+                                        "Failed to copy an record into a frame: the record size is too large.");

+                            }

+                        }

+                    }

+                } catch (Exception e) {

+                    throw new IllegalStateException(e);

+                }

+            }

+        };

+

+    }

+}

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
new file mode 100644
index 0000000..abcb54e
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
@@ -0,0 +1,159 @@
+package edu.uci.ics.genomix.dataflow;

+

+import java.io.DataInput;

+import java.io.DataOutput;

+import java.io.IOException;

+import java.util.List;

+

+import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;

+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

+import edu.uci.ics.hyracks.api.comm.IFrameReader;

+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

+import edu.uci.ics.hyracks.api.dataflow.ActivityId;

+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;

+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;

+import edu.uci.ics.hyracks.api.dataflow.TaskId;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;

+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;

+import edu.uci.ics.hyracks.api.job.JobId;

+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;

+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;

+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;

+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;

+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorter;

+

+public class GenKmerDescriptor extends AbstractOperatorDescriptor {

+

+    private static final long serialVersionUID = 1L;

+    private static final int SPLIT_ACTIVITY_ID = 0;

+    private static final int MERGE_ACTIVITY_ID = 1;

+    private final int framesLimit;

+    private final int k;

+

+    public GenKmerDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int k) {

+        super(spec, 1, 1);

+        this.framesLimit = framesLimit;

+        this.k = k;

+

+        // TODO Auto-generated constructor stub

+        recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {

+                Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

+                IntegerSerializerDeserializer.INSTANCE });

+    }

+

+    @Override

+    public void contributeActivities(IActivityGraphBuilder builder) {

+        // TODO Auto-generated method stub

+        SplitActivity sa = new SplitActivity(new ActivityId(odId, SPLIT_ACTIVITY_ID));

+        MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));

+        builder.addActivity(this, sa);

+        builder.addSourceEdge(0, sa, 0);

+

+        builder.addActivity(this, ma);

+        builder.addTargetEdge(0, ma, 0);

+

+        builder.addBlockingEdge(sa, ma);

+    }

+

+    private class SplitActivity extends AbstractActivityNode {

+        /**

+		 * 

+		 */

+        private static final long serialVersionUID = 1L;

+

+        public SplitActivity(ActivityId activityID) {

+            super(activityID);

+            // TODO Auto-generated constructor stub

+        }

+

+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,

+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)

+                throws HyracksDataException {

+            // TODO Auto-generated method stub

+            //IHyracksTaskContext ctx, int k, RecordDescriptor rd_in, int buffer_size

+            KmerSplitOperatorNodePushable op = new KmerSplitOperatorNodePushable(ctx, k, new RecordDescriptor(

+                    new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE }), framesLimit,

+                    new TaskId(this.id, partition));

+            return op;

+        }

+    }

+

+    public static class SplitTaskState extends AbstractStateObject {

+        List<IFrameReader> runs;

+

+        public SplitTaskState() {

+        }

+

+        public SplitTaskState(JobId jobId, TaskId taskId, List<IFrameReader> runs) {

+            super(jobId, taskId);

+            this.runs = runs;

+        }

+

+        @Override

+        public void toBytes(DataOutput out) throws IOException {

+

+        }

+

+        @Override

+        public void fromBytes(DataInput in) throws IOException {

+

+        }

+    }

+

+    private class MergeActivity extends AbstractActivityNode {

+

+        /**

+		 * 

+		 */

+        private static final long serialVersionUID = 1L;

+

+        public MergeActivity(ActivityId id) {

+            super(id);

+            // TODO Auto-generated constructor stub

+        }

+

+        @Override

+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,

+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)

+                throws HyracksDataException {

+            // TODO Auto-generated method stub

+            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {

+                @Override

+                public void initialize() throws HyracksDataException {

+                    SplitTaskState state = (SplitTaskState) ctx.getStateObject(new TaskId(new ActivityId(

+                            getOperatorId(), SPLIT_ACTIVITY_ID), partition));

+                    //List<IFrameReader> runs = runs = new LinkedList<IFrameReader>();;

+

+                    IBinaryComparator[] comparators = new IBinaryComparator[1];

+                    IBinaryComparatorFactory cf = PointableBinaryComparatorFactory.of(LongPointable.FACTORY);

+                    comparators[0] = cf.createBinaryComparator();

+

+                    //int necessaryFrames = Math.min(runs.size() + 2, framesLimit);

+

+                    FrameSorter frameSorter = new FrameSorter(

+                            ctx,

+                            new int[] { 0 },

+                            new Integer64NormalizedKeyComputerFactory(),

+                            new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

+                            recordDescriptors[0]);

+

+                    ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, state.runs,

+                            new int[] { 0 }, comparators, recordDescriptors[0], framesLimit, writer);

+                    merger.process();

+                }

+            };

+            return op;

+        }

+    }

+}

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
new file mode 100644
index 0000000..1f6731d
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
@@ -0,0 +1,254 @@
+package edu.uci.ics.genomix.dataflow;

+

+import java.nio.ByteBuffer;

+import java.util.List;

+

+import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;

+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

+import edu.uci.ics.genomix.dataflow.GenKmerDescriptor.SplitTaskState;

+import edu.uci.ics.hyracks.api.comm.IFrameReader;

+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

+import edu.uci.ics.hyracks.api.dataflow.TaskId;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;

+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;

+

+public class KmerSplitOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {

+

+    private final int k;

+    private long window;

+

+    private final SplitFrame frameSorter;

+

+    private FrameTupleAccessor accessor;

+    private ArrayTupleBuilder tupleBuilder;

+    private TaskId MytaskId;

+    private IHyracksTaskContext ctx;

+

+    public KmerSplitOperatorNodePushable(IHyracksTaskContext ctx, int k, RecordDescriptor rd_in, int buffer_size,

+            TaskId taskid) {

+

+        tupleBuilder = new ArrayTupleBuilder(3);

+        this.k = k;

+

+        RecordDescriptor rd = new RecordDescriptor(new ISerializerDeserializer[] {

+                Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

+                IntegerSerializerDeserializer.INSTANCE });

+

+        int[] sortFields = { 0 };

+        frameSorter = new SplitFrame(ctx, sortFields, new Integer64NormalizedKeyComputerFactory(),

+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) }, rd,

+                buffer_size);

+

+        accessor = new FrameTupleAccessor(ctx.getFrameSize(), rd_in);

+

+        new FrameTupleAccessor(ctx.getFrameSize(), rd);

+        new FrameTupleAppender(ctx.getFrameSize());

+

+        ByteBuffer.allocate(ctx.getFrameSize());

+

+        //initialize the window

+        window = 0;

+        for (int i = 0; i < k; i++) {

+            window <<= 2;

+            window |= 3;

+        }

+

+        MytaskId = taskid;

+        this.ctx = ctx;

+    }

+

+    @Override

+    public void open() throws HyracksDataException {

+        // TODO Auto-generated method stub

+        //writer.open();

+

+    }

+

+    @Override

+    public void nextFrame(ByteBuffer buffer) {

+        accessor.reset(buffer);

+        int tupleCount = accessor.getTupleCount();

+        ByteBuffer temp_buf = accessor.getBuffer();

+        for (int i = 0; i < tupleCount; i++) {

+            int tupleStartOffset = accessor.getTupleStartOffset(i);

+            int fieldStartOffset = accessor.getFieldStartOffset(i, 0);

+            int loadLength = accessor.getFieldLength(i, 0);

+            //int loadLength = temp_buf.getInt(tupleStartOffset);

+            byte[] read = new byte[loadLength];

+            int slotLength = accessor.getFieldSlotsLength();

+            //temp_buf.position(tupleStartOffset+fieldStartOffset + accessor.getFieldSlotsLength());

+            int pos = tupleStartOffset + fieldStartOffset + slotLength;

+            //temp_buf

+            try {

+                temp_buf.position(pos);

+                temp_buf.get(read, 0, loadLength);

+                SplitReads(read);

+            } catch (Exception e) {

+                // TODO Auto-generated catch block

+                e.printStackTrace();

+            }

+

+            tupleBuilder.reset();

+

+        }

+    }

+

+    private long CompressKmer(byte[] array, int start, int k) {

+        // a: 00; c: 01; G: 10; T: 11

+        long l = 0;

+        for (int i = start; i < start + k; i++) {

+            l <<= 2;

+            switch (array[start + i]) {

+                case 'A':

+                case 'a':

+                    l |= 0;

+                    break;

+                case 'C':

+                case 'c':

+                    l |= 1;

+                    break;

+                case 'G':

+                case 'g':

+                    l |= 2;

+                    break;

+                case 'T':

+                case 't':

+                    l |= 3;

+                    break;

+            }

+        }

+        return l;

+    }

+

+    private byte GetBitmap(byte t) {

+        byte r = 0;

+        switch (t) {

+            case 'A':

+            case 'a':

+                r = 1;

+                break;

+            case 'C':

+            case 'c':

+                r = 2;

+                break;

+            case 'G':

+            case 'g':

+                r = 4;

+                break;

+            case 'T':

+            case 't':

+                r = 8;

+                break;

+        }

+        return r;

+    }

+

+    private byte ConvertSymbol(byte t) {

+        byte r = 0;

+        switch (t) {

+            case 'A':

+            case 'a':

+                r = 0;

+                break;

+            case 'C':

+            case 'c':

+                r = 1;

+                break;

+            case 'G':

+            case 'g':

+                r = 2;

+                break;

+            case 'T':

+            case 't':

+                r = 3;

+                break;

+        }

+        return r;

+    }

+

+    private void SplitReads(byte[] array) {

+        try {

+            long l = 0;

+

+            byte pre = 0, next = 0;

+            byte r;

+

+            for (int i = 2; i < array.length - k + 1; i++) {

+                if (2 == i) {

+                    l = CompressKmer(array, i, k);

+                } else {

+                    l <<= 2;

+                    l &= window;

+                    l |= ConvertSymbol(array[i + k - 1]);

+                    pre = GetBitmap(array[i - 1]);

+                }

+                if (i + k != array.length) {

+                    next = GetBitmap(array[i + k]);

+                }

+

+                r = 0;

+                r |= pre;

+                r <<= 4;

+                r |= next;

+

+                /*System.out.print(l);

+                System.out.print(' ');

+                System.out.print(r);

+                System.out.println();*/

+

+                frameSorter.insertKmer(l, r);

+            }

+        } catch (Exception e) {

+            e.printStackTrace();

+        }

+    }

+

+    @Override

+    public void fail() throws HyracksDataException {

+        // TODO Auto-generated method stub

+

+    }

+

+    @Override

+    public void close() {

+        // TODO Auto-generated method stub

+        try {

+            frameSorter.processLastFrame();

+            SplitTaskState state = new SplitTaskState(ctx.getJobletContext().getJobId(), MytaskId,

+                    frameSorter.GetRuns());

+            ctx.setStateObject(state);

+

+            //writer.close();

+        } catch (Exception e) {

+            // TODO Auto-generated catch block

+            e.printStackTrace();

+        }

+    }

+

+    public List<IFrameReader> GetRuns() {

+        return frameSorter.GetRuns();

+    }

+

+    //for debug

+    /*	private void DumpBlock(ByteBuffer f){

+    		

+    		int n = f.array().length/13;

+    		

+    		for(int i = 0 ; i < n ; i++){

+    			long t = LongPointable.getLong(f.array(), 13 * i);

+    			System.out.print(t);

+    			System.out.print(' ');

+    		}

+    		System.out.println();

+    	}*/

+}

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
new file mode 100644
index 0000000..179c983
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
@@ -0,0 +1,484 @@
+/*

+ * Copyright 2009-2010 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ * 

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ * 

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package edu.uci.ics.genomix.dataflow;

+

+import java.nio.ByteBuffer;

+import java.util.LinkedList;

+import java.util.List;

+

+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

+import edu.uci.ics.hyracks.api.comm.IFrameReader;

+import edu.uci.ics.hyracks.api.comm.IFrameWriter;

+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;

+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.api.io.FileReference;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;

+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;

+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;

+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;

+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;

+

/**
 * In-memory buffer/sorter for (long kmer, byte bitmap, int count) tuples.
 *
 * New k-mers are appended to a set of fixed-size frames and indexed in a
 * hash table; a repeated k-mer is updated in place (bitmap OR-ed, count
 * incremented) instead of appended. When all frames fill up, the tuples are
 * sorted on the k-mer field and flushed to a run file; the run readers are
 * collected in {@code runs} for a later external merge.
 *
 * NOT thread-safe: all state (buffers, appender, tempPointer) is shared
 * mutable and accessed without synchronization.
 */
public class SplitFrame {

    // Number of buckets in the k-mer hash table (k-mer value mod HASH_SIZE).
    private static int HASH_SIZE = 4096;
    // Hash index: kmer bucket -> chain of (frameIndex, tupleIndex) pointers.
    private final SerializableHashTable table;
    // Scratch pointer reused by both lookups and inserts to avoid allocation.
    private final TuplePointer tempPointer;
    private ArrayTupleBuilder tupleBuilder;
    // Number of in-memory frames available before a spill is forced.
    private final int buf_size;

    private final IHyracksTaskContext ctx;
    // Field indices to sort on; only sortFields[0] is used for normalization.
    private final int[] sortFields;
    // Normalized-key computer for the first sort field (may be null).
    private final INormalizedKeyComputer nkc;
    private final IBinaryComparator[] comparators;
    // The in-memory tuple frames; appender currently targets buffers[dataFrameCount].
    private final ByteBuffer[] buffers;

    private final FrameTupleAccessor fta1;
    private final FrameTupleAccessor fta2;

    private final FrameTupleAppender appender;

    // Staging frame used when flushing sorted tuples to a run file.
    private final ByteBuffer outFrame;

    // Index of the frame currently being appended to (also = count of full frames).
    private int dataFrameCount;
    // Sort index: 4 ints per tuple -> (frameIdx, tupleStart, tupleEnd, normalizedKey).
    private int[] tPointers;
    private int tupleCount;
    // Readers over the sorted runs spilled so far.
    private final List<IFrameReader> runs;
    private int flushCount;
    private RecordDescriptor recordDescriptor;

    // Tuple count within the current (partially filled) frame.
    private int FrameTupleCount;

    /**
     * @param ctx                       task context (frame size, file creation)
     * @param sortFields                field indices to sort on
     * @param firstKeyNormalizerFactory optional normalizer for the first sort key
     * @param comparatorFactories       comparators, one per sort field
     * @param recordDescriptor          layout of the buffered tuples
     * @param buf_size                  number of in-memory frames
     */
    public SplitFrame(IHyracksTaskContext ctx, int[] sortFields,
            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
            RecordDescriptor recordDescriptor, int buf_size) {
        this.ctx = ctx;
        this.sortFields = sortFields;
        this.recordDescriptor = recordDescriptor;

        nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
        comparators = new IBinaryComparator[comparatorFactories.length];
        for (int i = 0; i < comparatorFactories.length; ++i) {
            comparators[i] = comparatorFactories[i].createBinaryComparator();
        }
        fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
        appender = new FrameTupleAppender(ctx.getFrameSize());
        outFrame = ctx.allocateFrame();
        table = new SerializableHashTable(HASH_SIZE, ctx);
        dataFrameCount = 0;

        tempPointer = new TuplePointer();
        // Tuples carry 3 fields: kmer, bitmap, count.
        tupleBuilder = new ArrayTupleBuilder(3);
        this.buf_size = buf_size;
        buffers = new ByteBuffer[buf_size];
        for (int i = 0; i < buf_size; i++) {
            buffers[i] = ByteBuffer.allocate(ctx.getFrameSize());
        }
        appender.reset(buffers[0], true);
        flushCount = 0;
        runs = new LinkedList<IFrameReader>();
        FrameTupleCount = 0;
    }

    /** Rewinds the buffer set so frame 0 is appended to again. */
    public void reset() {
        dataFrameCount = 0;
        tupleCount = 0;
        appender.reset(buffers[0], true);
    }

    /** @return index of the frame currently being filled. */
    public int getFrameCount() {
        return dataFrameCount;
    }

    /**
     * Probes the hash table for {@code entry}. On return dataPointer either
     * addresses the matching tuple or holds a negative frame/tuple index if
     * the k-mer is absent (callers test for -1).
     */
    private void SearchHashTable(long entry, TuplePointer dataPointer) {
        int offset = 0;
        int tp = (int) (entry % HASH_SIZE);
        if (tp < 0) {
            tp = -tp;
        }
        do {
            table.getTuplePointer(tp, offset, dataPointer);// what is the offset mean?
            // NOTE(review): loop terminates only if getTuplePointer eventually
            // yields a negative pointer past the end of the chain -- assumed, confirm.
            if (dataPointer.frameIndex < 0 || dataPointer.tupleIndex < 0)
                break;
            int bIndex = dataPointer.frameIndex;
            int tIndex = dataPointer.tupleIndex;
            fta1.reset(buffers[bIndex]);

            // Read the long k-mer (field 0) of the candidate tuple and compare.
            int tupleOffset = fta1.getTupleStartOffset(tIndex);
            int fieldOffset = fta1.getFieldStartOffset(tIndex, 0);
            int slotLength = fta1.getFieldSlotsLength();
            int pos = tupleOffset + fieldOffset + slotLength;
            long l = buffers[bIndex].getLong(pos);
            if (l == entry) {
                break;
            }
            offset += 1;
        } while (true);
    }

    /** Records (frame_id, tuple_id) for {@code entry} in its hash bucket. */
    private void InsertHashTable(long entry, int frame_id, int tuple_id) {

        tempPointer.frameIndex = frame_id;
        tempPointer.tupleIndex = tuple_id;

        int tp = (int) (entry % HASH_SIZE);
        if (tp < 0) {
            tp = -tp;
        }
        table.insert(tp, tempPointer);

    }

    /**
     * Inserts a k-mer with its adjacency bitmap. If the k-mer already exists
     * the stored bitmap is OR-ed with {@code r} and the count incremented in
     * place; otherwise a new (l, r, 1) tuple is appended, spilling a sorted
     * run first when all in-memory frames are full.
     */
    public void insertKmer(long l, byte r) {
        try {
            SearchHashTable(l, tempPointer);
            if (tempPointer.frameIndex != -1 && tempPointer.tupleIndex != -1) {
                // Existing k-mer: patch the bitmap byte (field 1) and the count
                // int that immediately follows it, directly in the frame bytes.
                fta1.reset(buffers[tempPointer.frameIndex]);
                int tStart = fta1.getTupleStartOffset(tempPointer.tupleIndex);
                int f0StartRel = fta1.getFieldStartOffset(tempPointer.tupleIndex, 1);
                int slotLength = fta1.getFieldSlotsLength();
                int pos = f0StartRel + tStart + slotLength;

                buffers[tempPointer.frameIndex].array()[pos] |= r;
                buffers[tempPointer.frameIndex].position(pos + 1);
                int temp_int = buffers[tempPointer.frameIndex].getInt();
                temp_int += 1;
                buffers[tempPointer.frameIndex].position(pos + 1);
                buffers[tempPointer.frameIndex].putInt(temp_int);
            } else {
                // New k-mer: build a fresh (kmer, bitmap, count=1) tuple.
                tupleBuilder.reset();
                tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);
                tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);
                tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, 1);

                boolean b = appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
                        tupleBuilder.getSize());

                if (!b) {
                    // Current frame is full: advance to the next one, or spill
                    // a sorted run if every in-memory frame is occupied.
                    dataFrameCount++;
                    FrameTupleCount = 0;
                    if (dataFrameCount < buf_size) {
                        appender.reset(buffers[dataFrameCount], true);
                    } else {
                        sortFrames();
                        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
                                ExternalSortRunGenerator.class.getSimpleName());
                        RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
                        writer.open();
                        try {
                            flushCount += 1;
                            flushFrames(writer);
                        } finally {
                            writer.close();
                        }
                        runs.add(writer.createReader());
                        dataFrameCount = 0;
                        appender.reset(buffers[dataFrameCount], true);
                        // NOTE(review): the hash table is not cleared after a
                        // spill, so stale pointers may alias new tuples -- confirm.
                    }
                    boolean tb = appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
                            tupleBuilder.getSize());
                    if (!tb) {
                        throw new HyracksDataException(
                                "Failed to copy an record into a frame: the record size is too large");
                    }
                }
                InsertHashTable(l, dataFrameCount, FrameTupleCount);
                FrameTupleCount += 1;
            }
        } catch (HyracksDataException e) {
            // TODO(review): exception is swallowed; the k-mer is silently lost.
            e.printStackTrace();
        }
    }

    /**
     * Builds the tPointers sort index over the buffered tuples and quicksorts
     * it by normalized key (ties broken by full comparator in compare()).
     * TODO(review): only frames [0, dataFrameCount) are indexed -- the
     * partially filled frame at index dataFrameCount looks excluded; confirm
     * this is intended at processLastFrame() time.
     */
    public void sortFrames() {
        int nBuffers = dataFrameCount;
        tupleCount = 0;
        for (int i = 0; i < nBuffers; ++i) {
            fta1.reset(buffers[i]);
            tupleCount += fta1.getTupleCount();
        }
        int sfIdx = sortFields[0];
        // Grow the index only when needed; 4 ints per tuple.
        tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;
        int ptr = 0;
        for (int i = 0; i < nBuffers; ++i) {
            fta1.reset(buffers[i]);
            int tCount = fta1.getTupleCount();
            byte[] array = fta1.getBuffer().array();
            for (int j = 0; j < tCount; ++j) {
                int tStart = fta1.getTupleStartOffset(j);
                int tEnd = fta1.getTupleEndOffset(j);
                tPointers[ptr * 4] = i;
                tPointers[ptr * 4 + 1] = tStart;
                tPointers[ptr * 4 + 2] = tEnd;
                int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
                int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
                int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
                tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
                ++ptr;
            }
        }
        if (tupleCount > 0) {
            sort(tPointers, 0, tupleCount);
        }

        // TODO(review): debug dump left in the production path.
        DumpAllBuffers();
        //point the pointer to the first one
        dataFrameCount = 0;
    }

    /** Streams the tuples to {@code writer} in tPointers (sorted) order. */
    public void flushFrames(IFrameWriter writer) throws HyracksDataException {
        appender.reset(outFrame, true);
        for (int ptr = 0; ptr < tupleCount; ++ptr) {
            int i = tPointers[ptr * 4];
            int tStart = tPointers[ptr * 4 + 1];
            int tEnd = tPointers[ptr * 4 + 2];
            ByteBuffer buffer = buffers[i];
            fta1.reset(buffer);
            if (!appender.append(fta1, tStart, tEnd)) {
                // Staging frame is full: flush it and retry the append.
                FrameUtils.flushFrame(outFrame, writer);
                appender.reset(outFrame, true);
                if (!appender.append(fta1, tStart, tEnd)) {
                    // A single tuple larger than a frame cannot be handled.
                    throw new IllegalStateException();
                }
            }
        }
        if (appender.getTupleCount() > 0) {
            FrameUtils.flushFrame(outFrame, writer);
        }
    }

    /**
     * Three-way-partition quicksort over the 4-int tuple pointers in
     * [offset, offset+length), keyed by normalized key then full compare.
     */
    private void sort(int[] tPointers, int offset, int length) {
        // Median element used as the pivot (its frame/offset/normKey triple).
        int m = offset + (length >> 1);
        int mi = tPointers[m * 4];
        int mj = tPointers[m * 4 + 1];
        int mv = tPointers[m * 4 + 3];

        int a = offset;
        int b = a;
        int c = offset + length - 1;
        int d = c;
        while (true) {
            // Move b right past elements <= pivot, packing equals at the front.
            while (b <= c) {
                int cmp = compare(tPointers, b, mi, mj, mv);
                if (cmp > 0) {
                    break;
                }
                if (cmp == 0) {
                    swap(tPointers, a++, b);
                }
                ++b;
            }
            // Move c left past elements >= pivot, packing equals at the back.
            while (c >= b) {
                int cmp = compare(tPointers, c, mi, mj, mv);
                if (cmp < 0) {
                    break;
                }
                if (cmp == 0) {
                    swap(tPointers, c, d--);
                }
                --c;
            }
            if (b > c)
                break;
            swap(tPointers, b++, c--);
        }

        // Swing the equal-to-pivot blocks back into the middle, then recurse
        // on the strictly-less and strictly-greater partitions.
        int s;
        int n = offset + length;
        s = Math.min(a - offset, b - a);
        vecswap(tPointers, offset, b - s, s);
        s = Math.min(d - c, n - d - 1);
        vecswap(tPointers, b, n - s, s);

        if ((s = b - a) > 1) {
            sort(tPointers, offset, s);
        }
        if ((s = d - c) > 1) {
            sort(tPointers, n - s, s);
        }
    }

    /** Swaps the 4-int records at logical positions a and b. */
    private void swap(int x[], int a, int b) {
        for (int i = 0; i < 4; ++i) {
            int t = x[a * 4 + i];
            x[a * 4 + i] = x[b * 4 + i];
            x[b * 4 + i] = t;
        }
    }

    /** Swaps n consecutive 4-int records starting at a and b. */
    private void vecswap(int x[], int a, int b, int n) {
        for (int i = 0; i < n; i++, a++, b++) {
            swap(x, a, b);
        }
    }

    /**
     * Compares tuple tp1 against a pivot given by its frame index (tp2i),
     * tuple start offset (tp2j) and normalized key (tp2v). The normalized
     * keys are compared as unsigned ints; only on a tie are the sort fields
     * compared byte-wise with the configured comparators.
     */
    private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {
        int i1 = tPointers[tp1 * 4];
        int j1 = tPointers[tp1 * 4 + 1];
        int v1 = tPointers[tp1 * 4 + 3];
        if (v1 != tp2v) {
            // Unsigned comparison of the 32-bit normalized keys.
            return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
        }
        int i2 = tp2i;
        int j2 = tp2j;
        ByteBuffer buf1 = buffers[i1];
        ByteBuffer buf2 = buffers[i2];
        byte[] b1 = buf1.array();
        byte[] b2 = buf2.array();
        fta1.reset(buf1);
        fta2.reset(buf2);
        for (int f = 0; f < comparators.length; ++f) {
            int fIdx = sortFields[f];
            // Field bounds come from the tuple's field-slot table at the
            // tuple start; field 0 implicitly starts at relative offset 0.
            int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);
            int f1End = buf1.getInt(j1 + fIdx * 4);
            int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
            int l1 = f1End - f1Start;
            int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);
            int f2End = buf2.getInt(j2 + fIdx * 4);
            int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
            int l2 = f2End - f2Start;
            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }

    /** No-op: frames are plain heap ByteBuffers with nothing to release. */
    public void close() {
        //this.buffers.clear();
    }

    /** @return number of times a sorted run has been flushed to disk. */
    public int getFlushCount() {
        return flushCount;
    }

    /*public void AddRuns(RunFileWriter r){
    	try {
    		runs.add(r.createReader());
    	} catch (HyracksDataException e) {
    		// TODO Auto-generated catch block
    		e.printStackTrace();
    	}
    }*/

    /** @return readers over the sorted runs spilled so far (live list). */
    public List<IFrameReader> GetRuns() {
        return runs;
    }

    /** Debug helper: prints every buffered (kmer, bitmap, count) tuple. */
    private void DumpAllBuffers() {
        FrameTupleAccessor tfa = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);

        for (int i = 0; i < dataFrameCount; i++) {
            tfa.reset(buffers[i]);
            int t = tfa.getTupleCount();
            for (int j = 0; j < t; j++) {
                int tupleOffset = tfa.getTupleStartOffset(j);

                int r = tfa.getFieldStartOffset(j, 0);
                int pos = tupleOffset + r + tfa.getFieldSlotsLength();
                long l = buffers[i].getLong(pos);
                System.out.print(l);
                System.out.print(' ');

                r = tfa.getFieldStartOffset(j, 1);
                pos = tupleOffset + r + tfa.getFieldSlotsLength();
                byte b = buffers[i].array()[pos];
                System.out.print(b);
                System.out.print(' ');

                r = tfa.getFieldStartOffset(j, 2);
                pos = tupleOffset + r + tfa.getFieldSlotsLength();
                int o = buffers[i].getInt(pos);
                System.out.print(o);
                System.out.print(' ');

                System.out.println();
            }
        }
        System.out.println("---------------------------------");
    }

    //functions for dubugging
    //    private void DumpBuffer(byte[] f) {
    //        int n = f.length;
    //
    //        int count = 0;
    //        for (int i = 0; i < n; i++) {
    //            if (i % 13 == 0) {
    //                if (count != 0) {
    //                    System.out.print(")(");
    //                } else {
    //                    System.out.print("(");
    //                }
    //                System.out.print(count);
    //                System.out.print(':');
    //                count += 1;
    //            }
    //            System.out.print(f[i]);
    //            System.out.print(' ');
    //        }
    //        System.out.println(')');
    //    }

    /**
     * Sorts whatever remains in memory and spills it as a final run.
     * TODO(review): sortFrames() already calls DumpAllBuffers(), and after it
     * resets dataFrameCount to 0 the second DumpAllBuffers() here prints
     * nothing -- both calls look like debug leftovers; confirm.
     */
    public void processLastFrame() {
        sortFrames();
        FileReference file;

        DumpAllBuffers();
        try {
            file = ctx.getJobletContext().createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());
            RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
            writer.open();
            try {
                flushCount += 1;
                flushFrames(writer);
            } finally {
                writer.close();
            }
            runs.add(writer.createReader());
        } catch (HyracksDataException e) {
            // TODO(review): a failed final spill is only logged; the runs
            // handed to the merge phase would then be incomplete.
            e.printStackTrace();
        }
        //frameSorter.AddRuns((RunFileWriter) writer);

    }
}
\ No newline at end of file
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
new file mode 100644
index 0000000..eeeecea
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -0,0 +1,460 @@
+package edu.uci.ics.genomix.dataflow;

+

+import java.io.File;

+import java.util.logging.Level;

+import java.util.logging.Logger;

+

+import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;

+import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;

+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

+import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;

+import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;

+import edu.uci.ics.hyracks.api.client.HyracksConnection;

+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;

+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;

+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;

+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.api.job.JobId;

+import edu.uci.ics.hyracks.api.job.JobSpecification;

+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;

+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;

+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;

+import edu.uci.ics.hyracks.control.nc.NodeControllerService;

+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;

+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;

+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;

+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;

+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;

+

+public class Tester {

+

+    private static final Logger LOGGER = Logger.getLogger(Tester.class.getName());

+    public static final String NC1_ID = "nc1";

+    public static final String NC2_ID = "nc2";

+

+    private static ClusterControllerService cc;

+    private static NodeControllerService nc1;

+    private static NodeControllerService nc2;

+    private static IHyracksClientConnection hcc;

+

+    //private static final boolean DEBUG = true;

+

+    /**
+     * Entry point: boots an embedded CC + two-NC mini cluster via {@link #init()},
+     * builds the k-mer counting job, runs it, and prints start/end wall-clock
+     * times (milliseconds) to stderr.
+     *
+     * Expected arguments:
+     *   args[0] = input file name,
+     *   args[1] = k (k-mer length),
+     *   args[2] = page_num (buffer page count).
+     */
+    public static void main(String[] args) throws Exception {
+
+        LOGGER.setLevel(Level.OFF);
+
+        init();
+
+        // Options options = new Options();
+
+        // NOTE(review): this local 'hcc' shadows the static field of the same
+        // name already initialised by init(); one of the two connections is
+        // redundant -- verify which is intended.
+        IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);
+
+        int k, page_num;
+        String file_name = args[0];
+        k = Integer.parseInt(args[1]);
+        page_num = Integer.parseInt(args[2]);
+
+        JobSpecification job = createJob(file_name, k, page_num);
+
+        long start = System.currentTimeMillis();
+        JobId jobId = hcc.startJob("test", job);
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }

+

+    //    private static FileSplit[] parseFileSplits(String fileSplits) {

+    //        String[] splits = fileSplits.split(",");

+    //        FileSplit[] fSplits = new FileSplit[splits.length];

+    //        for (int i = 0; i < splits.length; ++i) {

+    //            String s = splits[i].trim();

+    //            int idx = s.indexOf(':');

+    //            if (idx < 0) {

+    //                throw new IllegalArgumentException("File split " + s + " not well formed");

+    //            }

+    //            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));

+    //        }

+    //        return fSplits;

+    //    }

+

+    /**
+     * Starts an in-process Hyracks mini cluster: one ClusterControllerService
+     * (client port 39000, cluster port 39001) and two NodeControllerServices
+     * ("nc1", "nc2"), all bound to loopback. Also opens the static client
+     * connection {@code hcc} and registers the "test" application.
+     *
+     * @throws Exception if any service fails to start.
+     */
+    public static void init() throws Exception {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.clientNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetPort = 39000;
+        ccConfig.clusterNetIpAddress = "127.0.0.1";
+        ccConfig.clusterNetPort = 39001;
+        ccConfig.profileDumpPeriod = 10000;
+        // CC scratch space lives under target/ so a clean build wipes it.
+        File outDir = new File("target/ClusterController");
+        outDir.mkdirs();
+        // createTempFile is used only to obtain a unique name; the file is
+        // immediately replaced by a directory of the same name.
+        File ccRoot = File.createTempFile(Tester.class.getName(), ".data", outDir);
+        ccRoot.delete();
+        ccRoot.mkdir();
+        ccConfig.ccRoot = ccRoot.getAbsolutePath();
+        cc = new ClusterControllerService(ccConfig);
+        cc.start();
+
+        NCConfig ncConfig1 = new NCConfig();
+        ncConfig1.ccHost = "localhost";
+        ncConfig1.ccPort = 39001;
+        ncConfig1.clusterNetIPAddress = "127.0.0.1";
+        ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.nodeId = NC1_ID;
+        nc1 = new NodeControllerService(ncConfig1);
+        nc1.start();
+
+        NCConfig ncConfig2 = new NCConfig();
+        ncConfig2.ccHost = "localhost";
+        ncConfig2.ccPort = 39001;
+        ncConfig2.clusterNetIPAddress = "127.0.0.1";
+        ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.nodeId = NC2_ID;
+        nc2 = new NodeControllerService(ncConfig2);
+        nc2.start();
+
+        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+        hcc.createApplication("test", null);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+        }
+    }

+

+    /**
+     * Builds the k-mer counting job:
+     *   file scan -> local external group-by (merge k-mers)
+     *             -> M:N repartition by k-mer hash
+     *             -> global external group-by (distributed merge)
+     *             -> printer.
+     * Output record layout: (long kmer, byte bitmap, int count).
+     *
+     * NOTE(review): the 'filename' and 'page_num' parameters are not used in
+     * this method body -- FileScanDescriptor receives only 'k'. Verify whether
+     * the scan descriptor should be given the file name.
+     *
+     * @param filename input file name (currently unused here).
+     * @param k        k-mer length passed to the scanner.
+     * @param page_num buffer page count (currently unused here).
+     * @return the assembled job specification.
+     * @throws HyracksDataException if operator construction fails.
+     */
+    private static JobSpecification createJob(String filename, int k, int page_num) throws HyracksDataException {
+        JobSpecification spec = new JobSpecification();
+
+        // (removed a large block of commented-out TPC-H-style sample code that
+        // was inherited from a Hyracks example and is unrelated to this job)
+
+        spec.setFrameSize(128);
+
+        FileScanDescriptor scan = new FileScanDescriptor(spec, k);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE });
+
+        // Group on field 0 (the 64-bit k-mer key).
+        int[] keyFields = new int[] { 0 };
+        int frameLimits = 10;
+        int tableSize = 128;
+
+        // Local (per-partition) aggregation.
+        ExternalGroupOperatorDescriptor single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
+                new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
+                // new IntSumFieldAggregatorFactory(1, false) }),
+
+                new MergeKmerAggregateFactory(),
+                // new IntSumFieldAggregatorFactory(1, false) }),
+                outputRec, new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(keyFields,
+                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                        .of(LongPointable.FACTORY) }), tableSize), true);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID);
+
+        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(readfileConn, scan, 0, single_grouper, 0);
+
+        // Repartition locally-aggregated tuples so each k-mer lands on one node.
+        IConnectorDescriptor conn_partition = new MToNPartitioningConnectorDescriptor(spec,
+                new KmerHashPartitioncomputerFactory());
+
+        // Global aggregation after repartitioning.
+        ExternalGroupOperatorDescriptor cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
+                new Integer64NormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),
+                // new IntSumFieldAggregatorFactory(1, false) }),
+
+                new DistributedMergeLmerAggregateFactory(),
+                // new IntSumFieldAggregatorFactory(1, false) }),
+                outputRec, new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(keyFields,
+                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                        .of(LongPointable.FACTORY) }), tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID);
+        spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
+
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID);
+
+        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(printConn, cross_grouper, 0, printer, 0);
+
+        // (removed a commented-out GenKmerDescriptor wiring block superseded by
+        // the pipeline above)
+
+        spec.addRoot(printer);
+
+        return spec;
+    }

+

+    //    private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {

+    //        String[] parts = new String[splits.length];

+    //        for (int i = 0; i < splits.length; ++i) {

+    //            parts[i] = splits[i].getNodeName();

+    //        }

+    //        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);

+    //    }

+

+    /**
+     * Factory producing {@link JoinComparator} instances that compare field
+     * {@code pos0} of the left tuple against field {@code pos1} of the right
+     * tuple using the supplied binary comparator factory.
+     */
+    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+        private static final long serialVersionUID = 1L;
+
+        private final IBinaryComparatorFactory bFactory;
+        private final int pos0;
+        private final int pos1;
+
+        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
+            this.bFactory = bFactory;
+            this.pos0 = pos0;
+            this.pos1 = pos1;
+        }
+
+        @Override
+        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
+        }
+    }

+

+    /**
+     * Compares one field from each of two frame tuples with a binary
+     * comparator: field {@code field0} of the tuple in {@code accessor0}
+     * against field {@code field1} of the tuple in {@code accessor1}.
+     */
+    static class JoinComparator implements ITuplePairComparator {
+
+        private final IBinaryComparator bComparator;
+        private final int field0;
+        private final int field1;
+
+        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
+            this.bComparator = bComparator;
+            this.field0 = field0;
+            this.field1 = field1;
+        }
+
+        @Override
+        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+            // Absolute start of each tuple's data region = tuple start + field
+            // slot (offset table) length.
+            int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+            int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            // NOTE(review): the branch below is equivalent to 'return c;'.
+            if (c != 0) {
+                return c;
+            }
+            return 0;
+        }
+    }

+

+}

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
new file mode 100644
index 0000000..2c70a6e
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -0,0 +1,160 @@
+package edu.uci.ics.genomix.dataflow.aggregators;

+

+import java.io.DataOutput;

+import java.io.IOException;

+import java.nio.ByteBuffer;

+

+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;

+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

+

+/**
+ * Aggregator factory for the global (post-repartition) merge phase of k-mer
+ * counting. Input/state/output tuples carry the layout
+ * (key, byte bitmap, int count); aggregation ORs the bitmaps and sums the
+ * counts.
+ */
+public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public DistributedMergeLmerAggregateFactory() {
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public void reset() {
+            }
+
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                // State lives in the spillable table's tuples, not here; a
+                // placeholder object satisfies the interface.
+                return new AggregateState(new Object() {
+                });
+            }
+
+            /**
+             * Seeds the partial aggregate from the first tuple of a group:
+             * copies its bitmap (field 1) and count (field 2) into the state
+             * tuple being built.
+             */
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                byte bitmap = 0;
+                int count = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+                bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+
+                tupleOffset = accessor.getTupleStartOffset(tIndex);
+                fieldStart = accessor.getFieldStartOffset(tIndex, 2);
+                int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+
+                count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), offset);
+
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                try {
+                    fieldOutput.writeByte(bitmap);
+                    tupleBuilder.addFieldEndOffset();
+                    fieldOutput.writeInt(count);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when initializing the aggregator.");
+                }
+
+            }
+
+            /**
+             * Folds one input tuple into the in-place state tuple: ORs the
+             * input bitmap into the stored bitmap and adds the input count to
+             * the stored count. State layout at 'stateoffset' is
+             * [1-byte bitmap][4-byte count].
+             */
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                byte bitmap = 0;
+                int count = 0;
+
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+                int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+                bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+
+                tupleOffset = accessor.getTupleStartOffset(tIndex);
+                fieldStart = accessor.getFieldStartOffset(tIndex, 2);
+                offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+                count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), offset);
+
+                int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+                int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+                int stateoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + statefieldStart;
+
+                byte[] data = stateAccessor.getBuffer().array();
+
+                ByteBuffer buf = ByteBuffer.wrap(data);
+                // BUGFIX: was buf.getChar(stateoffset), which reads TWO bytes
+                // (a UTF-16 char) -- the bitmap byte plus the first byte of the
+                // count -- corrupting the merged bitmap. The state stores a
+                // single byte here (see buf.put(stateoffset, ...) below), so
+                // read exactly one byte.
+                bitmap |= buf.get(stateoffset);
+                count += buf.getInt(stateoffset + 1);
+                buf.put(stateoffset, bitmap);
+                buf.putInt(stateoffset + 1, count);
+            }
+
+            /**
+             * Emits the partial aggregate (bitmap, count) read from the state
+             * tuple's field 1; the 4-byte count is assumed to sit immediately
+             * after the bitmap byte -- TODO confirm against the frame layout.
+             */
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                byte bitmap;
+                int count;
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                byte[] data = accessor.getBuffer().array();
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
+
+                int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;
+                bitmap = ByteSerializerDeserializer.getByte(data, offset);
+
+                count = IntegerSerializerDeserializer.getInt(data, offset + 1);
+                try {
+                    fieldOutput.writeByte(bitmap);
+                    tupleBuilder.addFieldEndOffset();
+                    fieldOutput.writeInt(count);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+
+            }
+
+            /**
+             * Emits the final aggregate; identical layout to the partial
+             * result: (bitmap, count) taken from the state tuple.
+             */
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                byte bitmap;
+                int count;
+
+                byte[] data = accessor.getBuffer().array();
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
+                int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;
+
+                bitmap = ByteSerializerDeserializer.getByte(data, offset);
+                count = IntegerSerializerDeserializer.getInt(data, offset + 1);
+
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                try {
+                    fieldOutput.writeByte(bitmap);
+                    tupleBuilder.addFieldEndOffset();
+                    fieldOutput.writeInt(count);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+        };
+    }
+
+}

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
new file mode 100644
index 0000000..b6d3492
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -0,0 +1,151 @@
+package edu.uci.ics.genomix.dataflow.aggregators;

+

+import java.io.DataOutput;

+import java.io.IOException;

+import java.nio.ByteBuffer;

+

+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;

+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

+

+public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {

+    private static final long serialVersionUID = 1L;

+

+    public MergeKmerAggregateFactory() {

+    }

+

+    @Override

+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,

+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)

+            throws HyracksDataException {

+        return new IAggregatorDescriptor() {

+

+            @Override

+            public void reset() {

+            }

+

+            @Override

+            public void close() {

+                // TODO Auto-generated method stub

+

+            }

+

+            @Override

+            public AggregateState createAggregateStates() {

+                // TODO Auto-generated method stub

+                return new AggregateState(new Object() {

+                });

+            }

+

+            @Override

+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

+                    AggregateState state) throws HyracksDataException {

+                byte bitmap = 0;

+                int count = 0;

+                int tupleOffset = accessor.getTupleStartOffset(tIndex);

+                int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

+                bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),

+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);

+                count += 1;

+                DataOutput fieldOutput = tupleBuilder.getDataOutput();

+                try {

+                    fieldOutput.writeByte(bitmap);

+                    tupleBuilder.addFieldEndOffset();

+                    fieldOutput.writeInt(count);

+                    tupleBuilder.addFieldEndOffset();

+                } catch (IOException e) {

+                    throw new HyracksDataException("I/O exception when initializing the aggregator.");

+                }

+

+            }

+

+            @Override

+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,

+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {

+                // TODO Auto-generated method stub

+                byte bitmap = 0;

+                int count = 0;

+

+                int tupleOffset = accessor.getTupleStartOffset(tIndex);

+                int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

+                int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;

+

+                bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);

+                count += 1;

+

+                int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);

+                int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);

+                int stateoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + statefieldStart;

+

+                byte[] data = stateAccessor.getBuffer().array();

+

+                ByteBuffer buf = ByteBuffer.wrap(data);

+                bitmap |= buf.getChar(stateoffset);

+                count += buf.getInt(stateoffset + 1);

+                buf.put(stateoffset, bitmap);

+                buf.putInt(stateoffset + 1, count);

+            }

+

+            @Override

+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

+                    AggregateState state) throws HyracksDataException {

+                // TODO Auto-generated method stub

+                byte bitmap;

+                int count;

+                DataOutput fieldOutput = tupleBuilder.getDataOutput();

+                byte[] data = accessor.getBuffer().array();

+                int tupleOffset = accessor.getTupleStartOffset(tIndex);

+                int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);

+

+                int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;

+                bitmap = ByteSerializerDeserializer.getByte(data, offset);

+

+                count = IntegerSerializerDeserializer.getInt(data, offset + 1);

+                try {

+                    fieldOutput.writeByte(bitmap);

+                    tupleBuilder.addFieldEndOffset();

+                    fieldOutput.writeInt(count);

+                    tupleBuilder.addFieldEndOffset();

+                } catch (IOException e) {

+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

+                }

+

+            }

+

+            @Override

+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

+                    AggregateState state) throws HyracksDataException {

+                // TODO Auto-generated method stub

+                byte bitmap;

+                int count;

+

+                byte[] data = accessor.getBuffer().array();

+                int tupleOffset = accessor.getTupleStartOffset(tIndex);

+                int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);

+                int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;

+

+                bitmap = ByteSerializerDeserializer.getByte(data, offset);

+                count = IntegerSerializerDeserializer.getInt(data, offset + 1);

+

+                DataOutput fieldOutput = tupleBuilder.getDataOutput();

+                try {

+                    fieldOutput.writeByte(bitmap);

+                    tupleBuilder.addFieldEndOffset();

+                    fieldOutput.writeInt(count);

+                    tupleBuilder.addFieldEndOffset();

+                } catch (IOException e) {

+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

+                }

+            }

+

+        };

+    }

+

+}