finished Driver for genomix
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2858 123451ca-8445-de46-9d55-352943316053
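
Usage sketch for the new Driver (illustrative only: the cluster-controller host/port, the class name
RunGenomix, and the HDFS paths below are placeholders, not values fixed by this change):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import edu.uci.ics.genomix.driver.Driver;
    import edu.uci.ics.genomix.job.GenomixJob;

    public class RunGenomix {
        public static void main(String[] args) throws Exception {
            // configure the Hadoop-style job: k-mer length plus HDFS input/output paths
            GenomixJob job = new GenomixJob("build-debruijn-graph");
            job.setKmerLength(25);                                    // genomix.kmer
            FileInputFormat.addInputPath(job, new Path("/customer/"));
            FileOutputFormat.setOutputPath(job, new Path("/customer_result/"));

            // connect to the Hyracks cluster controller (host/port are placeholders)
            // and run the default plan, BUILD_DEBRUJIN_GRAPH, with 2 partitions per machine
            Driver driver = new Driver("localhost", 3099, 2);
            driver.runJob(job);
        }
    }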
diff --git a/genomix/genomix-core/pom.xml b/genomix/genomix-core/pom.xml
index b88c75d..4cd78c8 100644
--- a/genomix/genomix-core/pom.xml
+++ b/genomix/genomix-core/pom.xml
@@ -216,5 +216,27 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ <version>0.20.2</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index 1a3b927..3f0f194 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -6,6 +6,8 @@
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
+import org.apache.hadoop.fs.Path;
+
import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
@@ -13,6 +15,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -23,14 +26,14 @@
private static final long serialVersionUID = 1L;
private int k;
+ private Path[] filesplit = null;
+ private String pathSurfix;
private int byteNum;
- private String filename;
- public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k, String filename) {
+ public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k, String path) {
super(spec, 0, 1);
- // TODO Auto-generated constructor stub
this.k = k;
- this.filename = filename;
+ this.pathSurfix = path;
byteNum = (byte)Math.ceil((double)k/4.0);
//recordDescriptors[0] = news RecordDescriptor(
@@ -39,7 +42,19 @@
null, null});
}
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ public FileScanDescriptor(JobSpecification jobSpec, int kmers,
+ Path[] inputPaths) {
+ super(jobSpec, 0, 1);
+ this.k = kmers;
+ this.filesplit = inputPaths;
+ this.pathSurfix = inputPaths[0].toString();
+ //recordDescriptors[0] = news RecordDescriptor(
+ // new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+ recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {
+ null, ByteSerializerDeserializer.INSTANCE });
+ }
+
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
final int temp = partition;
@@ -60,7 +75,7 @@
outputAppender.reset(outputBuffer, true);
try {// one try with multiple catch?
writer.open();
- String s = filename + String.valueOf(temp);
+ String s = pathSurfix + String.valueOf(temp);
File tf = new File(s);
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index 06bde29..0b0aa07 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -184,8 +184,8 @@
//ByteSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
- int frameLimits = 4096;
- int tableSize = 10485767;
+ int frameLimits = 4096; // frame limit required by the Hyracks group-by
+ int tableSize = 10485767; // hash table size required by the Hyracks group-by
AbstractOperatorDescriptor single_grouper;
IConnectorDescriptor conn_partition;
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
new file mode 100644
index 0000000..93137d7
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -0,0 +1,111 @@
+package edu.uci.ics.genomix.driver;
+
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.genomix.job.GenomixJob;
+import edu.uci.ics.genomix.job.JobGen;
+import edu.uci.ics.genomix.job.JobGenBrujinGraph;
+import edu.uci.ics.genomix.job.JobGenContigsGeneration;
+import edu.uci.ics.genomix.job.JobGenGraphCleanning;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class Driver {
+ public static enum Plan {
+ BUILD_DEBRUJIN_GRAPH,
+ GRAPH_CLEANNING,
+ CONTIGS_GENERATION,
+ }
+
+ private static final String applicationName = "genomix";
+ private static final Log LOG = LogFactory.getLog(Driver.class);
+ private JobGen jobGen;
+ private boolean profiling;
+
+ private int numPartitionPerMachine;
+
+ private IHyracksClientConnection hcc;
+
+ public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
+ try {
+ hcc = new HyracksConnection(ipAddress, port);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ this.numPartitionPerMachine = numPartitionPerMachine;
+ }
+
+ public void runJob(GenomixJob job) throws HyracksException {
+ runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+ }
+
+ public void runJob(GenomixJob job, Plan planChoice, boolean profiling)
+ throws HyracksException {
+ /** add hadoop configurations */
+ // TODO: the Hadoop configuration directory (HADOOP_HOME/conf) needs to be on the classpath so the resources below can be resolved
+ URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+ job.getConfiguration().addResource(hadoopCore);
+ URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+ job.getConfiguration().addResource(hadoopMapRed);
+ URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+ job.getConfiguration().addResource(hadoopHdfs);
+
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
+
+ this.profiling = profiling;
+ try {
+ Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+ switch (planChoice) {
+ case BUILD_DEBRUJIN_GRAPH: default:
+ jobGen = new JobGenBrujinGraph(job, ncMap, numPartitionPerMachine);
+ break;
+ case GRAPH_CLEANNING:
+ jobGen = new JobGenGraphCleanning(job);
+ break;
+ case CONTIGS_GENERATION:
+ jobGen = new JobGenContigsGeneration(job);
+ break;
+ }
+
+ start = System.currentTimeMillis();
+ runCreate(jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("result writing finished " + time + "ms");
+ LOG.info("job finished");
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ private void runCreate(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification createJob = jobGen.generateJob();
+ execute(createJob);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ private void execute(JobSpecification job) throws Exception {
+ job.setUseConnectorPolicyForScheduling(false);
+ JobId jobId = hcc.startJob(applicationName, job,
+ profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ hcc.waitForCompletion(jobId);
+ }
+}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
new file mode 100644
index 0000000..24a7561
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -0,0 +1,59 @@
+package edu.uci.ics.genomix.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+public class GenomixJob extends Job {
+
+ /** K-mer length */
+ public static final String KMER_LENGTH = "genomix.kmer";
+ /** Frame size */
+ public static final String FRAME_SIZE = "genomix.framesize";
+ /** Frame limit, required by Hyracks */
+ public static final String FRAME_LIMIT = "genomix.framelimit";
+ /** Table size, required by Hyracks */
+ public static final String TABLE_SIZE = "genomix.tablesize";
+ /** Group-by type */
+ public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+
+
+ /** Configuration keys used by the hybrid group-by in the graph-build phase */
+ public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+ public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+ public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+
+ public GenomixJob(String jobname) throws IOException {
+ super(new Configuration(), jobname);
+ }
+
+ public GenomixJob(Configuration conf, String jobName) throws IOException {
+ super(conf, jobName);
+ }
+
+ /**
+ * Set the k-mer length.
+ *
+ * @param kmerlength
+ *            the desired k-mer length
+ */
+ final public void setKmerLength(int kmerlength) {
+ getConfiguration().setInt(KMER_LENGTH, kmerlength);
+ }
+
+ final public void setFrameSize(int frameSize) {
+ getConfiguration().setInt(FRAME_SIZE, frameSize);
+ }
+
+ final public void setFrameLimit(int frameLimit) {
+ getConfiguration().setInt(FRAME_LIMIT, frameLimit);
+ }
+
+ final public void setTableSize(int tableSize) {
+ getConfiguration().setInt(TABLE_SIZE, tableSize);
+ }
+
+}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java
new file mode 100644
index 0000000..66bf79d
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.genomix.job;
+
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public abstract class JobGen {
+
+ protected final Configuration conf;
+ protected final GenomixJob genomixJob;
+ protected String jobId = new UUID(System.currentTimeMillis(),
+ System.nanoTime()).toString();
+
+ public JobGen(GenomixJob job) {
+ this.conf = job.getConfiguration();
+ this.genomixJob = job;
+ this.initJobConfiguration();
+ }
+
+ protected abstract void initJobConfiguration();
+
+ public abstract JobSpecification generateJob() throws HyracksDataException;
+
+}
\ No newline at end of file
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
new file mode 100644
index 0000000..067ad37
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -0,0 +1,230 @@
+package edu.uci.ics.genomix.job;
+
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;
+import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;
+import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.dataflow.FileScanDescriptor;
+import edu.uci.ics.genomix.dataflow.PrinterOperatorDescriptor;
+import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
+import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+
+public class JobGenBrujinGraph extends JobGen {
+ public enum GroupbyType {
+ EXTERNAL, PRECLUSTER, HYBRIDHASH,
+ }
+
+ private final Map<String, NodeControllerInfo> ncMap;
+ private String[] ncNodeNames;
+
+ private int kmers;
+ private int frameLimits;
+ private int tableSize;
+ private Path[] inputPaths;
+ private Path outputPath;
+ private GroupbyType groupbyType;
+
+ private AbstractOperatorDescriptor singleGrouper;
+ private IConnectorDescriptor connPartition;
+ private AbstractOperatorDescriptor crossGrouper;
+ private RecordDescriptor outputRec;
+
+ public JobGenBrujinGraph(GenomixJob job,
+ final Map<String, NodeControllerInfo> ncMap, int numPartitionPerMachine) {
+ super(job);
+ this.ncMap = ncMap;
+ String[] nodes = new String[ncMap.size()];
+ ncMap.keySet().toArray(nodes);
+ ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+ for (int i = 0; i < numPartitionPerMachine; i++){
+ System.arraycopy(nodes, 0, ncNodeNames, i*nodes.length, nodes.length);
+ }
+ }
+
+ private ExternalGroupOperatorDescriptor newExternalGroupby(
+ JobSpecification jobSpec, int[] keyFields, IAggregatorDescriptorFactory aggregator) {
+ return new ExternalGroupOperatorDescriptor(
+ jobSpec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) },
+ new Integer64NormalizedKeyComputerFactory(),
+ aggregator,
+ new DistributedMergeLmerAggregateFactory(),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(LongPointable.FACTORY) }),
+ tableSize), true);
+ }
+
+ private HybridHashGroupOperatorDescriptor newHybridGroupby(
+ JobSpecification jobSpec, int[] keyFields,
+ long inputSizeInRawRecords, long inputSizeInUniqueKeys,
+ int recordSizeInBytes, int hashfuncStartLevel)
+ throws HyracksDataException {
+ return new HybridHashGroupOperatorDescriptor(
+ jobSpec,
+ keyFields,
+ frameLimits,
+ inputSizeInRawRecords,
+ inputSizeInUniqueKeys,
+ recordSizeInBytes,
+ tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { new LongBinaryHashFunctionFamily() },
+ // new IBinaryHashFunctionFamily[]
+ // {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
+ hashfuncStartLevel,
+ new Integer64NormalizedKeyComputerFactory(),
+ new MergeKmerAggregateFactory(),
+ new DistributedMergeLmerAggregateFactory(), outputRec, true);
+ }
+
+ private void generateDescriptorbyType(JobSpecification jobSpec)
+ throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
+
+ outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ Integer64SerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE });
+ switch (groupbyType) {
+ case EXTERNAL:
+ singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+ connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
+ new KmerHashPartitioncomputerFactory());
+ crossGrouper = newExternalGroupby(jobSpec, keyFields,new DistributedMergeLmerAggregateFactory());
+ break;
+ case PRECLUSTER:
+ singleGrouper = newExternalGroupby(jobSpec, keyFields,new MergeKmerAggregateFactory());
+ connPartition = new MToNPartitioningMergingConnectorDescriptor(
+ jobSpec,
+ new KmerHashPartitioncomputerFactory(),
+ keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) });
+ crossGrouper = new PreclusteredGroupOperatorDescriptor(
+ jobSpec,
+ keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) },
+ new DistributedMergeLmerAggregateFactory(), outputRec);
+ break;
+ case HYBRIDHASH:
+ default:
+ long inputSizeInRawRecords = conf.getLong(
+ GenomixJob.GROUPBY_HYBRID_INPUTSIZE, 154000000);
+ long inputSizeInUniqueKeys = conf.getLong(
+ GenomixJob.GROUPBY_HYBRID_INPUTKEYS, 38500000);
+ int recordSizeInBytes = conf.getInt(
+ GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE, 9);
+ int hashfuncStartLevel = conf.getInt(
+ GenomixJob.GROUPBY_HYBRID_HASHLEVEL, 1);
+
+ singleGrouper = newHybridGroupby(jobSpec, keyFields,
+ inputSizeInRawRecords, inputSizeInUniqueKeys,
+ recordSizeInBytes, hashfuncStartLevel);
+ connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
+ new KmerHashPartitioncomputerFactory());
+ /** the cross group-by reads a different record size (GROUPBY_HYBRID_RECORDSIZE_CROSS) */
+ recordSizeInBytes = conf.getInt(
+ GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS, 13);
+ crossGrouper = newHybridGroupby(jobSpec, keyFields,
+ inputSizeInRawRecords, inputSizeInUniqueKeys,
+ recordSizeInBytes, hashfuncStartLevel);
+ break;
+ }
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksDataException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ //File input
+ FileScanDescriptor scan = new FileScanDescriptor(jobSpec, kmers, inputPaths);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, scan,ncNodeNames);
+
+ generateDescriptorbyType(jobSpec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+ singleGrouper, ncNodeNames);
+
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
+ jobSpec);
+ jobSpec.connect(readfileConn, scan, 0, singleGrouper, 0);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+ crossGrouper, ncNodeNames);
+ jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
+
+ //Output
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(jobSpec,
+ outputPath.getName());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, printer,
+ ncNodeNames);
+
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(printConn, crossGrouper, 0, printer, 0);
+ jobSpec.addRoot(printer);
+
+ if (groupbyType == GroupbyType.PRECLUSTER) {
+ jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ }
+ return jobSpec;
+ }
+
+ @Override
+ protected void initJobConfiguration() {
+ kmers = conf.getInt(GenomixJob.KMER_LENGTH, 25);
+ frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, 4096);
+ tableSize = conf.getInt(GenomixJob.TABLE_SIZE, 10485767);
+ inputPaths = FileInputFormat.getInputPaths(genomixJob);
+ outputPath = FileOutputFormat.getOutputPath(genomixJob);
+
+ String type = conf.get(GenomixJob.GROUPBY_TYPE, "hybrid");
+ if (type.equalsIgnoreCase("external")) {
+ groupbyType = GroupbyType.EXTERNAL;
+ } else if (type.equalsIgnoreCase("precluster")) {
+ groupbyType = GroupbyType.PRECLUSTER;
+ } else {
+ groupbyType = GroupbyType.HYBRIDHASH;
+ }
+ }
+
+}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
new file mode 100644
index 0000000..d221834
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.genomix.job;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobGenContigsGeneration extends JobGen {
+
+ public JobGenContigsGeneration(GenomixJob job) {
+ super(job);
+ }
+
+ @Override
+ public JobSpecification generateJob() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ protected void initJobConfiguration() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
new file mode 100644
index 0000000..6b7d98e
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.genomix.job;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobGenGraphCleanning extends JobGen {
+
+ public JobGenGraphCleanning(GenomixJob job) {
+ super(job);
+ }
+
+ @Override
+ public JobSpecification generateJob() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ protected void initJobConfiguration() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
new file mode 100644
index 0000000..5de25ba
--- /dev/null
+++ b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
@@ -0,0 +1,113 @@
+package edu.uci.ics.genomix.example.jobrun;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.driver.Driver;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+
+public class JobRunTestCase extends TestCase {
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String EXPECTED_RESULT_PATH = "src/test/resources/expected";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+
+ private static final String DATA_PATH = "src/test/resources/data/customer.tbl";
+ private static final String HDFS_INPUT_PATH = "/customer/";
+ private static final String HDFS_OUTPUT_PATH = "/customer_result/";
+
+ private static final String HYRACKS_APP_NAME = "genomix";
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
+
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+ private int numPartitionPerMachine = 2;
+
+ private Driver myDriver;
+ @Override
+ protected void setUp() throws Exception {
+ cleanupStores();
+ HyracksUtils.init();
+ HyracksUtils.createApp(HYRACKS_APP_NAME);
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+
+ myDriver = new Driver(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+ }
+
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(DATA_PATH);
+ Path dest = new Path(HDFS_INPUT_PATH);
+ Path result = new Path(HDFS_OUTPUT_PATH);
+ dfs.mkdirs(dest);
+ dfs.mkdirs(result);
+ dfs.copyFromLocalFile(src, dest);
+
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
+
+
+ @Override
+ protected void runTest() throws Throwable {
+ TestExternalGroupby();
+ TestPreClusterGroupby();
+ TestHybridGroupby();
+ }
+
+ void TestExternalGroupby() throws Exception{
+ // TODO
+ }
+
+ void TestPreClusterGroupby() throws Exception{
+ // TODO
+ }
+
+ void TestHybridGroupby() throws Exception{
+ // TODO
+ }
+
+
+ @Override
+ protected void tearDown() throws Exception {
+ HyracksUtils.destroyApp(HYRACKS_APP_NAME);
+ HyracksUtils.deinit();
+ cleanupHDFS();
+ }
+
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+}