Revised FileScan

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2911 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
index 77cfd6e..3b8f22c 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
@@ -64,6 +64,7 @@
 			if (bytes[i] != 0) {

 				break;

 			}

+			i += 1;

 		}

 		return bytes;

 	}

@@ -78,30 +79,41 @@
 	public int compareTo(byte[] bytes, int start, int length) {

 

 		int be = this.start;

-		int n = (this.bytes[be] < bytes[start]) ? this.bytes[be] : bytes[start];

+		

+		if(this.bytes[be] != bytes[start]){

+			return (this.bytes[be] < bytes[start]) ? -1 : 1;

+		}

+		

+		int n = this.bytes[be];

 		int l = (int) Math.ceil(n / 4);

 		for (int i = 0; i <= l; i++) {

-			if (this.bytes[i] < bytes[i]) {

+			if (this.bytes[be + i] < bytes[start + i]) {

 				return -1;

-			} else if (this.bytes[i] > bytes[i]) {

+			} else if (this.bytes[be + i] > bytes[start + i]) {

 				return 1;

 			}

 

 		}

-

-		return (this.bytes[be] < bytes[start]) ? -1

-				: ((this.bytes[be] > bytes[start]) ? 1 : 0);

+		return 0;

 	}

 

 	@Override

 	public int hash() {// BKDRHash

-		int seed = 131; // 31 131 1313 13131 131313 etc..

+		int hash = 1;

+		for (int i = start + 1; i <= start + length; i++)

+			hash = (31 * hash) + (int)bytes[i];

+		if(hash < 0){

+			hash = -hash;

+		}

+		//System.err.println(hash);

+		return hash;

+/*		int seed = 131; // 31 131 1313 13131 131313 etc..

 		int hash = 0;

 		int l = (int) Math.ceil((double) bytes[start] / 4.0);

 		for (int i = start + 1; i <= start + l; i++) {

 			hash = hash * seed + bytes[i];

 		}

-		return (hash & 0x7FFFFFFF);

+		return (hash & 0x7FFFFFFF);*/

 	}

 

 	@Override

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index ebc5346..f0b277a 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -93,15 +93,19 @@
 								new InputStreamReader(

 										new FileInputStream(fa[i])));

 						String read = readsfile.readLine();

+						int count  = 0;

 						while (read != null) {

 							read = readsfile.readLine();

+							//if(count % 4 == 1)

 							SplitReads(read.getBytes());

 							// read.getBytes();

-

 							read = readsfile.readLine();

+							

 							read = readsfile.readLine();

 

 							read = readsfile.readLine();

+							count += 1;

+							System.err.println(count);

 						}

 					}

 					if (outputAppender.getTupleCount() > 0) {

@@ -220,14 +224,15 @@
 				bytes[i] <<= 2;

 				bytes[i] &= filter2;

 				i -= 1;

-				while (i > 0) {

+				while (i >= 0) {

 					byte f = (byte) (bytes[i] & filter0);

 					f >>= 6;

 					bytes[i + 1] |= f;

 					bytes[i] <<= 2;

 					bytes[i] &= filter1;

+					i -= 1;

 				}

-				bytes[i + 1] |= ConvertSymbol(c);

+				bytes[0] |= ConvertSymbol(c);

 			}

 

 			private void SplitReads(byte[] array) {

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index f455604..22cb0f7 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -136,7 +136,7 @@
         nc1 = new NodeControllerService(ncConfig1);

         nc1.start();

 

-        NCConfig ncConfig2 = new NCConfig();

+  /*      NCConfig ncConfig2 = new NCConfig();

         ncConfig2.ccHost = "localhost";

         ncConfig2.ccPort = 39001;

         ncConfig2.clusterNetIPAddress = "127.0.0.1";

@@ -161,7 +161,7 @@
         ncConfig4.dataIPAddress = "127.0.0.1";

         ncConfig4.nodeId = NC4_ID;

         nc4 = new NodeControllerService(ncConfig4);

-        nc4.start();

+        nc4.start();*/

 

         hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);

         hcc.createApplication("test", null);

@@ -174,11 +174,11 @@
         JobSpecification spec = new JobSpecification();

 

         //spec.setFrameSize(32768);

-        spec.setFrameSize(64);

+        spec.setFrameSize(32768);

 

         FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);

 

         RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {null, ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE});

                 //Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

@@ -271,21 +271,21 @@
                     outputRec, true);            

         }

         

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID);

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

         

         IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);

         spec.connect(readfileConn, scan, 0, single_grouper, 0);

         

 

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

         spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

 

         //PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");

-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

 

         IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

         spec.connect(printConn, cross_grouper, 0, printer, 0);