git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2713 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index eeeecea..23822a5 100644
--- a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -1,6 +1,13 @@
package edu.uci.ics.genomix.dataflow;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -18,11 +25,16 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -31,14 +43,21 @@
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
public class Tester {
@@ -46,10 +65,14 @@
private static final Logger LOGGER = Logger.getLogger(Tester.class.getName());
public static final String NC1_ID = "nc1";
public static final String NC2_ID = "nc2";
+ public static final String NC3_ID = "nc3";
+ public static final String NC4_ID = "nc4";
private static ClusterControllerService cc;
private static NodeControllerService nc1;
private static NodeControllerService nc2;
+ private static NodeControllerService nc3;
+ private static NodeControllerService nc4;
private static IHyracksClientConnection hcc;
//private static final boolean DEBUG = true;
@@ -78,29 +101,28 @@
String file_name = args[0];
k = Integer.parseInt(args[1]);
page_num = Integer.parseInt(args[2]);
+ int type = Integer.parseInt(args[3]);
- JobSpecification job = createJob(file_name, k, page_num);
+ JobSpecification job = createJob(file_name, k, page_num, type);
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob("test", job);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
- }
+
+ FileOutputStream filenames;
- // private static FileSplit[] parseFileSplits(String fileSplits) {
- // String[] splits = fileSplits.split(",");
- // FileSplit[] fSplits = new FileSplit[splits.length];
- // for (int i = 0; i < splits.length; ++i) {
- // String s = splits[i].trim();
- // int idx = s.indexOf(':');
- // if (idx < 0) {
- // throw new IllegalArgumentException("File split " + s + " not well formed");
- // }
- // fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
- // }
- // return fSplits;
- // }
+ String s = "g:\\data\\results.txt" ;
+
+ filenames = new FileOutputStream(s);
+ // filenames = new FileInputStream("filename.txt");
+
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(filenames));
+ writer.write((int) (end-start));
+ writer.close();
+
+ }
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
@@ -135,6 +157,24 @@
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
+
+ NCConfig ncConfig3 = new NCConfig();
+ ncConfig3.ccHost = "localhost";
+ ncConfig3.ccPort = 39001;
+ ncConfig3.clusterNetIPAddress = "127.0.0.1";
+ ncConfig3.dataIPAddress = "127.0.0.1";
+ ncConfig3.nodeId = NC3_ID;
+ nc3 = new NodeControllerService(ncConfig3);
+ nc3.start();
+
+ NCConfig ncConfig4 = new NCConfig();
+ ncConfig4.ccHost = "localhost";
+ ncConfig4.ccPort = 39001;
+ ncConfig4.clusterNetIPAddress = "127.0.0.1";
+ ncConfig4.dataIPAddress = "127.0.0.1";
+ ncConfig4.nodeId = NC4_ID;
+ nc4 = new NodeControllerService(ncConfig4);
+ nc4.start();
hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
hcc.createApplication("test", null);
@@ -143,263 +183,132 @@
}
}
- private static JobSpecification createJob(String filename, int k, int page_num) throws HyracksDataException {
+ private static JobSpecification createJob(String filename, int k, int page_num, int type) throws HyracksDataException {
JobSpecification spec = new JobSpecification();
- /*
- * IFileSplitProvider custSplitsProvider = new
- * ConstantFileSplitProvider(customerSplits);
- * //ConstantFileSplitProvider is the operator to provide the tuples
- * RecordDescriptor custDesc = new RecordDescriptor(new
- * ISerializerDeserializer[] {
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE });
- *
- * IFileSplitProvider ordersSplitsProvider = new
- * ConstantFileSplitProvider(orderSplits); RecordDescriptor ordersDesc =
- * new RecordDescriptor(new ISerializerDeserializer[] {
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE });
- *
- * RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new
- * ISerializerDeserializer[] {
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE,
- * UTF8StringSerializerDeserializer.INSTANCE });
- *
- * FileScanOperatorDescriptor ordScanner = new
- * FileScanOperatorDescriptor(spec, ordersSplitsProvider, new
- * DelimitedDataTupleParserFactory(new IValueParserFactory[] {
- * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- * UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
- * createPartitionConstraint(spec, ordScanner, orderSplits);
- *
- * FileScanOperatorDescriptor custScanner = new
- * FileScanOperatorDescriptor(spec, custSplitsProvider, new
- * DelimitedDataTupleParserFactory(new IValueParserFactory[] {
- * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE },
- * '|'), custDesc); createPartitionConstraint(spec, custScanner,
- * customerSplits);
- *
- * IOperatorDescriptor join;
- *
- * if ("nestedloop".equalsIgnoreCase(algo)) { join = new
- * NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
- * PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0,
- * 1), custOrderJoinDesc, memSize);
- *
- * } else if ("gracehash".equalsIgnoreCase(algo)) { join = new
- * GraceHashJoinOperatorDescriptor( spec, memSize, graceInputSize,
- * graceRecordsPerFrame, graceFactor, new int[] { 0 }, new int[] { 1 },
- * new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- * .of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] {
- * PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- * custOrderJoinDesc);
- *
- * } else if ("hybridhash".equalsIgnoreCase(algo)) { join = new
- * HybridHashJoinOperatorDescriptor( spec, memSize, graceInputSize,
- * graceRecordsPerFrame, graceFactor, new int[] { 0 }, new int[] { 1 },
- * new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- * .of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] {
- * PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- * custOrderJoinDesc);
- *
- * } else { join = new InMemoryHashJoinOperatorDescriptor( spec, new
- * int[] { 0 }, new int[] { 1 }, new IBinaryHashFunctionFactory[] {
- * PointableBinaryHashFunctionFactory .of(UTF8StringPointable.FACTORY)
- * }, new IBinaryComparatorFactory[] {
- * PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- * custOrderJoinDesc, 6000000); }
- *
- * PartitionConstraintHelper.addPartitionCountConstraint(spec, join,
- * numJoinPartitions);
- *
- * IConnectorDescriptor ordJoinConn = new
- * MToNPartitioningConnectorDescriptor(spec, new
- * FieldHashPartitionComputerFactory(new int[] { 1 }, new
- * IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- * .of(UTF8StringPointable.FACTORY) })); spec.connect(ordJoinConn,
- * ordScanner, 0, join, 1);
- *
- * IConnectorDescriptor custJoinConn = new
- * MToNPartitioningConnectorDescriptor(spec, new
- * FieldHashPartitionComputerFactory(new int[] { 0 }, new
- * IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- * .of(UTF8StringPointable.FACTORY) })); spec.connect(custJoinConn,
- * custScanner, 0, join, 0);
- *
- * IOperatorDescriptor endingOp = join;
- *
- * if (hasGroupBy) {
- *
- * RecordDescriptor groupResultDesc = new RecordDescriptor(new
- * ISerializerDeserializer[] {
- * UTF8StringSerializerDeserializer.INSTANCE,
- * IntegerSerializerDeserializer.INSTANCE });
- *
- * HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
- * spec, new int[] { 6 }, new FieldHashPartitionComputerFactory(new
- * int[] { 6 }, new IBinaryHashFunctionFactory[] {
- * PointableBinaryHashFunctionFactory .of(UTF8StringPointable.FACTORY)
- * }), new IBinaryComparatorFactory[] {
- * PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- * new MultiFieldsAggregatorFactory( new
- * IFieldAggregateDescriptorFactory[] { new
- * CountFieldAggregatorFactory(true) }), groupResultDesc, 16);
- * createPartitionConstraint(spec, gby, resultSplits);
- *
- * IConnectorDescriptor joinGroupConn = new
- * MToNPartitioningConnectorDescriptor(spec, new
- * FieldHashPartitionComputerFactory(new int[] { 6 }, new
- * IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- * .of(UTF8StringPointable.FACTORY) })); spec.connect(joinGroupConn,
- * join, 0, gby, 0);
- *
- * endingOp = gby; }
- */
-
- // IOperatorDescriptor printer = DEBUG ? new
- // PrinterOperatorDescriptor(spec)
- // : new NullSinkOperatorDescriptor(spec);
- // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- // printer, NC1_ID);
-
- spec.setFrameSize(128);
+ spec.setFrameSize(32768);
FileScanDescriptor scan = new FileScanDescriptor(spec, k);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
+ //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);
RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
- int frameLimits = 10;
- int tableSize = 128;
+ int frameLimits = 4096;
+ int tableSize = 10485767;
- ExternalGroupOperatorDescriptor single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
+ AbstractOperatorDescriptor single_grouper;
+ IConnectorDescriptor conn_partition;
+ AbstractOperatorDescriptor cross_grouper;
+
- new MergeKmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(LongPointable.FACTORY) }), tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID);
+
+ if(0 == type){//external group by
+ single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
+ new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec, new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(LongPointable.FACTORY) }), tableSize), true);
+
+ conn_partition = new MToNPartitioningConnectorDescriptor(spec,
+ new KmerHashPartitioncomputerFactory());
+ cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
+ new Integer64NormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec, new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(LongPointable.FACTORY) }), tableSize), true);
+ }
+ else if( 1 == type){
+ single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
+ new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec, new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(LongPointable.FACTORY) }), tableSize), true);
+ conn_partition = new MToNPartitioningMergingConnectorDescriptor(spec, new KmerHashPartitioncomputerFactory(),
+ keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY)} );
+ cross_grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) }, new DistributedMergeLmerAggregateFactory(),
+ outputRec);
+ }
+ else{
+ long inputSizeInRawRecords = 32768;
+ long inputSizeInUniqueKeys = 32768;
+ int recordSizeInBytes = 9;
+ int hashfuncStartLevel = 1;
+ single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
+ frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+ hashfuncStartLevel,
+ new Integer64NormalizedKeyComputerFactory(),
+ new DistributedMergeLmerAggregateFactory(),
+ new DistributedMergeLmerAggregateFactory(),
+ outputRec);
+ conn_partition = new MToNPartitioningConnectorDescriptor(spec,
+ new KmerHashPartitioncomputerFactory());
+ recordSizeInBytes = 13;
+ cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
+ frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+ hashfuncStartLevel,
+ new Integer64NormalizedKeyComputerFactory(),
+ new DistributedMergeLmerAggregateFactory(),
+ new DistributedMergeLmerAggregateFactory(),
+ outputRec);
+
+ }
+
+ //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
+
IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);
spec.connect(readfileConn, scan, 0, single_grouper, 0);
- IConnectorDescriptor conn_partition = new MToNPartitioningConnectorDescriptor(spec,
- new KmerHashPartitioncomputerFactory());
-
- ExternalGroupOperatorDescriptor cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new Integer64NormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
-
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(LongPointable.FACTORY) }), tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID);
+ //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
+ //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
spec.connect(printConn, cross_grouper, 0, printer, 0);
- //spec.connect(printConn, cross_grouper, 0, printer, 0);
-
- /*
- * GenKmerDescriptor kmerGen = new GenKmerDescriptor(spec, page_num, k);
- * PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- * kmerGen, NC1_ID);
- *
- * IConnectorDescriptor readfileConn = new
- * OneToOneConnectorDescriptor(spec); spec.connect(readfileConn, scan,
- * 0, kmerGen, 0);
- *
- * PrinterOperatorDescriptor printer = new
- * PrinterOperatorDescriptor(spec);
- * PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- * printer, NC1_ID);
- *
- * IConnectorDescriptor printConn = new
- * OneToOneConnectorDescriptor(spec); //spec.connect(printConn, scan, 0,
- * printer, 0);
- *
- * //IConnectorDescriptor printConn = new
- * OneToOneConnectorDescriptor(spec); spec.connect(printConn, kmerGen,
- * 0, printer, 0);
- *
- * //IFileSplitProvider outSplitProvider = new
- * ConstantFileSplitProvider(resultSplits);
- * //FrameFileWriterOperatorDescriptor writer = new
- * FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
- * //createPartitionConstraint(spec, writer, resultSplits);
- *
- * //IConnectorDescriptor endingPrinterConn = new
- * OneToOneConnectorDescriptor(spec); //spec.connect(endingPrinterConn,
- * endingOp, 0, writer, 0);
- */
-
spec.addRoot(printer);
+ if( 1 == type ){
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ }
// System.out.println(spec.toString());
return spec;
}
- // private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
- // String[] parts = new String[splits.length];
- // for (int i = 0; i < splits.length; ++i) {
- // parts[i] = splits[i].getNodeName();
- // }
- // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
- // }
static class JoinComparatorFactory implements ITuplePairComparatorFactory {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 62320c5..1cd98fb 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -139,7 +139,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("open(" + pid + " by " + taId);
}
- fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString().replace(':', '_'));
handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
size = 0;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 47a8616..0af2dd8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -48,10 +48,10 @@
@Override
public void writeData(Object[] data) throws HyracksDataException {
for (int i = 0; i < data.length; ++i) {
- System.err.print(StringSerializationUtils.toString(data[i]));
- System.err.print(", ");
+ // System.err.print(StringSerializationUtils.toString(data[i]));
+ // System.err.print(", ");
}
- System.err.println();
+ //System.err.println();
}
@Override